现在的位置: 首页 > 综合 > 正文

消息队列技术终结者(一)—通俗深刻地认识JMS(即Java Message Service)

2019年11月13日 ⁄ 综合 ⁄ 共 9804字 ⁄ 字号 评论关闭

        JMS很早就有,网上更是如此,但是大多总结的不太全面不太具体,在现有学习资源基础上结合自己的体悟,现重新总结一下:

        JMS全称为Java Message Service(即Java 消息服务),它是J2EE技术规范之一(它属于Java平台上有关面向消息中间件(MOM)的技术规范),用于访问消息系统(或向消息系统发送消息或向消息系统接收消息),最终实现不同应用系统之间的消息交互。呵呵呵,当你读到这里的时候恐怕会心里嘀咕——什么它妈的“消息系统”大笑?我们知道通过JDBC接口可以实现不同系统访问同一个关系型数据库,那么通过JMS接口我们就可以实现不同系统访问同一个“消息系统”,说的更直白一点儿“消息系统”的地位就相当于上面提到的数据库:关系型数据库是一个运行在计算机或服务器上的软件(比如Oracle、MySQL或SQL Server);消息系统也是一个运行在计算机或服务器上的软件(比如Apache的ActiveMQ或IBM的WebSphere MQ)。上面这句话不仅点出了什么是“消息系统”,也解释了究竟什么是JMS——JMS 扮演的角色与JDBC 很相似:JDBC 提供了一套用于访问各种不同关系型数据库的公共接口(这些接口由各个研发数据库软件的公司具体实现,只需使用他们的jar包进行开发即可),而JMS 提供了一套用于访问不同消息系统的接口(这些接口由各个研发消息系统软件的公司具体实现,只需使用他们的jar包进行开发即可)。
        使用JMS 的应用程序被称为JMS 客户端,保存消息与传递消息的“消息系统”被称为JMS Provider,JMS 应用是由“多个JMS 客户端”(发送消息的JMS 客户端被称为消息生产者(producer),而接收消息的JMS 客户端则被称为消息消费者(consumer);同一JMS 客户端既可以是生产者也可以是消费者)和一个JMS Provider 构成的业务系统。JMS能够让我们通过消息收发服务(即JMS Provider,有时也称为消息中介程序或称为消息服务器)从一个应用程序向另一个应用程序发送消息。
        JMS 的编程过程很简单,概括为:应用程序A (消息生产者)发送一条消息到消息服务器(比如:ActiveMQ)的某个目的地(Destination,即消息服务器端特定的队列或特定的主题),然后消息服务器(比如:ActiveMQ)把消息转发给应用程序B(消息使用者)。因为应用程序A 和应用程序B 没有直接的代码关连,所以两者实现了解偶,如下图:

        上图及图上面的一段文字简要介绍了JMS的编程过程,下面结合Apache的JMS软件(即ActiveMQ)详细做一介绍:

        应用程序A

package com.ghj.packageoftest;

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author 高焕杰
 */
public class Producer {

    public static void main(String[] args) {
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("FirstQueue");
            MessageProducer producer = session.createProducer(destination);
            Message message = session.createTextMessage("你好消息使用者,我是消息生产者生产的消息!!!");
            producer.send(message);
            if (connection != null){
				connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

        编程过程说明:

        1、ConnectionFactory(连接工厂接口):通过ActiveMQ对JMS接口ConnectionFactory的实现类ActiveMQConnectionFactory(ActiveMQConnectionFactory构造函数第一个参数为消息服务器的账号,默认admin;第二个参数为消息服务器的密码,默认为admin;第三个参数为消息服务器的URL)来创建连接消息服务器的连接工厂,该连接工厂实例的创建为JMS使用该实例创建与消息服务器的连接奠定了基础。该接口有如下两个子接口:队列连接工厂(QueueConnectionFactory)和主题连接工厂(TopicConnectionFactory)。

        2、Connection(连接接口):借助连接工厂实现类实例创建JMS客户端与消息服务器端的连接接口实现类实例,该实例的创建为启动JMS客户端与消息服务器端的连接以及创建Session接口实现类实例奠定了基础。该接口有如下两个子接口:队列连接(QueueConnection)和主题连接(TopicConnection)。

        3、通过连接接口实现类实例调用start方法启动JMS客户端与消息服务器端的连接。

        4、Session(会话接口):该接口表示一个单线程的上下文,用于发送消息或接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的,也是按照接受的顺序一个一个发送的。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务(调用session的commit方法)之前,用户可以使用回滚(session实例的rollback方法)操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。通过连接接口实现类实例调用createSession方法创建JMS客户端与消息服务器端的会话,该方法有两个参数: 第一个参数用于设置所创建的Session对象是否支持事务;第二个参数用于设置消息接受者以何种模式向ActiveMQ服务器端确认消息已经接受;如果会话是事务的(即第一个参数设为了true),则忽略该参数;如果是非事务的,那么该参数可以有如下三种:
               第一种:AUTO_ACKNOWLEDGE:自动确认模式。该确认模式下,在会话成功地从 receive调用返回或会话为处理消息而调用的消息监听器成功的返回时,该会话自动确认客户端消息的接收。  
               第二种:CLIENT_ACKNOWLEDGE:客户端确认模式。会话对象依赖于应用程序对被接收的消息调用一个acknowledge()方法。一旦这个方法被调用,会话会确认最后一次确认之后所有接收到的消息。这种模式允许应用程序以一个调用来接收,处理并确认一批消息。注意:在管理控制台中,如果连接工厂的Acknowledge Policy(确认方针)属性被设置为"Previous"(提前),但是你希望为一个给定的会话确认所有接收到的消息,那么就用最后一条消息来调用acknowledge()方法。如果使用的是这种方式,那么需要在MessageListener里显式调用message.acknowledge()来通知服务器。服务器接收到通知后采取相应的操作。  
               第三种:DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。注意:如果你的应用程序无法处理重复的消息的话,你应该避免使用这种模式。如果发送消息的初始化尝试失败,那么重复的消息可以被重新发送。 

        该接口有如下两个子接口:队列会话(QueueSession)和主题会话(TopicSession)。

        5、Destination(消息目的地接口):对消息生产者而言,Destination用于指明消息发布到消息服务器的哪个队列(Queue)或哪个主题(Topic)中;对于消息消费者而言,Destination用于指明消息从消息服务器的哪个队列(Queue)或主题(Topic)中接收到。就像JDBC中操作数据时需要指明哪个数据库(对应名字叫什么的队列(Queue)或主题(Topic))一样。该接口实现类实例由Session接口实现类实例通过调用createQueue(队列(Queue))或createTopic(主题(Topic))方法来创建,无论调用哪个方法,都需要指明队列(Queue)或主题(Topic)名称(就像指明JDBC数据库名称一样)。该接口有如下两个子接口:队列(Queue)和主题(Topic)。

        6、MessageProducer(消息生产者接口):该接口实现类实例由Session会话接口实现类实例通过调用createProducer方法并传入Destination接口实现类实例来创建(呵呵呵大笑,别忘了你要告诉消息生产者将消息发布到消息服务器的哪个队列(Queue)或哪个主题(Topic)中)。注意:在创建消息生产者时既可以直接创建某个消息目的地的消息生产者(如上面代码中创建消息生产者的方式),也可以创建一个通用的消息生产者(即在通过Session接口实现类调用createProducer方法时传入null),在发送消息时指定消息目的地(例如通过调用send(Destination destination, Message message)方法发送消息),创建通用的消息生产者对于实现同一个消息生产者向不同消息目的地发送消息提供了解决方案。由于博客篇幅问题,这里直接给出可下载的Demo——【0分下载该例子Demo】。

        7、Message(消息接口):该接口是所有JMS消息的根接口,其实现类实例是消息生产者和消息消费者之间传送的对象。Message接口实现类实例由Session接口实现类实例调用createStreamMessage、createMapMessage、createTextMessage、createObjectMessage或createBytesMessage中的一个方法来实现。 一个消息有三个主要部分:
               第一部分:消息头(Header) - 所有的消息都支持一套相同的头字段。头字段包含用于识别和为消息寻找路由的操作设置。 
               第二部分:消息属性(Properties) - 每条消息包含一个内置的功能,以便支持应用程序自定义的属性值。可以创建定制的字段和过滤器(消息选择器)。 
               第三部分:消息体(Body) - JMS API定义了五种类型的消息体,涵盖了当前使用的大部分消息类型: 
                                   第一种:StreamMessage:流消息,该种类型的消息体包含了Java原始数据类型流。它按顺序填充和读取。 
                                   第二种:MapMessage:映射消息,该种类型的消息体包含了一系列的键值对,其中键是 String类型的对象,值是Java原始类型。其中的条目可以顺序或随机的通过键来访问。每个条目的顺序是不确定的。 
                                   第三种:TextMessage:文本消息,该种类型的消息体包含一个 java.lang.String 对象。该类型的消息可以用来传送纯文本和XML消息。 
                                   第四种:ObjectMessage:对象消息,该种类型的消息体包含一个序列化的Java对象。
                                   第五种:BytesMessage:字节消息,该种类型的消息体包含了一个未经解释的字节流。该消息类型逐字地编码消息体以匹配现存的消息格式。在多数情况下,可以考虑更易于使用的其他消息类型。尽管JMS API允许在该消息上使用消息属性,但通常并不这么用,因为包含的消息属性可能会影响格式。

        8、MessageProducer接口实现类实例通过调用send方法将消息发布到消息服务器上已指定的目的地(别忘了大笑,创建消息生产者实例时传入了Destination接口实现类实例),该方法需要依次传入Message接口实现类实例、发送消息模式、消息优先级和消息的生存周期(以毫秒为单位)这四个参数,呵呵呵,上面只传入了一个Message接口实现类实例是因为采用了默认的发送模式、默认的消息优先级和默认的消息的生存周期(以毫秒为单位)。
       那么JMS发送消息模式有哪些呢?疑问JMS有两种发送消息模式:
               第一种:DeliveryMode.NON_PERSISTENT(非持久化,默认):这是开销最低的传送模式,因为它不要求消息被记录到稳定的存储中;但正是因为这种方式导致消息没有被记录到稳定的存储中,所以消息一定不能被传递两次
               第二种:DeliveryMode.PERSISTENT(持久化):该传送模式指示JMS服务器将消息记录到稳定的存储中作为消息生产者发送操作的一个环节,由此可以理解这样一句话:如果一个JMS服务离线,那么持久化消息不会丢失但是得等到这个服务恢复联机时才会被传递。  

        JMS的发送消息模式可以在发送消息时单独设置(即通过消息生产者调用send方法时设定),也可以通过消息生产者接口实现类调用setDeliveryMode方法来设定(这种方式设定的发送消息模式将影响该消息生产者发布的所有消息的)。

        JMS消息优先级又该怎样理解呢?疑问JMS定义了0到9共10个等级的优先级(其中4是默认的优先级),最低优先级为0,最高优先级为9;其中0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。举例来说:“messageProducer.send(message, DeliveryMode.PERSISTENT, 8, 10000);”这行代码采用持久化的消息发送模式、加快优先级和10000毫秒生存周期。通过接口Message的setJMSPriority方法也可以设置消息的优先级。

        该接口有如下两个子接口:队列发送者(QueueSender)和主题发布者(TopicPublisher)。
        9、通过Connection接口实现类实例调用close方法来关闭与消息服务器的连接。注意:对于一个关闭的连接,不必关闭它的会话、生产者以及消费者。

        应用程序B

package com.ghj.packageoftest;

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息使用者
 * 
 * @author 高焕杰
 */
public class Consumer {
    public static void main(String[] args) {

        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("FirstQueue");
            MessageConsumer messageConsumer = session.createConsumer(destination);
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        System.out.println("接收到的消息内容:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
//          if (connection != null){//将该if语句块注销掉就可以一直等待接收消息服务器发送过来的消息了
//			    connection.close();
//          }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

        编程过程说明:

        1、和上面的1一样,并且传入的参数也要一样。

        2、和上面的2一样。

        3、和上面的3一样。

        4、和上面的4一样,并且传入的参数也要一样。

        5、和上面的5一样,并且传入的参数也要一样。

        6、MessageConsumer(消息消费者接口):该接口实现类实例由Session会话接口实现类实例通过调用createConsumer方法并传入Destination接口实现类实例来创建(呵呵呵大笑,别忘了你要告诉消息消费者从消息服务器的哪个队列(Queue)或哪个主题(Topic)中接受消息)。该实例的实现为消费者从消息服务器接受消息奠定了基础。该接口有如下两个子接口:队列接收者(QueueReceiver)和主题订阅者(TopicSubscriber)。

        7、通过消息消费者接口实现类的setMessageListener方法为消息消费者设定消息监听器。如果将该setMessageListener方法的参数设置为null等价于没有为该消息消费者设置消息监听器,消息消费者也就无法从消息服务器端接受消息。

        8、不关闭消息消费者与消息服务器的连接——当消息生产者发布新的消息时,消息消费者就可以马上获取到。

        【0分下载该例子Demo

        上文在谈论Destination接口时曾说“对消息生产者而言,Destination用于指明消息发布到消息服务器的哪个队列(Queue)或哪个主题(Topic)中;对于消息消费者而言,Destination用于指明消息从消息服务器的哪个队列(Queue)或主题(Topic)中接收到。”,上面例子将消息“寄存”在了消息服务器端名为“FirstQueue”的队列(Queue)中,哪么我们怎么将消息“寄存”在消息服务器端名为主题(Topic)中呢?且看下面代码:

        应用程序A

package com.ghj.packageoftest;

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author 高焕杰
 */
public class Producer {

    public static void main(String[] args) {
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("FirstTopic");
            MessageProducer producer = session.createProducer(destination);
            Message message = session.createTextMessage("你好消息使用者,我是消息生产者生产的消息!!!");
            producer.send(message);
            if (connection != null){
				connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

        应用程序B

package com.ghj.packageoftest;

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息使用者
 * 
 * @author 高焕杰
 */
public class Consumer {
    public static void main(String[] args) {

        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("FirstTopic");
            MessageConsumer messageConsumer = session.createConsumer(destination);
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        System.out.println("接收到的消息内容:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
//          if (connection != null){//将该if语句块注销掉就可以一直等待接收消息服务器发送过来的消息了
//			    connection.close();
//          }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

        通过代码比较我们会发现将消息“寄存”到消息服务器端队列(Queue)中与将消息“寄存”在消息服务器端主题(Topic)中的区别就是在实现Destination接口时调用的方法不同罢了大笑。【0分下载该例子Demo

        言尽于此,可能有人会问队列(Queue)和主题(Topic)有什么区别?呵呵呵,如果能有这样的疑问,哪么恭喜你你已经入门了,下篇博客将重点介绍队列(Queue)和主题(Topic)之间的区别——《消息队列技术终结者(二)—JMS中队列(Queue)和主题(Topic)的区别

抱歉!评论已关闭.