现在的位置: 首页 > 综合 > 正文

使用C#和IBM MQSeries进行消息发布订阅(二)

2013年09月17日 ⁄ 综合 ⁄ 共 3356字 ⁄ 字号 评论关闭

 

使用C#和IBM MQSeries进行消息发布订阅(二)

开发环境:MQSeries7.0 Visual studio 2008

2011-8-19 创建

C#发布订阅的程序主要参考C:\Program Files\IBM\WebSphere MQ\tools\dotnet\samples\cs下的MQPubSubSample.cs,但总觉得IBM提供的发布订阅的例子不是很好,因为通常情况下我们的发布和订阅程序是两个部分,而在例子中将发布和订阅混杂在一起,以至于搞不清楚哪些选项用于发布,哪些用于订阅上。因此写了两个简化的函数,以便在C#中更好的使用MQ7发布订阅功能。

实际MQ7实现的发布订阅很简单,例如在托管队列的情况下,发布者需要提供的信息包括
1,主题
2,队列管理器
3,发布内容

订阅者需要提供
1,主题
2,订阅名称(持久订阅情况)
3,队列管理器

然后发布者的流程为
1,连接队列管理器
2,创建发布的消息
3,以发布的方式调用队列管理器的PUT方法,将消息发布

订阅者相对复杂一些
1,连接队列管理器
2,创建订阅,在创建订阅时选择持久订阅还是非持久订阅
3,创建订阅用的消息
4,通过订阅的GET方法取得消息

相关代码如下:

在使用MQ客户端的情况下,首先初始化MQ环境
 public void initMQ()
        {
            ////Set up WebSphere MQ environment
            MQEnvironment.Hostname = "10.52.22.24"; // server
            MQEnvironment.Channel = "QCH.ACC.001";  // server channel
            MQEnvironment.Port = 1501;    //port number
 }

发布功能
  public void Publish(String topicName, String qmName, String publishStr)
  {  
             initMQ();
           
            int destType = MQC.MQOT_TOPIC;
            string topicObject = null;
           
            MQQueueManager mqQMgr = null;

            try
            {
                //Create a connection to the queue manager
                mqQMgr = new MQQueueManager(qmName);

                //Publish
                MQMessage messageForPut = new MQMessage();
               
                messageForPut.WriteString(publishStr);
                mqQMgr.Put(destType, topicObject, null, topicName, messageForPut);
                MessageBox.Show("Publish message is: " + publishStr);

                //Disconnect from the queue manager
                mqQMgr.Disconnect();
            }
            catch (MQException mqEx)
            {
                MessageBox.Show("MQException caught. " + mqEx.Message);
            }
            finally
            {
                //Disconnect from the queue manager
                if (mqQMgr != null)
                    mqQMgr.Disconnect();
            }
 }

订阅功能(托管,持久)
        public void Subscribe(String topicName, String qmName, String subName)
        {
            initMQ();
           
            string topicObject = null;
            MQQueueManager mqQMgr = null;
            MQTopic topic = null;

            try
            {
                //Create a connection to the queue manager
                mqQMgr = new MQQueueManager(qmName);

                //Create durable subscribe
                int openOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_RESUME | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_MANAGED | MQC.MQSO_DURABLE;
                topic = mqQMgr.AccessTopic(topicName, topicObject, openOptionsForGet, null, subName);

                MQMessage messageForGet = new MQMessage();
                topic.Get(messageForGet);
                String messageDataFromGet = messageForGet.ReadLine();
                MessageBox.Show("Subscribe message is: " + messageDataFromGet);

            }
            catch (MQException mqEx)
            {
                if (mqEx.ReasonCode == 2033)
                    MessageBox.Show("No publish on topic " + topicName);
                else
                    MessageBox.Show("MQException caught. " + mqEx.Message);
            }
            finally
            {
                //Close subscribe
                if (topic != null)
                    topic.Close();

                //Disconnect from the queue manager
                if (mqQMgr != null)
                    mqQMgr.Disconnect();

            }
        }

使用示例:
            String pubStr = "IPHONE5 released at " + DateTime.Now.ToString();
            Publish("APPLE", "QM_001", pubStr);
            Subscribe("APPLE", "QM_001", "SUB_1");

在首先订阅注册后,MQ生成持久的订阅,名称为SUB_1,此后所有的发布通过打开该订阅都可以接收到,无论订阅方断开与否。

抱歉!评论已关闭.