本文以ActiveMQ 消息服务器中间件为例。
实现的步骤如下:
1)实例化连接 工厂ConnectionFactory,主要设置的参数为连接到消息服务器中间件的用户名,密码及url.
2)通过连接工厂ConnectionFactory获取到消息中间件的连接Connection.
3)启动连接,并创建消息会话Session,用于发送或接收消息的线程
4)通过消息会话创建消息目的地Destination
5)创建消息生产者MessageProducer或消息消费者MessageConsumer
6)通过消息生产者MessageProducer发送消息或通过消息消费者MessageConsumer接收消息
7)关闭并释放连接资料
具体的实现代码如下:
发送消息客户端代码如下:
import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.afmobi.jms.model.User; public class QueueSender { private ConnectionFactory connFactory; private Connection conn; private Session session; private MessageProducer producer; private boolean stop=false; public void execute() throws Exception { // 连接工厂 // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中 connFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // 连接到JMS提供者 conn = connFactory.createConnection(); conn.start(); // 事务性会话,自动确认消息 // 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。 // 第二个参数消息的确认模式: // AUTO_ACKNOWLEDGE : // 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。 // CLIENT_ACKNOWLEDGE : // 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息) // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息) session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建目标,就创建主题也可以创建队列 Destination destination = session.createQueue("queue.hello"); // 消息生产者 producer = session.createProducer(destination); // 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT // 如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。 producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 持久化 // 发送消息 while(!stop){ Thread.sleep(1000); sendObject(session, producer); session.commit();// 在事务会话中,只有commit之后,消息才会真正到达目的地 System.out.println("已发送消息"); } producer.close(); session.close(); conn.close(); } // 对象消息 public void sendObject(Session session, MessageProducer producer) throws JMSException { User user = new User(); user.setAccount("petty"); user.setName("happy"); ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setObject(user); producer.send(objectMessage); } // 字节消息 public void sendBytes(Session session, MessageProducer producer) throws JMSException { String s = "BytesMessage字节消息"; BytesMessage bytesMessage = session.createBytesMessage(); bytesMessage.writeBytes(s.getBytes()); producer.send(bytesMessage); } // 流消息 public void sendStream(Session session, MessageProducer producer) throws JMSException { StreamMessage streamMessage = session.createStreamMessage(); streamMessage.writeString("streamMessage流消息"); streamMessage.writeLong(55); producer.send(streamMessage); } // 键值对消息 public void sendMap(Session session, MessageProducer producer) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setLong("age", 25); mapMessage.setDouble("sarray", new Double(6555.5)); mapMessage.setString("username", "键值对消息"); producer.send(mapMessage); } // 文本消息 public void sendText(Session session, MessageProducer producer) throws JMSException { TextMessage textMessage = session.createTextMessage("文本消息"); producer.send(textMessage); } }
接收消息客户端代码如下:
import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.afmobi.jms.model.User; public class QueueReceiver implements MessageListener{ private ConnectionFactory connFactory; private Connection conn; private Session session; private boolean stop=false; public void execute()throws Exception{ //连接工厂 // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中 connFactory=new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); //连接到JMS提供者 conn=connFactory.createConnection(); conn.start(); //事务性会话,自动确认消息 // 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。 // 第二个参数消息的确认模式: // AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。 // CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息) // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息) session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建目标,就创建主题也可以创建队列 Destination destination=session.createQueue("queue.hello"); //消息的消费者 MessageConsumer consumer=session.createConsumer(destination); consumer.setMessageListener(this); //等待接收消息 while(!stop){ Thread.sleep(5000); } consumer.close(); session.close(); conn.close(); } public void onMessage(Message m) { try{ if(m instanceof TextMessage){//接收文件消息 TextMessage message=(TextMessage)m; System.out.println(message.getText()); }else if(m instanceof MapMessage){//接收键值消息 MapMessage message=(MapMessage)m; System.out.println(message.getLong("age")); System.out.println(message.getDouble("sarray")); System.out.println(message.getString("username")); }else if(m instanceof StreamMessage){//接收流消息 StreamMessage message=(StreamMessage)m; System.out.println(message.readString()); System.out.println(message.readLong()); }else if(m instanceof BytesMessage){ byte[] b=new byte[1024]; int len=-1; BytesMessage message=(BytesMessage)m; while((len=message.readBytes(b))!=-1){ System.out.println(new String(b,0,len)); } }else if(m instanceof ObjectMessage){ ObjectMessage message=(ObjectMessage)m; User user=(User)message.getObject(); System.out.println("name:"+user.getAccount()+";info:"+user.getName()); }else{ System.out.println(m); } session.commit(); }catch(JMSException e){ e.printStackTrace(); } } }
以上代码是PTP即点对点消息模式的示例,如果采用Sub/Pub即发布/订阅者消息模式,基本代码的实现过程都一样,只需把
创建消息目的地的代码
Destination destination = session.createQueue("queue.hello");
修改为
Destination destination=session.createTopic("topic.hello");
即可。