现在的位置: 首页 > 综合 > 正文

java 实现jms的客户端(发送接收消息)

2018年01月30日 ⁄ 综合 ⁄ 共 6063字 ⁄ 字号 评论关闭

本文以ActiveMQ 消息服务器中间件为例。

实现的步骤如下:

1)实例化连接 工厂ConnectionFactory,主要设置的参数为连接到消息服务器中间件的用户名,密码及url.

2)通过连接工厂ConnectionFactory获取到消息中间件的连接Connection.

3)启动连接,并创建消息会话Session,用于发送或接收消息的线程

4)通过消息会话创建消息目的地Destination

5)创建消息生产者MessageProducer或消息消费者MessageConsumer

6)通过消息生产者MessageProducer发送消息或通过消息消费者MessageConsumer接收消息

7)关闭并释放连接资料

具体的实现代码如下:

发送消息客户端代码如下:

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import com.afmobi.jms.model.User;

public class QueueSender {
	
	private ConnectionFactory connFactory;
	private Connection conn;
	private Session session;
	private MessageProducer producer;
	private boolean stop=false;

	public void execute() throws Exception {
		// 连接工厂
		// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中
		connFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		// 连接到JMS提供者
		conn = connFactory.createConnection();
		conn.start();

		// 事务性会话,自动确认消息
		// 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。
		// 第二个参数消息的确认模式:
		// AUTO_ACKNOWLEDGE :
		// 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。
		// CLIENT_ACKNOWLEDGE :
		// 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)
		// DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)
		session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

		// 创建目标,就创建主题也可以创建队列
		Destination destination = session.createQueue("queue.hello");

		// 消息生产者
		producer = session.createProducer(destination);
		// 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
		// 如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 持久化

		// 发送消息
		while(!stop){
			Thread.sleep(1000);
			sendObject(session, producer);
			session.commit();// 在事务会话中,只有commit之后,消息才会真正到达目的地
			System.out.println("已发送消息");
			
		} 
		

		
		producer.close();
		session.close();
		conn.close();
	}

	// 对象消息
	public void sendObject(Session session, MessageProducer producer)
			throws JMSException {
		User user = new User();
		user.setAccount("petty");
		user.setName("happy");
		ObjectMessage objectMessage = session.createObjectMessage();
		objectMessage.setObject(user);
		producer.send(objectMessage);
	}

	// 字节消息
	public void sendBytes(Session session, MessageProducer producer)
			throws JMSException {
		String s = "BytesMessage字节消息";
		BytesMessage bytesMessage = session.createBytesMessage();
		bytesMessage.writeBytes(s.getBytes());
		producer.send(bytesMessage);
	}

	// 流消息
	public void sendStream(Session session, MessageProducer producer)
			throws JMSException {
		StreamMessage streamMessage = session.createStreamMessage();
		streamMessage.writeString("streamMessage流消息");
		streamMessage.writeLong(55);
		producer.send(streamMessage);
	}

	// 键值对消息
	public void sendMap(Session session, MessageProducer producer)
			throws JMSException {
		MapMessage mapMessage = session.createMapMessage();
		mapMessage.setLong("age", 25);
		mapMessage.setDouble("sarray", new Double(6555.5));
		mapMessage.setString("username", "键值对消息");
		producer.send(mapMessage);
	}

	// 文本消息
	public void sendText(Session session, MessageProducer producer)
			throws JMSException {
		TextMessage textMessage = session.createTextMessage("文本消息");
		producer.send(textMessage);
	}


}

接收消息客户端代码如下:

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import com.afmobi.jms.model.User;

public class QueueReceiver implements MessageListener{
	
	private ConnectionFactory connFactory;
	private Connection conn;
	private Session session;
	private boolean stop=false;
	
	public void execute()throws Exception{
		//连接工厂
		// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中
		connFactory=new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,
				"tcp://localhost:61616");
		
		//连接到JMS提供者
		conn=connFactory.createConnection();
		conn.start();
		
		//事务性会话,自动确认消息
		// 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。
        // 第二个参数消息的确认模式:
        // AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。
        // CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)
        // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)
		session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
		
		// 创建目标,就创建主题也可以创建队列
		Destination destination=session.createQueue("queue.hello");
		
		//消息的消费者
		MessageConsumer consumer=session.createConsumer(destination);
		consumer.setMessageListener(this);
		
		//等待接收消息
		while(!stop){
			Thread.sleep(5000);
		}
				
		consumer.close();
		session.close();
		conn.close();
	}

	public void onMessage(Message m) {
		try{
			if(m instanceof TextMessage){//接收文件消息
				TextMessage message=(TextMessage)m;
				System.out.println(message.getText());
			}else if(m instanceof MapMessage){//接收键值消息
				MapMessage message=(MapMessage)m;
				System.out.println(message.getLong("age"));
				System.out.println(message.getDouble("sarray"));
				System.out.println(message.getString("username"));
			}else if(m instanceof StreamMessage){//接收流消息
				StreamMessage message=(StreamMessage)m;
				System.out.println(message.readString());
				System.out.println(message.readLong());
			}else if(m instanceof BytesMessage){
				byte[] b=new byte[1024];
				int len=-1;
				BytesMessage message=(BytesMessage)m;
				while((len=message.readBytes(b))!=-1){
					System.out.println(new String(b,0,len));
				}
			}else if(m instanceof ObjectMessage){
				ObjectMessage message=(ObjectMessage)m;
				User user=(User)message.getObject();
				System.out.println("name:"+user.getAccount()+";info:"+user.getName());
			}else{
				System.out.println(m);
			}
			session.commit();
		}catch(JMSException e){
			e.printStackTrace();
		}
		
		
	}

}

以上代码是PTP即点对点消息模式的示例,如果采用Sub/Pub即发布/订阅者消息模式,基本代码的实现过程都一样,只需把

创建消息目的地的代码
Destination destination = session.createQueue("queue.hello");

修改为

Destination destination=session.createTopic("topic.hello");

即可。

 

抱歉!评论已关闭.