现在的位置: 首页 > 综合 > 正文

利用MSMQ发送消息(对象)到NServiceBus终结点(不采用Send-Only方式)

2019年10月15日 ⁄ 综合 ⁄ 共 3853字 ⁄ 字号 评论关闭

待解决的问题:

某些应用程序需要异步发送消息到远端机器的NServiceBus终结点,但是又不希望知道接收并处理异步消息的是NServiceBus这种ESB终结点,也许异步消息处理者将来会换成其他的ESB终结点,所以不能采用Send-Only的方式,否则应用程序(消息发送方)还是需要引入NserviceBus.dll等类库。

所以我的思路是:

应用程序调用我的接口发送消息到指定的私有消息队列,然后NServiceBus终结点有一个线程,接收这样一个消息,这个消息没有实现IComman/Imessage接口,所以还要再将这个消息转换为NServiceBus能够处理的消息类型(实现ICommand,IMessage等接口,可以出发MessageHandler调用),然后将这个消息通过:

Bus.Send("EndPointName@localhost", message);

发送给本机的NServiceBus终结点。

后面会展示详细的代码。。。。

其实我还想过提供给应用程序的接口是一个WCF服务,服务的实现中采用Send-Only的方式将接收到的消息转换为NServiceBus能够识别的消息,然后采用Send方式发送到本机的NServiceBus终结点,但是我试了一下,WCF服务的实现中配置Send-Only方式那段代码会抛出一个很奇怪的异常,到现在我还没有分析出来为什么。

所以综上所述,提供给应用程序一个接口,将消息发送到我的ESB终结点,同时又没有任何耦合,我们采用的方式是直接操作消息队列,代码如下(关键片段),如要下载我的完整代码,地址为百度云盘

提供给应用程序的接口,包括消息定义,和消息发送接口:

namespace MSMQMessageSender
{
    [Serializable]
    public class AsynMessage
    {
        public AsynMessage() { }

        public int id { set; get; }
        public string body { set; get; }
    }
}
public static bool SendMessage(AsynMessage msg)
        {
            try
            {
                MessageQueue msmq = new MessageQueue(@"FormatName:Direct=TCP:192.1.11.186\Private$\NServiceBus.EndPoint"); //这里的消息队列地址是能够将消息发送到远端机器的消息队列的关键。
                msmq.Formatter = new XmlMessageFormatter(new Type[] { typeof(AsynMessage) });

                System.Messaging.Message message = new System.Messaging.Message();
                message.Label = "消息标题";
                message.Body = msg;
                msmq.Send(message);
            }
            catch (Exception ee)
            {
                Console.WriteLine("发送消息出现异常,原因是:" + ee.Message);
                return false;
            }
            return true;
        }

下面是NserviceBus中,首先是启动的时候需要创建消息队列:

namespace CBIP.Server
{
    class EndConfigPoint : IConfigureThisEndpoint, AsA_Publisher { }

    class ConfiguringTheDistributorWithTheFluentApi : INeedInitialization
    {
        public void Init()
        {
            if (!MessageQueue.Exists(@".\Private$\NServiceBus.EndPoint"))
            {
                System.Messaging.MessageQueue.Create(@".\Private$\NServiceBus.EndPoint");
            }
        }
    }
}

然后定义NserviceBus中能够识别的消息:

namespace NServiceBus.Messages
{
    [Serializable]
    public class CBIPAsynMessage : ICommand
    {
        public int id { get; set; }
        public string body { get; set; }
    }
}

然后是在NServiceBus的Start()方法中开启一个轮训线程,轮训消息队列中的消息:

namespace CBIP.Server
{
    public class ServerEndPoint : IWantToRunWhenBusStartsAndStops
    {
        public IBus Bus { get; set; }

        public void Start()
        {
            Thread thread = new Thread(MyThread);
            thread.Start();        
        }

        public void MyThread()
        {

            while (true)
            {
                MessageQueue MQ = new MessageQueue(@".\Private$\NServiceBus.EndPoint");

                //调用MessageQueue的Receive方法接收消息
                System.Messaging.Message message = null;
                try
                {
                    message = MQ.Receive(TimeSpan.FromSeconds(5));
                }
                catch (MessageQueueException ee)
                {
                    message = null;
                    Console.WriteLine("超时:" + ee.Message);
                }

                if (message != null)
                {                    
                    message.Formatter = new XmlMessageFormatter(new Type[] { typeof(MSMQMessageSender.AsynMessage) });
                    AsynMessage msg = (AsynMessage)message.Body;
                    Console.WriteLine(msg.id + ", " + msg.body);
                    
                    CBIPAsynMessage cbipMsg = new CBIPAsynMessage() //转换成NServiceBus能够识别的消息
                    {
                        id = msg.id,
                        body = msg.body
                    };
                    Bus.Send("CBIP.Server@localhost", cbipMsg);<span style="white-space:pre">	</span>//发送到本机NServiceBus节点
                    Console.WriteLine("发送到ESB完成");
                    
                }
                else
                {
                    Console.WriteLine("没有找到消息!");
                }                
            }
        }

        public void Stop()
        {

        }

    }
}

就这样在NServiceBus的消息处理者中就能正确收到消息了:

namespace CBIP.Server
{
    public class AsynMessageHandler : IHandleMessages<CBIPAsynMessage>
    {
        public void Handle(CBIPAsynMessage message)
        {
            Console.WriteLine("AsynMessageHandler收到一个消息");
        }
    }
}

直接以进程的方式启动NServiceBus终结点,上面的代码是可以正确工作的,但是当我将我的NServiceBus终结点安装位Windows NT服务之后,问题来了,轮询线程无论如何也收不到消息,然后我去查看消息队列NServiceBus.EndPoint,发现里面是有消息的,那就是轮询线程没办法取出来。


后来调试才发现时没有访问权限,错误是“Access to Message Queuing System id denied”

网上查了一下是需要在创建消息队列的时候,给指定用户配置访问权限的,NServiceBus中创建队列的代码改为下面这样就可以了:

if (!MessageQueue.Exists(@".\Private$\NServiceBus.EndPoint"))
{
        MessageQueue mq = System.Messaging.MessageQueue.Create(@".\Private$\NServiceBus.EndPoint");
	mq.SetPermissions("Administrator", MessageQueueAccessRights.FullControl);
}

======================================= 华丽的分割线 ==============================

另外,正常来说,NserviceBus的Master终结点安装了NServiceBus的软件,有实现了IConfigureThisEndpoint, AsA_Publisher { }这两个接口的类,有消息处理者,寄宿到NServiceBus.Host.exe,填写好命令行参数NServiceBus.Integration和NServiceBus.Master,并且电脑安装了消息队列之后,是可以没有问题地启动起来,并自动创建消息队列的,我的项目中开始也是这样,但是不知道为什么一段时间之后启动报错:“无法创建消息队列,或者没有对应权限”。

这个问题还没找到原因,目前采用的办法只能是显示判断是否存在消息队列,不存在就显示创建一个。

抱歉!评论已关闭.