当前位置: 首页 > 图文教程 > 网络编程 > ASP.NET > .Net下的MSMQ的同步异步调用

ASP.NET
asp.net GridView控件中模板列CheckBox全选、反选、取消
asp.net GridView 删除时弹出确认对话框(包括内容提示)
asp.net DropDownList 三级联动下拉菜单实现代码
asp DataTable添加列和行的三种方法
Asp.net 页面调用javascript变量的值
asp.net 长文章通过设定的行数分页
asp.net 定时间点执行任务的简易解决办法
asp.net 页面延时五秒,跳转到另外的页面
asp.net 动态输出透明gif图片
asp.net DataList与Repeater用法区别
asp.net Javascript获取CheckBoxList的value
asp.net程序在调式和发布之间图片路径问题的解决方法
asp.net下生成英文字符数字验证码的代码
asp.net 页面版文本框智能提示JSCode (升级版)
ASP.NET URL伪静态重写实现方法
ASP.NET 2.0 中Forms安全认证
asp.net 动态添加多个用户控件
asp.net Repeater显示父子表数据,无闪烁
asp.net 无法获取的内部内容,因为该内容不是文本 的解决方法
asp.net GridView排序简单实现

ASP.NET 中的 .Net下的MSMQ的同步异步调用


出处:互联网   整理: 软晨网(RuanChen.com)   发布: 2009-08-14   浏览: 53 ::
收藏到网摘: n/a

一、MSMQ简介

MSMQ(微软消息队列)是Windows操作系统中消息应用程序的基础,是用于创建分布式、松散连接的消息通讯应用程序的开发工具。消息队列

和电子邮件有着很多相似处,他们都包含多个属性,用于保存消息,消息类型中都指出发送者和接收者的地址;然而他们的用处却有着很大的

区别:消息队列的发送者和接收者是应用程序,而电子邮件的发送者和接收者通常是人。如同电子邮件一样,消息队列的发送和接收也不需要

发送者和接收者同时在场,可以存储在消息队列或是邮件服务器中。

二、消息队列的安装

默认情况下安装操作系统是不安装消息队列的,你可以在控制面板中找到添加/删除程序,然后选择添加/删除Windows组件一项,然后选择应

用程序服务器,双击它进入详细资料中选择消息队列一项进行安装,如图:

三、消息队列类型

消息对列分为3类:
 
公共队列

MachineName\QueueName

能被别的机器所访问,如果你的多个项目中用到消息队列,那么你可以把队列定义为公共队列
 
专用队列

MachineName\Private$\QueueName

只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。

日志队列

MachineName\QueueName\Journal$

四、消息队列的创建

MessageQueue Mq=new MessageQueue(“.\\private$\\Mymq”);

通过Path属性引用消息队列的代码也十分简单:

MessageQueue Mq=new MessageQueue();

Mq.Path=”.\\private$\\Mymq”;

使用 Create 方法可以在计算机上创建队列:

System.Messaging.MessageQueue.Create(@".\private$\Mymq");

这里注意由于在C#中要记住用反斜杠将“\”转义。

由于消息对列所放置的地方经常改变,所以建议消息队列路径不要写死,建议放在配置文件中。

五、消息的发送

消息的发送可以分为简单消息和复杂消息,简单消息类型就是常用的数据类型,例如整型、字符串等数据;复杂消息的数据类型通常对应于系统中的复杂数据类型,例如结构,对象等等。

Mq.Send("Hello!");

在这里建议你可以事先定义一个对象类,然后发送这个对象类的实例对象,这样以后无论在增加什么发送信息,只需在对象类中增加相应的属性即可。

六、消息的接收和阅读

(1)同步接收消息

  接收消息的代码很简单:

 Mq.Receive();
        Mq.Receive(TimeSpan timeout); //设定超时时间
 Mq.ReceiveById(ID);
        Mq.Peek();
 
通过Receive方法接收消息同时永久性地从队列中删除消息;

通过Peek方法从队列中取出消息而不从队列中移除该消息。

如果知道消息的标识符(ID),还可以通过ReceiveById方法和PeekById方法完成相应的操作。

(2)异步接受消息
  
利用委托机制:MessQueue.ReceiveCompleted +=new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
 
(3)消息阅读

在应用程序能够阅读的消息和消息队列中的消息格式不同,应用程序发送出去的消息经过序列化以后才发送给了消息队列
而在接受端必须反序列化,利用下面的代码可以实现:

 public void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
  {
   System.Messaging.Message m = MessQueue.EndReceive(e.AsyncResult);
   m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
   Console.WriteLine("Message: " + (string)m.Body);
   MessQueue.BeginReceive() ;

  }

反序列化还有另一种写法:m.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );

七、由于消息队列的代码有些是固定不便的,所以把这些代码封装成一个类方便以后使用:

以下为引用的内容:
1using System;
  2using System.Messaging;
  3using System.Threading;

  5
  6namespace LoveStatusService
  7{
  8    /**//// <summary>
  9    /// Summary description for Msmq.
 10    /// </summary>
 11    public class Msmq
 12    {
 13        public Msmq()
 14        {
 15            //
 16            // TODO: Add constructor logic here
 17            //
 18        }
 19
 20       
 21        private MessageQueue _messageQueue=null;
 22        //最大并发线程数
 23        private static int MAX_WORKER_THREADS=Convert.ToInt32( System.Configuration.ConfigurationSettings.AppSettings["MAX_WORKER_THREADS"].ToString());
 24        //Msmq路径
 25        private static string MsmqPath=System.Configuration.ConfigurationSettings.AppSettings["LoveStatusMQPath"];
 26        //等待句柄
 27        private WaitHandle[] waitHandleArray = new WaitHandle[MAX_WORKER_THREADS];
 28        //任务类型
 29        //1. Send Email 2. Send Message  3. Send Email and Message
 30        private string TaskType=System.Configuration.ConfigurationSettings.AppSettings["TaskType"];
 31        public MessageQueue MessQueue
 32        {
 33            get
 34            {
 35           
 36                if (_messageQueue==null)
 37                {
 38                    if(MessageQueue.Exists(MsmqPath))
 39                    {
 40                        _messageQueue = new MessageQueue(MsmqPath);   
 41                    }
 42                    else
 43                    {
 44                        _messageQueue = MessageQueue.Create(MsmqPath);   
 45                    }   
 46                }
 47               
 48
 49                return _messageQueue;
 50            }
 51        }
 52       
 53
 54    Private Method#region Private Method
 55
 56        private void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
 57        {
 58            MessageQueue mqq = (MessageQueue)sender;
 59            System.Messaging.Message m = mqq.EndReceive(e.AsyncResult);
 60            //m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
 61            m.Formatter =new System.Messaging.XmlMessageFormatter(new Type[] {typeof(UserObject)}) ;
 62            //log.Info("Receive UserID: " + (string)m.Body) ;
 63            UserObject obj=(UserObject)m.Body ;
 64            long curUserId=obj.curUserID ;
 65            long oppUserId=obj.oppUserID;
 66            string curUserName=obj.curUserName;
 67            string oppUserName=obj.oppUserName;
 68            string curEmail=obj.curEmail ;
 69            string oppEmail=obj.oppEmail;
 70            string subject =obj.subject ;
 71            string body=obj.body ;
 72            //AppLog.log.Info("curUserId:"+curUserId) ;
 73            //AppLog.log.Info("oppUserId:"+oppUserId) ;
 74            AppLog.log.Info("==type="+TaskType) ;
 75            switch(TaskType)
 76            {
 77                //Email
 78                case "1":
 79                    EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
 80                    AppLog.log.Info("==Send to=="+oppEmail) ;
 81                    break;
 82                //Message
 83                case "2":
 84                    MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
 85                    AppLog.log.Info("==Send Msg to=="+oppUserId) ;
 86                    break;
 87                //Email and Message       
 88                case "3":
 89                    EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
 90                    AppLog.log.Info("==Send to=="+oppEmail) ;
 91                    MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
 92                    AppLog.log.Info("==Send Msg to=="+oppUserId) ;
 93                    break;
 94                default:
 95                    break;
 96
 97            }
 98            mqq.BeginReceive() ;
 99
100        }
101
102    #endregion
103
104    Public Method#region Public Method
105
106        //一个将对象发送到队列的方法,这里发送的是对象
107        public void SendUserIDToMQ(object arr)
108        {
109            MessQueue.Send(arr) ;
110            Console.WriteLine("Ok") ;
111            Console.Read() ;
112        }
113
114        //同步接受队列内容的方法
115        public void ReceiveFromMQ()
116        {
117            Message ms=new Message() ;
118           
119            //ms=MessQueue.Peek();
120            try
121            {
122                ms=MessQueue.Receive(new TimeSpan(0,0,5));
123                if(ms!=null)
124                {
125                    ms.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );
126                    AppLog.log.Info((string)ms.Body)  ;
127                }
128            }
129            catch(Exception ex)
130            {
131               
132            }
133           
134       
135        }
136
137        //开始监听工作线程
138        public  void startListen()
139        {
140            AppLog.log.Info("--Thread--"+MAX_WORKER_THREADS) ;
141            MessQueue.ReceiveCompleted +=new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
142           
143            //异步方式,并发
144           
145            for(int i=0; i<MAX_WORKER_THREADS; i++)
146            {
147                // Begin asynchronous operations.
148                waitHandleArray[i] = MessQueue.BeginReceive().AsyncWaitHandle;
149            }
150
151            AppLog.log.Info("------Start Listen--------") ;
152
153            return;
154
155        }
156
157
158        //停止监听工作线程
159        public void stopListen()
160        {
161
162            for(int i=0;i<waitHandleArray.Length;i++)
163            {
164
165                try
166                {
167                    waitHandleArray[i].Close();
168                }
169                catch
170                {
171                    AppLog.log.Info("---waitHandleArray[i].Close() Error!-----") ;
172                }
173
174            }
175
176            try
177            {
178                // Specify to wait for all operations to return.
179                WaitHandle.WaitAll(waitHandleArray,1000,false);
180            }
181            catch
182            {
183                AppLog.log.Info("---WaitHandle.WaitAll Error!-----") ;
184            }
185            AppLog.log.Info("------Stop Listen--------") ;
186
187        }
188
189    #endregion
190   
191   
192
193   
194    }
195}
196

UserObject的代码

以下为引用的内容:
1using System;
  2
  3namespace Goody9807
  4{
  5    /**//// <summary>
  6    /// 用与在MQ上传输数据的对象
  7    /// </summary>
  8    public class UserObject
  9    {
 10        public UserObject()
 11        {
 12            //
 13            // TODO: Add constructor logic here
 14            //
 15        }
 16
 17        private long _curUserID;
 18        public long curUserID
 19        {
 20            get
 21            {
 22                return _curUserID;
 23            }
 24            set
 25            {
 26                _curUserID=value;
 27            }
 28        }
 29
 30        private  string _curUserName="";
 31        public string curUserName
 32        {
 33            get
 34            {
 35                return _curUserName;
 36            }
 37            set
 38            {
 39                _curUserName=value;
 40            }
 41        }
 42
 43        private string _curEmail="";
 44        public string curEmail
 45        {
 46            get
 47            {
 48                return _curEmail;
 49            }
 50            set
 51            {
 52                _curEmail=value;
 53            }
 54        }
 55
 56
 57        private long _oppUserID;
 58        public long oppUserID
 59        {
 60            get
 61            {
 62                return _oppUserID;
 63            }
 64            set
 65            {
 66                _oppUserID=value;
 67            }
 68        }
 69
 70        private  string _oppUserName="";
 71        public string oppUserName
 72        {
 73            get
 74            {
 75                return _oppUserName;
 76            }
 77            set
 78            {
 79                _oppUserName=value;
 80            }
 81        }
 82
 83        private string _oppEmail="";
 84        public string oppEmail
 85        {
 86            get
 87            {
 88                return _oppEmail;
 89            }
 90            set
 91            {
 92                _oppEmail=value;
 93            }
 94        }
 95
 96        private string _subject ="";
 97        public string subject
 98        {
 99            get
100            {
101                return _subject;
102            }
103            set
104            {
105                _subject=value;
106            }
107        }
108
109        private string _body="";
110        public string body
111        {
112            get
113            {
114                return _body;
115            }
116            set
117            {
118                _body=value;
119            }
120        }
121    }
122}
123

另一个同事写的封装类

以下为引用的内容:
1using System;
  2
  3using System.Threading;
  4
  5using System.Messaging;
  6
  7
  8
  9namespace Wapdm.SmsApp
 10
 11{
 12
 13     /**//// <summary>
 14
 15     /// <para>
 16
 17     /// A Logger implementation that writes messages to a message queue.
 18
 19     /// The default event formatter used is an instance of XMLEventFormatter
 20
 21     /// </para>
 22
 23     /// </summary>
 24
 25     public sealed class MsgQueue
 26
 27     {
 28
 29
 30
 31         private const string BLANK_STRING                   = "";
 32
 33         private const string PERIOD                         = @".\private$";  //".";
 34
 35         private const string ELLIPSIS                       = "";   
 36
 37   
 38
 39         private string serverAddress;
 40
 41         private string queueName;
 42
 43         private string queuePath;
 44
 45        
 46
 47         private bool IsContextEnabled; 
 48
 49   
 50
 51         private MessageQueue queue;
 52
 53   
 54
 55         private object queueMonitor                         = new object();
 56
 57   
 58
 59         private MsgQueue() {}
 60
 61
 62
 63         public static MsgQueue mq = null;
 64
 65         public static WaitHandle[] waitHandleArray = new WaitHandle[Util.MAX_WORKER_THREADS];
 66
 67    
 68
 69         public MsgQueue(string _serverAddress, string _queueName, string _summaryPattern)
 70
 71         {
 72
 73              if ((_serverAddress == null) || (_queueName == null) || (_summaryPattern == null))
 74
 75              {
 76
 77                   throw new ArgumentNullException();
 78
 79              }
 80
 81              ServerAddress = _serverAddress;
 82
 83              QueueName = _queueName;
 84
 85              IsContextEnabled = true;            
 86
 87         }
 88
 89   
 90
 91         public MsgQueue(string _serverAddress, string _queueName)
 92
 93         {
 94
 95              if ((_serverAddress == null) || (_queueName == null))
 96
 97              {
 98
 99                   throw new ArgumentNullException();
100
101              }
102
103              ServerAddress = _serverAddress;
104
105              QueueName = _queueName;
106
107              IsContextEnabled = true;
108
109         }
110
111   
112
113         public MsgQueue(string _queueName)
114
115         {
116
117              if (_queueName == null)
118
119              {
120
121                   throw new ArgumentNullException();
122
123              }
124
125              serverAddress = PERIOD;
126
127              QueueName = _queueName;
128
129              IsContextEnabled = true;            
130
131              if ( IsContextEnabled == false )
132
133                   throw new ArgumentNullException();
134
135         }
136
137   
138
139         public string ServerAddress
140
141         {
142
143              get
144
145              {
146
147                   return serverAddress;
148
149              }
150
151              set
152
153              {
154
155                   if (value == null)
156
157                   {
158
159                       value = PERIOD;
160
161                   }
162
163                   value = value.Trim();
164
165                   if (value.Equals(BLANK_STRING))
166
167                   {
168
169                       throw new ArgumentException("Invalid value (must contain non-whitespace characters)");
170
171                  }
172
173                   lock (queueMonitor)
174
175                   {
176
177                       serverAddress = value;
178
179                       queuePath = serverAddress + '\\' + queueName;
180
181                       InitializeQueue();
182
183                   }
184
185              }
186
187         }
188
189
190
191         public string QueueName
192
193         {
194
195              get
196
197              {
198
199                   return queueName;
200
201              }
202
203              set
204
205              {
206
207                   if (value == null)
208
209                   {
210
211                       throw new ArgumentNullException();
212
213                   }
214
215                   value = value.Trim();
216
217                   if (value.Equals(BLANK_STRING))
218
219                   {
220
221                       throw new ArgumentException("Invalid value (must contain non-whitespace characters)");
222
223                   }
224
225                   lock (queueMonitor)
226
227                   {
228
229                       queueName = value;
230
231                       queuePath = serverAddress + '\\' + queueName;
232
233                       InitializeQueue();
234
235                   }
236
237              }
238
239         }
240
241   
242
243         private void InitializeQueue()
244
245         {
246
247              lock (queueMonitor)
248
249              {            
250
251                   if (queue != null)
252
253                   {
254
255                       try { queue.Close(); }
256
257                       catch {}
258
259                       queue = null;
260
261                   }
262
263
264
265                   try
266
267                   {
268
269                       if(!MessageQueue.Exists(queuePath))
270
271                            MessageQueue.Create(queuePath);
272
273                   }
274
275                   catch {}
276
277                   try
278
279                   {
280
281                       queue = new MessageQueue(queuePath);
282
283                       queue.SetPermissions("EveryOne",MessageQueueAccessRights.FullControl);
284
285                       queue.Formatter = new XmlMessageFormatter(new Type[] {typeof(MoMsg)});
286
287                   }
288
289                   catch (Exception e)
290
291                   {
292
293                       try { queue.Close(); }
294
295                       catch {}
296
297                       queue = null;
298
299                       throw new ApplicationException("Couldn't open queue at '" + queuePath + "': " + e.GetType().FullName + ": " + e.Message);
300
301                   }
302
303
304
305              }
306
307         }
308
309   
310
311         private  void AcquireResources()
312
313         {
314
315              InitializeQueue();
316
317         }
318
319   
320
321         public  void ReleaseResources()
322
323         {
324
325              lock (queueMonitor)
326
327              {
328
329                   if (queue != null)
330
331                   {
332
333                       try
334
335                       {
336
337                            queue.Close();
338
339                       }
340
341                       catch {}
342
343                       queue = null;
344
345                   }
346
347              }   
348
349         }
350
351   
352
353         //阻塞方式
354
355         public MoMsg Read( )
356
357         {
358
359              MoMsg _event = null;            
360
361              lock (queueMonitor)
362
363              {
364
365                   if (queue == null)
366
367                   {
368
369                       InitializeQueue();
370
371                   }
372
373                   try
374
375                   {
376
377                       Message message = queue.Receive( new TimeSpan(0,0,1) );//等待10秒
378
379                       _event = (MoMsg) (message.Body);
380
381                       return _event;
382
383                   }
384
385                   catch (Exception )
386
387                   {
388
389                       try { queue.Close(); }
390
391                       catch {}
392
393                       queue = null;
394
395                   }           
396
397              }
398
399              return null;
400
401         }
402
403
404
405         public void Write(MoMsg _event)
406
407         {
408
409              if (_event == null)
410
411              {
412
413                   return;
414
415              }
416
417              lock (queueMonitor)
418
419              {
420
421                   try
422
423                   {
424
425                       if (queue == null)
426
427                       {
428
429                            InitializeQueue();
430
431                       }
432
433                  
434
435                       Message message = new Message();
436
437                       message.Priority = _event.Priority;
438
439                       message.Recoverable = true;
440
441                       message.Body = _event; //eventFormatter.Format(_event);
442
443
444
445                       queue.Send(message);
446
447                   }
448
449                   catch (Exception e)
450
451                   {
452
453                       try { queue.Close(); }
454
455                       catch {}
456
457                       queue = null;
458
459                       Util.Log.log("Couldn't write Message (" + e.GetType().FullName + ": " + e.Message + ")");
460
461                   }           
462
463              }
464
465         }
466
467
468
469         public static bool statusTest()
470
471         {
472
473              bool reValue = false;
474
475              try
476
477              {
478
479                   MessageEnumerator re = mq.queue.GetMessageEnumerator();
480
481                   bool rev = re.MoveNext();
482
483                   reValue = true;
484
485              }
486
487              catch
488
489              {
490
491                   reValue = false;
492
493              }
494
495
496
497              return reValue;
498
499         }
500
501
502
503         public static void startListen()
504
505         {
506
507              mq = new MsgQueue(Util.MqName);
508
509
510
511              mq.queue.ReceiveCompleted +=new ReceiveCompletedEventHandler(queue_ReceiveCompleted);
512
513             
514
515              //异步方式,并发
516
517              for(int i=0; i<Util.MAX_WORKER_THREADS; i++)
518
519              {
520
521                   // Begin asynchronous operations.
522
523                   waitHandleArray[i] =
524
525                       mq.queue.BeginReceive().AsyncWaitHandle;
526
527              }
528
529
530
531              return;
532
533         }
534
535
536
537         public static void stopListen()
538
539         {
540
541
542
543              for(int i=0;i<waitHandleArray.Length;i++)
544
545              {
546
547                   try
548
549                   {
550
551                       waitHandleArray[i].Close();
552
553                   }
554
555                   catch
556
557                   {
558
559                       //忽略错误
560
561                   }
562
563              }
564
565
566
567              try
568
569              {
570
571                  // Specify to wait for all operations to return.
572
573                   WaitHandle.WaitAll(waitHandleArray,1000,false);
574
575              }
576
577              catch
578
579              {
580
581                   //忽略错误
582
583              }
584
585         }
586
587
588
589         private static void queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
590
591         {
592
593              // Connect to the queue.
594
595              MessageQueue mqq = (MessageQueue)sender;
596
597
598
599              // End the asynchronous Receive operation.
600
601              Message m = mqq.EndReceive(e.AsyncResult);
602
603
604
605              Util.ProcessMo((MoMsg)(m.Body));
606
607
608
609              if(Util.isRunning)
610
611              {
612
613                   // Restart the asynchronous Receive operation.
614
615                   mqq.BeginReceive();
616
617              }
618
619           
620
621              return;
622
623         }
624
625     }
626
627}