简介
ActiveMQ
是开源的JMS实现,Geronimo应用服务器就是使用的ActiveMQ提供JMS服务。ActiveMQ5.0相比以前版本提供了一些非常有用的新
功能:
- AMQ Message Store (Faster Persistence!)
- Cursors (To handle very large number of stored messages)
- Blob Messages
- Command Agent
- Enterprise Integration Patterns via Camel Integration
- Logging a warning if you forget to start a Connection
- Message Transformation
- Mirrored Queues
- Flow Control
鉴于目前关于ActiveMQ5.0的文章比较少,故准备写一系列ActiveMQ的使用方面的文章。本篇先从安装开始。
安装
- 在http://activemq.apache.org/download.html
下
载5.0.0发行包,解压到需要安装ActiveMQ的文件夹,记为/path/to/activemq。 - unix环境activemq文件夹需要执行权限,执行如下命令 chmod
-R 755 /path/to/activemq
启动
- window环境运行/path/to/activemq/bin/activemq.bat
- unix环境运行/path/to/activemq/bin/activemq
测试
ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动
- window环境运行 netstat -an|find "61616"
- unix环境运行netstat -an|grep 61616
监控
ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。
admin:http://127.0.0.1:8161/admin/
demo:http://127.0.0.1:8161/demo/
点击demo应用中的“ Market data publisher
”,就会发一
些测试的消息。转到admin页面的topics menu下面(queue和topic的区别见 http://andyao.javaeye.com/blog/153173
),可以看到消息在增长。
配置
ActiveMQ5.0的配置文件在/path/to/activemq/conf目录下面。主要配置文件为activemq.xml,具体的配置
将在后续文章中详细说明。
-------------------------------------------****************************************************************
持久化时不能对自定义类型的对象进行保存,看是不是你的messageConverter有问题?你的消息类型是POJO吗?
MessageConverter如下
- public
class
DefaultMessageConverter
implements
MessageConverter {
-
/**
- * Logger for this class
- */
-
private
static
final
Log log = LogFactory.getLog(DefaultMessageConverter.
class
);
-
public
Message toMessage(Object obj, Session session)
throws
JMSException {
-
if
(log.isDebugEnabled()) {
- log.debug(
"toMessage(Object, Session) - start"
);
- }
-
// check Type
- ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
- HashMap<String,
byte
[]> map =
new
HashMap<String,
byte
[]>();
-
try
{ -
// POJO must implements Seralizable
- ByteArrayOutputStream bos =
new
ByteArrayOutputStream();
- ObjectOutputStream oos =
new
ObjectOutputStream(bos);
- oos.writeObject(obj);
- map.put(
"POJO"
, bos.toByteArray());
- objMsg.setObjectProperty(
"Map"
, map);
- }
catch
(IOException e) {
- log.error(
"toMessage(Object, Session)"
, e);
- }
-
return
objMsg;
- }
-
public
Object fromMessage(Message msg)
throws
JMSException {
-
if
(log.isDebugEnabled()) {
- log.debug(
"fromMessage(Message) - start"
);
- }
-
if
(msg
instanceof
ObjectMessage) {
- HashMap<String,
byte
[]> map = (HashMap<String,
byte
[]>) ((ObjectMessage) msg).getObjectProperty(
"Map"
);
-
try
{ -
// POJO must implements Seralizable
- ByteArrayInputStream bis =
new
ByteArrayInputStream(map.get(
"POJO"
));
- ObjectInputStream ois =
new
ObjectInputStream(bis);
- Object returnObject = ois.readObject();
-
return
returnObject;
- }
catch
(IOException e) {
- log.error(
"fromMessage(Message)"
, e);
- }
catch
(ClassNotFoundException e) {
- log.error(
"fromMessage(Message)"
, e);
- }
-
return
null
;
- }
else
{ -
throw
new
JMSException(
"Msg:["
+ msg +
"] is not Map"
);
- }
- }
- }