现在的位置: 首页 > 综合 > 正文

消息队列的一些简述

2018年05月04日 ⁄ 综合 ⁄ 共 5402字 ⁄ 字号 评论关闭

http://hi.baidu.com/sai5d/item/0a7b95233992f1102b0f1c93

消息队列的一些简述

先说一下公共队列和专用队列,公共队列只能建立在域中,或者说建立在活动目录中,而工作组的计算机只能创建专用队列。

两个方法:
MessageQueue.Create方法,这个方法的路径只能使用

公共队列

MachineName\QueueName

专用队列

MachineName\Private$\QueueName

如上的路径方式,所以在工作组模式下,不支持远程计算机执行此方法,而专有队列应该在远程计算机上是无法创建的。

MessageQueue.Exists方法,在MSDN中对它有这样的描述:
Exists 是代价高昂的操作。仅当必要时才在应用程序内使用它。
Exists 方法不支持 前缀
无法调用 Exists 来验证远程专用队列是否存在。
在工作组中的计算机,只能用它来验证本机的专用队列。远程验证的也只能是公共队列。

主要谈谈格式名跟路径名:
路径名:
公共队列:machinename\queuename
专有队列:machinename\Private$\queuename
日志队列:machinename\queuename\Journal$
机器日志队列:machinename\Journal$
机器死信队列:machinename\DeadLetter$
机器事务死信队列:machinename\XactDeadLetter$
本机可以用.来代替机器名,远程专有队列只能使用格式名来访问,专有队列的路径名只能用于本地。

格式名:
在断开连接的情况下,要发送消息就要使用格式名。用它来作为MessageQueue的构造函数的参数。
公用格式名:
下面是用来引用公用队列和与其相关的队列日志的通用格式:

FormatName:PUBLIC=QueueGUID

FormatName:PUBLIC=QueueGUID;JOURNAL

专有队列:
下面是用于引用专用队列及其相关的日志的通用格式:

FormatName:PRIVATE=ComputerGUID\QueueNumber

FormatName:PRIVATE=ComputerGUID\QueueNumber;JOURNAL

首先要说一下,这种模式的格式名不支持工作组计算机上的专有队列的远程访问。而这个computerguid有人说是在注册表中的HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Cryptography中的MachineGuid而我在本地计算机上创建了队列消息后,在消息的属性里的发件人tab中,能看到源计算机的guid,所以我也不知道这两个地方的GUID到底哪个是这里的computerguid。而QueueNumber则是在Drive:\WINDOWS\system32\msmq\storage\lqs目录下可以找到,在这个目录下,有这样的文件,比如:00000013.b6c1d784,前面的00000013就应该是QueueNumber。

直接访问

FormatName:DIRECT=OS:Machinename\Private$\queuename
FormatName:DIRECT=protocol:ip\Private$\queuename
使用这两种方式,则可以使用它们来远程访问工作组中的计算机上的专有队列,不同的地方就是os的要用机器名,protocol的是要写ip地址,protocol可以是http https tcp等。

FormatName:DIRECT=ComputerAddress\PublicQueueName

FormatName:DIRECT=ComputerAddress\PRIVATE$\PrivateQueueName

FormatName:DIRECT=ComputerAddress\SYSTEM$;SystemQueueName

FormatName:DIRECT=HTTP://URL_Address/msmq/PublicQueueName

FormatName:DIRECT=HTTPS://URL_Address/msmq/PublicQueueName

FormatName:DIRECT=HTTPS://URL_Address/msmq/private$\PrivateQueueName

可用两种形式指定计算机地址:作为目标计算机的网络地址(这包括网络协议 TCP),或者作为基础操作系统本身所支持的目标计算机的任何名称(此处使用 OS 来表明将使用计算机本地协议)。以下示例阐释了这两种形式:

FormatName:DIRECT=TCP:157.18.3.1\MyPublicQueue

FormatName:DIRECT=OS:pc001.microsoft.com\MyPublicQueue

FormatName:DIRECT=OS:machinename\Private$\MTest

FormatName:DIRECT=tcp:192.168.1.2\Private$\MTest

对于 URL 命名的队列,队列名前面是用斜杠隔开的字符串 msmq。例如,可以使用下面的格式名引用“MyPublicQueue”

FormatName:DIRECT=HTTP://URL_Address/msmq/MyPublicQueue

最后略提一下安全性及服务客户端配置的问题,由于我没有活动目录我也没办法测试那种环境下的情况,在工作组中,两台机器都要安装Message Queuing服务并启动,启动账户不要更改,使用本地系统账户就可以了。另外要注意的一点是,在服务端当创建了队列以后,要赋给这个队列相应的安全设置。如果是在工作组,远程的对队列的访问都会使用ANONYMOUS LOGON,所以在创建队列后,可以在计算机管理的服务和应用程序中的消息队列中的相应队列上右键属性-安全中对ANONYMOUS LOGON进行相应的安全设置,否则像receive等一些操作是无法进行的从远程。

消息队列的创建,如果创建的队列已经存在则会抛错:
MessageQueue tq = MessageQueue.Create(@"Tamaki\Private$\tq");

发送消息:
queue.Send("a messagequeue test", "test label");

消息格式化器:
有XmlMessageFormatter,BinaryMessageFormattert和ActiveXMessageFormatter                
queue.Formatter默认的使用的是XmlMessageFormatter,发送和接收要使用相同的格式化器,binary的格式化器发送的消息比xml格式化器的长度要短,activex的格式化器格式化后的消息可以使用com对像来读取,反之亦然。发送前或接收前设置格式化器
queue.Formatter = new BinaryMessageFormatter();

queue.Formatter = new XmlMessageFormatter(new string[] { "System.String" });

发送复杂的消息:二进制格式化器发送的对像,需要使用serializable属性来修饰或者实现ISerializable接口。

接收消息:
Message m = queue.Receive();
使用receive方法,接收到消息后就会删除掉队列中的消息,如果队列中没有消息,此方法会一直延时等待,接收到一条消息就执行完此方法。如果只想接收消息而不删除消息,则使用queue.Peek();方法。

枚举队列中的消息:
foreach (Message m in queue)
Console.WriteLine(m.Body);
或者使用MessageEnumerator
using (MessageEnumerator me = queue.GetMessageEnumerator2())
{
while (me.MoveNext())//可以在MoveNext方法中指定等待的时间,过时后就不会继续等待消息了
{
Console.WriteLine(me.Current.Body);

}
}

异步读取消息:
queue.PeekCompleted += new PeekCompletedEventHandler(queue_PeekCompleted);
queue.BeginPeek();

void queue_PeekCompleted(object sender, PeekCompletedEventArgs e)
{
MessageQueue mq = (MessageQueue)sender;
Message m = mq.EndPeek(e.AsyncResult);
Console.WriteLine("async over");
Console.WriteLine(m.Body);
}

消息的优先级和可恢复性:
Message mm = new Message("message message message",new BinaryMessageFormatter());

mm.Recoverable = true;
mm.Priority = MessagePriority.High;
queue.Send(mm);

通过id来接收消息:
using (MessageEnumerator me = queue.GetMessageEnumerator2())
{
while (me.MoveNext())
{

string id = me.Current.Id;
queue.ReceiveById(id);

}
}

确认队列和使用死信队列:

//当消息mm被receive(注意是通过receive方法接收)之后,创建的确认队列Tamaki\Private$\confQueue会收到一条确认的消息,可以使用confq.ReceiveByCorrelationId(mm.Id)在确认队列中收取相关消息的确认消息。
Message mm = new Message("message message message",new BinaryMessageFormatter());
mm.AdministrationQueue = new MessageQueue(@"Tamaki\Private$\confQueue");
mm.AcknowledgeType = AcknowledgeTypes.FullReceive;

//使用死信队列,则在
mm.TimeToBeReceived = new TimeSpan(1, 1, 1);
mm.TimeToReachQueue = new TimeSpan(0, 0, 20);设置的时间之后,还没有被接收到或者到达目标队列,则被发送的消息会被发送到死信队列
mm.UseDeadLetterQueue = true;

响应队列:
Message mm = new Message("message message message",new BinaryMessageFormatter());
mm.ResponseQueue = new MessageQueue(@"Tamaki\Private$\rqueue");
queue.send(mm);
Message rmm = queue.receiveById(mm.Id);
Message respm = new Message("a response");
respm.CorrelationId = mm.Id;
rmm.ResponseQueue.Send(respm);

事务队列:
可恢复的消息不能保证消息消息的接收顺序,也不能保证消息只被接收一次。需要保证接收顺序和只被接收一次,则需要使用事务队列。
事务的处理是很短的,但消息队列的本质则是发送和接收会比较长。在事务队列中,分成三个事务,第一个是把消息发送到队列中,第二个事务把消息写入网络,第三个是接收消息。

//创建队列的时候需要设置第二个参数为true
MessageQueue tq = MessageQueue.Create(@"Tamaki\Private$\tq",true);
//需要使用MessageQueueTransaction对像,成功则进行commit否则就调用Abort方法取消写入的消息
MessageQueueTransaction mtran = new MessageQueueTransaction();
try
{
mtran.Begin();
tq.Send("tran test 1", "tran test", mtran);
tq.Send("tran test 2", "tran test", mtran);
tq.Send("tran test 3", "tran test", mtran);
mtran.Commit();
}
catch
{
mtran.Abort();
}

foreach (Message m in tq)
Console.WriteLine(m.Body);

消息队列的安装:
应用程序没有创建消息队列的权限的时候,也就是无权执行create方法的时候,可以创建安装程序,在安装程序里MessageQueueInstaller类来创建队列。

抱歉!评论已关闭.