1. 网上典型例子
例1:http://www.verydemo.com/demo_c202_i2146.html 这个例子相对比较详细完整,也是JMS入门级的资料,例子中的JMS消息没有进行持久化。下面的DIY也是在这个基础上进行。
本DIY的例子的JBOSS安装程序是jboss-4.0.4.GA-Patch1-installer.jar,JBoss环境就是http://blog.csdn.net/ictcamera/article/details/13997997
,整个例子是在该环境基础上进行的。因为这个JBOSS版本例1的版本有差异,所有在DIY过程会出现一些问题,按照例子1进行的过程出现很多问题(当然不是完全按照例1进行,本例子中全程使用了IDE-myeclipse作为编译、打包工具,jar包和配置文件完全是手工进行的),主要问题和解决办法如下:
1、“如何来管理目的”部分,安装文档介绍,调用一个通过调用service=DestinationManager 登记的JMX Bean 来实现目的地的动态操作(增加、删除)。如果代码调用部分和JMXBean服务器在一个JVM(注意:(1)这个版本的JBOSS中JBOSS、内置tomcat都在一个容器;(2)、JBOSS的服务都是以JMX的MBean服务形式提供),那么可以如下方式管理目的地
MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null).iterator().next(); server.invoke(new ObjectName("JBossMQ", "service", "DestinationManager"), method, new Object[] {“myTopic” }, new String[] { "java.lang.String" }); |
而可能因为版本的不同,默认的DestinationManager服务域名有差异,应该改为“boss.mq”否则一直抛出DestinationManager服务未注册的异常。这时候可以通过登入URL:http://localhost:8080/jmx-console/HtmlAdaptor来查看DestinationManager服务的基本属性来了解具体的域名(可以查看JBOSS的所有服务及相关信息)。
2、同样在“如何来管理目的”部分,如果代码调用部分和JMXBean服务器在不再一个JVM,那么可以通过一个JMX adapter。一个JMX adapter是一个HTML GUI。用程序通过URL来调用Mbean。但是由于代码中的直接发http请求通过来进行操作,而JBOSS提供的web形式管理方式默认有口令(admin/admin),因此执行这段代码的时候一直异常“java.io.IOException:
Server returned HTTP response code:……”这时候将异常中的url直接在已经登入过的浏览器中输入执行,目的却能成功创建。
3、如果代码调用部分和JMXBean服务器在不再一个JVM可以如下方式操作JMS的目的地
private static void invokeViaMBeanInDiffJVM(String method, String destName){ try { Context context; Hashtable <String, String> jndi = new Hashtable <String, String>(); jndi.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); jndi.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); jndi.put(Context.PROVIDER_URL, "localhost:1099"); context = new InitialContext(jndi); MBeanServerConnection server = (MBeanServerConnection) context.lookup("jmx/invoker/RMIAdaptor"); server.invoke(new ObjectName("jboss.mq:service=DestinationManager"), method, new Object[] {destName, "topic/" + destName}, new String[] {"java.lang.String", "java.lang.String"}); } catch (Exception e) { e.printStackTrace(); } }
2. DIY例子
使用JMS不仅能发送(send)/发布(publish)消息,也能获得(receive)/订阅(subscribe)的消息,消息的发送(send)/发布(publish)和(receive)/订阅(subscribe)和JBOSS的相对位置是无关的,这个在实际应用中非常有用。在C/S架构的软件中,一般S端部署了JBOSS,作为JMS的publisher,而C端一般是Java程序,作为JMS的subscriber。这里以topic的形式举例说明,注:下面以可序列化的Map作为消息传输载体。
import org.martin.common.communication.jms.JmsService; import org.martin.common.communication.jms.JmsServiceImpl; public class ServiceAccess { public static JmsService getJMSService(){ return new JmsServiceImpl(); } } package org.martin.common.communication.jms; public class Constants { public static final int DEST_TYPE_TOPIC=1; public static final int DEST_TYPE_QUEUE=2; public static final int DEST_OPER_ADD=1; public static final int DEST_OPER_DELETE=2; } public class JmsAsyncReceiver { private TopicConnection topicConnection; private TopicSession topicSession; private TopicSubscriber topicSubscriber; private Topic topic; @SuppressWarnings("unchecked") public JmsAsyncReceiver(String factoryJNDI, String topicJNDI, MessageListener messagelistener) throws JMSException,NamingException { Hashtable props = new Hashtable(); props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); props.put(Context.PROVIDER_URL, "localhost:1099"); props.put("java.naming.rmi.security.manager", "yes"); props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming"); Context context = new InitialContext(props); TopicConnectionFactory topicFactory = (TopicConnectionFactory) context.lookup(factoryJNDI); topicConnection = topicFactory.createTopicConnection(); topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topic = (Topic) context.lookup(topicJNDI); topicSubscriber = topicSession.createSubscriber(topic); topicSubscriber.setMessageListener(messagelistener); System.out.println("AsyncReceiver subscribed to topic: " + topicJNDI); topicConnection.start(); } public void close() throws JMSException { topicSession.close(); topicConnection.close(); } } public class JmsHelper { public static void addDestination(String destName){ try{ invokeViaMBean("createTopic", destName); }catch(Exception e){ //e.printStackTrace(); try{ invokeViaMBeanInDiffJVM("createTopic", destName); }catch(Exception e1){ //e1.printStackTrace(); } } } private static void invokeViaMBean(String method, String destName) throws Exception { MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null) .iterator().next(); server.invoke(new ObjectName("jboss.mq", "service", "DestinationManager"), method, new Object[] { destName }, new String[] { "java.lang.String" }); } private static void invokeViaMBeanInDiffJVM(String method, String destName)throws Exception{ Context context; Hashtable <String, String> jndi = new Hashtable <String, String>(); jndi.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); jndi.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); jndi.put(Context.PROVIDER_URL, "localhost:1099"); context = new InitialContext(jndi); MBeanServerConnection server = (MBeanServerConnection) context.lookup("jmx/invoker/RMIAdaptor"); server.invoke(new ObjectName("jboss.mq:service=DestinationManager"), method, new Object[] {destName, "topic/" + destName}, new String[] {"java.lang.String", "java.lang.String"}); } } public interface JmsReceiveListener { void onMessage(Object msg); } public class JmsReceiveScheduler implements MessageListener { private JmsAsyncReceiver regInfo; private ConcurrentHashMap<String, JmsReceiveListener> listenerMap = new ConcurrentHashMap<String, JmsReceiveListener>(); private String destination=null; public JmsReceiveScheduler(String destination) throws JMSException, NamingException { this.destination=destination; JmsService service=ServiceAccess.getJMSService(); regInfo = service.registAsyncReceive(Constants.DEST_TYPE_TOPIC, this.destination, this); } public void registerListener(String clientID, JmsReceiveListener listener) { listenerMap.put(clientID, listener); } public void unregisterListener(String id) { listenerMap.remove(id); } public void close() throws JMSException { regInfo.close(); } @SuppressWarnings("unchecked") public void onMessage(Message msgIn) { ObjectMessage objMsg = (ObjectMessage) msgIn; Map map=null; try{ map = (HashMap) objMsg.getObject(); }catch(JMSException e){ e.printStackTrace(); } String clientID = (String) map.get("CLIENT_ID"); JmsReceiveListener listener = listenerMap.get(clientID); if(listener != null) { listener.onMessage(map.get("DATA")); } } } public class JmsSender { private TopicConnection topicConnection; private TopicSession topicSession; private TopicPublisher topicPublisher; private Topic topic; @SuppressWarnings("unchecked") public JmsSender(String factoryJNDI, String topicJNDI) throws JMSException,NamingException { Hashtable props = new Hashtable(); props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); props.put(Context.PROVIDER_URL, "localhost:1099"); props.put("java.naming.rmi.security.manager", "yes"); props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming"); Context context = new InitialContext(props); TopicConnectionFactory topicFactory = (TopicConnectionFactory) context .lookup(factoryJNDI); topicConnection = topicFactory.createTopicConnection(); topicSession = topicConnection .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topic = (Topic) context.lookup(topicJNDI); topicPublisher = topicSession.createPublisher(topic); } @SuppressWarnings("unchecked") public void publish(Map msg) throws JMSException { ObjectMessage message = topicSession.createObjectMessage(); message.setObject((Serializable) msg); topicPublisher.publish(topic, message); } public void close() throws JMSException { topicSession.close(); topicConnection.close(); } } public class JmsSenderWrapped { private static JmsSender sender; public JmsSenderWrapped(String destination) throws JMSException, NamingException{ JmsService jmsService = ServiceAccess.getJMSService(); sender = jmsService.getSender(Constants.DEST_TYPE_TOPIC, destination); } @SuppressWarnings("unchecked") public void sendJMSMessage(String clientID, Serializable dataIn) throws JMSException { Map map = new HashMap(); map.put("CLIENT_ID", clientID); map.put("DATA", dataIn); sender.publish( map); } public void close() throws JMSException{ sender.close(); } } public interface JmsService { JmsAsyncReceiver registAsyncReceive(int destType,String destination, MessageListener listener) throws JMSException, NamingException; JmsSender getSender(int destType, String destination) throws JMSException, NamingException; public void operDestination(int operType,int destType, String destination); } public class JmsServiceImpl implements JmsService { public JmsAsyncReceiver registAsyncReceive(int destType, String destination, MessageListener listener) throws JMSException, NamingException { if(destType==Constants.DEST_TYPE_TOPIC){ JmsAsyncReceiver receivere = new JmsAsyncReceiver ("TopicConnectionFactory","topic/"+destination, listener); return receivere; }else if(destType==Constants.DEST_TYPE_QUEUE){ } return null; } public JmsSender getSender(int destType, String destination) throws JMSException, NamingException { if(destType==Constants.DEST_TYPE_TOPIC){ JmsSender sender=new JmsSender ( "ConnectionFactory", "topic/"+destination); return sender; } return null; } public void operDestination(int operType,int destType, String destination) { if(destType==Constants.DEST_TYPE_TOPIC){ if(operType==Constants.DEST_OPER_ADD){ JmsHelper.addDestination(destination); } } } } public class ReceiveTest { public static void main(String[]args){ String destName="MyTestDestination"; String clientName="MyTClientName"; JmsService service=ServiceAccess.getJMSService(); service.operDestination(Constants.DEST_OPER_ADD, Constants.DEST_TYPE_TOPIC, destName); JmsReceiveScheduler schedul=null; try{ schedul=new JmsReceiveScheduler(destName); }catch(JMSException e){ e.printStackTrace(); }catch(NamingException e){ e.printStackTrace(); } schedul.registerListener(clientName, new JmsReceiveListener(){ public void onMessage(Object msg) { System.out.println(msg); } }); } } public class SendTest { public static void main(String[]args){ String destName="MyTestDestination"; String clientName="MyTClientName"; JmsService service=ServiceAccess.getJMSService(); service.operDestination(Constants.DEST_OPER_ADD, Constants.DEST_TYPE_TOPIC, destName); JmsSenderWrapped send=null; try{ send=new JmsSenderWrapped(destName); }catch(JMSException e){ e.printStackTrace(); }catch(NamingException e){ e.printStackTrace(); } for(int i=0;i<10;i++){ try{ send.sendJMSMessage(clientName, "count:"+i); }catch(JMSException e){ e.printStackTrace(); } } } }