现在的位置: 首页 > 综合 > 正文

ActiveMQ的一个 producertool/customertool示例

2018年08月11日 ⁄ 综合 ⁄ 共 5617字 ⁄ 字号 评论关闭

最近由于公司项目需要,开始学习JMS,用的是ActiveMQ

。由于这方面
网上的例子不是很多,而且有的也不完整。于是经过几天的摸索学习,写了一个简单的小例子,现在贴出来与大家分享。
ProducerTool.java
用于发送消息:

java 代码
  1. package


     homework;   
  2.   
  3. import


     javax.jms.Connection;
      
  4. import


     javax.jms.DeliveryMode;
      
  5. import


     javax.jms.Destination;
      
  6. import


     javax.jms.JMSException;
      
  7. import


     javax.jms.MessageProducer;
      
  8. import


     javax.jms.Session;
      
  9. import


     javax.jms.TextMessage;
      
  10.   
  11. import


     org.apache.activemq

    .ActiveMQ

    Connection;
      
  12. import


     org.apache.activemq

    .ActiveMQ

    ConnectionFactory;
      
  13.   
  14. public


     
    class


     ProducerTool {
      
  15.   
  16.     
    private


     String user = ActiveMQ

    Connection.DEFAULT_USER;
      
  17.   
  18.     
    private


     String password = ActiveMQ

    Connection.DEFAULT_PASSWORD;
      
  19.   
  20.     
    private


     String url = ActiveMQ

    Connection.DEFAULT_BROKER_URL;
      
  21.   
  22.     
    private


     String subject = 
    "TOOL.DEFAULT"

    ;
      
  23.   
  24.     
    private


     Destination destination = 
    null


    ;
      
  25.   
  26.     
    private


     Connection connection = 
    null


    ;
      
  27.   
  28.     
    private


     Session session = 
    null


    ;
      
  29.   
  30.     
    private


     MessageProducer producer = 
    null


    ;
      
  31.   
  32.     
    // 初始化

      
  33.     
    private


     
    void


     initialize() 
    throws


     JMSException, Exception {
      
  34.         ActiveMQ

    ConnectionFactory connectionFactory = 

    new


     ActiveMQ

    ConnectionFactory(
      
  35.                 user, password, url);   
  36.         connection = connectionFactory.createConnection();   
  37.         session = connection.createSession(
    false


    , Session.AUTO_ACKNOWLEDGE);
      
  38.         destination = session.createQueue(subject);   
  39.         producer = session.createProducer(destination);
      
  40.         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
      
  41.     }   
  42.   
  43.     
    // 发送消息

      
  44.     
    public


     
    void


     produceMessage(String message) 
    throws


     JMSException, Exception {
      
  45.         initialize();   
  46.         TextMessage msg = session.createTextMessage(message);
      
  47.         connection.start();   
  48.         System.out.println(
    "Producer:->Sending message: "

     + message);
      
  49.         producer.send(msg);   
  50.         System.out.println(
    "Producer:->Message sent complete!"

    );
      
  51.     }   
  52.   
  53.     
    // 关闭连接

      
  54.     
    public


     
    void


     close() 
    throws


     JMSException {
      
  55.         System.out.println(
    "Producer:->Closing connection"

    );
      
  56.         
    if


     (producer != 
    null


    )
      
  57.             producer.close();   
  58.         
    if


     (session != 
    null


    )
      
  59.             session.close();   
  60.         
    if


     (connection != 
    null


    )
      
  61.             connection.close();   
  62.     }   
  63. }   

 

ConsumerTool.java用于接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个
onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。

java 代码
  1. package


     homework;   
  2.   
  3. import


     javax.jms.Connection;
      
  4. import


     javax.jms.Destination;
      
  5. import


     javax.jms.JMSException;
      
  6. import


     javax.jms.MessageConsumer;
      
  7. import


     javax.jms.Session;
      
  8. import


     javax.jms.MessageListener;
      
  9. import


     javax.jms.Message;
      
  10. import


     javax.jms.TextMessage;
      
  11.   
  12. import


     org.apache.activemq

    .ActiveMQ

    Connection;
      
  13. import


     org.apache.activemq

    .ActiveMQ

    ConnectionFactory;
      
  14.   
  15. public


     
    class


     ConsumerTool 
    implements


     MessageListener {
      
  16.   
  17.     
    private


     String user = ActiveMQ

    Connection.DEFAULT_USER;
      
  18.   
  19.     
    private


     String password = ActiveMQ

    Connection.DEFAULT_PASSWORD;
      
  20.   
  21.     
    private


     String url = ActiveMQ

    Connection.DEFAULT_BROKER_URL;
      
  22.   
  23.     
    private


     String subject = 
    "TOOL.DEFAULT"

    ;
      
  24.   
  25.     
    private


     Destination destination = 
    null


    ;
      
  26.   
  27.     
    private


     Connection connection = 
    null


    ;
      
  28.   
  29.     
    private


     Session session = 
    null


    ;
      
  30.   
  31.     
    private


     MessageConsumer consumer = 
    null


    ;
      
  32.   
  33.     
    // 初始化

      
  34.     
    private


     
    void


     initialize() 
    throws


     JMSException, Exception {
      
  35.         ActiveMQ

    ConnectionFactory connectionFactory = 

    new


     ActiveMQ

    ConnectionFactory(
      
  36.                 user, password, url);   
  37.         connection = connectionFactory.createConnection();
      
  38.         session = connection.createSession(
    false


    , Session.AUTO_ACKNOWLEDGE);
      
  39.         destination = session.createQueue(subject);
      
  40.         consumer = session.createConsumer(destination);   
  41.            
  42.     }   
  43.   
  44.     
    // 消费消息

      
  45.     
    public


     
    void


     consumeMessage() 
    throws


     JMSException, Exception {
      
  46.         initialize();   
  47.         connection.start();   
  48.            
  49.         System.out.println(
    "Consumer:->Begin listening..."

    );
      
  50.         
    // 开始监听

      
  51.         consumer.setMessageListener(
    this


    );
      
  52.         
    // Message message = consumer.receive();

      
  53.     }   
  54.   
  55.     
    // 关闭连接

      
  56.     
    public


     
    void


     close() 
    throws


     JMSException {
      
  57.         System.out.println(
    "Consumer:->Closing connection"

    );
      
  58.         
    if


     (consumer != 
    null


    )
      
  59.             consumer.close();   
  60.         
    if


     (session != 
    null


    )
      
  61.             session.close();   
  62.         
    if


     (connection != 
    null


    )
      
  63.             connection.close();   
  64.     }   
  65.   
  66.     
    // 消息处理函数

      
  67.     
    public


     
    void


     onMessage(Message message) {
      
  68.         
    try


     {   
  69.             
    if


     (message 
    instanceof


     TextMessage) {
      
  70.                 TextMessage txtMsg = (TextMessage) message;   
  71.                 String msg = txtMsg.getText();   
  72.                 System.out.println(
    "Consumer:->Received: "

     + msg);
      
  73.             } 
    else


     {   
  74.                 System.out.println(
    "Consumer:->Received: "

     + message);
      
  75.             }   
  76.         } 
    catch


     (JMSException e) {
      
  77.             
    // TODO Auto-generated catch block

      
  78.             e.printStackTrace();   
  79.         }   
  80.     }   
  81. }   

 

如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为Message
message = consumer.receive(),手动去调用MessageConsumer的receive方法即可。

下面是测试类Test.java:

java 代码
  1. package


     homework;   
  2.   
  3. import


     javax.jms.JMSException;
      
  4.   
  5. public


     
    class


     Test {
      
  6.   
  7.     
    /**

     
  8.      * @param args


     
  9.      */

      
  10.     
    public


     
    static


     
    void


     main(String[] args) 
    throws


     JMSException, Exception {
      
  11.         
    // TODO Auto-generated method stub

      
  12.         ConsumerTool consumer = 
    new


     ConsumerTool();
      
  13.         ProducerTool producer = 
    new


     ProducerTool();
      
  14.         
    // 开始监听

      
  15.         consumer.consumeMessage();   
  16.            
  17.         
    // 延时500毫秒之后发送消息

      
  18.         Thread.sleep(
    500
    );
      
  19.         producer.produceMessage(
    "Hello, world!"

    );
      
  20.         producer.close();   
  21.            
  22.         
    // 延时500毫秒之后停止接受消息

      
  23.         Thread.sleep(
    500
    );
      
  24.         consumer.close();   
  25.     }   
  26. }   

 

以上就是我学习ActiveMQ

之后写的一个简单的例子,希望对你有帮助,如果有什么错误还请指正。

【上篇】
【下篇】

抱歉!评论已关闭.