activemq.bat:
@echo off
REM ------------------------------------------------------------------------
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements. See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License. You may obtain a copy of the License at
REM
REM http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.
REM ------------------------------------------------------------------------

REM Launcher for the ActiveMQ broker:
REM   1. resolve ACTIVEMQ_HOME (script parent dir, then a few fallback locations)
REM   2. resolve the Java launcher into _JAVACMD
REM   3. fill in defaults for ACTIVEMQ_BASE/CONF/DATA/TMP/OPTS and SUNJMX
REM   4. run bin\activemq.jar with the "start" command plus any script arguments
REM Optional hooks: %HOME%\activemqrc_pre.bat and %HOME%\activemqrc_post.bat.

REM Machine-specific JDK location. This assignment is unconditional and runs
REM before setlocal, so it overrides any JAVA_HOME already in the environment
REM and remains set in the calling shell after the script ends.
set JAVA_HOME=E:\Jason\jdk1.6.0_43

if exist "%HOME%\activemqrc_pre.bat" call "%HOME%\activemqrc_pre.bat"

if "%OS%"=="Windows_NT" @setlocal

rem %~dp0 is expanded pathname of the current script under NT
set DEFAULT_ACTIVEMQ_HOME=%~dp0..

if "%ACTIVEMQ_HOME%"=="" set ACTIVEMQ_HOME=%DEFAULT_ACTIVEMQ_HOME%
set DEFAULT_ACTIVEMQ_HOME=

:doneStart
rem find ACTIVEMQ_HOME if it does not exist due to either an invalid value passed
rem by the user or the %0 problem on Windows 9x
if exist "%ACTIVEMQ_HOME%\README.txt" goto checkJava

rem check for activemq in Program Files on system drive
if not exist "%SystemDrive%\Program Files\activemq" goto checkSystemDrive
set ACTIVEMQ_HOME=%SystemDrive%\Program Files\activemq
goto checkJava

:checkSystemDrive
rem check for activemq in root directory of system drive
if not exist "%SystemDrive%\activemq\README.txt" goto checkCDrive
set ACTIVEMQ_HOME=%SystemDrive%\activemq
goto checkJava

:checkCDrive
rem check for activemq in C:\activemq for Win9X users
if not exist "C:\activemq\README.txt" goto noAntHome
set ACTIVEMQ_HOME=C:\activemq
goto checkJava

:noAntHome
echo ACTIVEMQ_HOME is set incorrectly or activemq could not be located. Please set ACTIVEMQ_HOME.
goto end

:checkJava
rem An explicit JAVACMD from the environment wins over anything derived from JAVA_HOME.
set _JAVACMD=%JAVACMD%

rem NOTE(review): the launcher is looked up as amq_test.exe rather than java.exe;
rem presumably a renamed copy of java.exe placed in the JDK bin directory - confirm.
if "%JAVA_HOME%" == "" goto noJavaHome
if not exist "%JAVA_HOME%\bin\amq_test.exe" goto noJavaHome
if "%_JAVACMD%" == "" set _JAVACMD=%JAVA_HOME%\bin\amq_test.exe
goto runAnt

:noJavaHome
rem Reached when JAVA_HOME is empty OR when the launcher is missing under it;
rem in both cases fall back to finding the launcher on PATH.
if "%_JAVACMD%" == "" set _JAVACMD=amq_test.exe
echo.
echo Warning: JAVA_HOME environment variable is not set.
echo.

:runAnt

rem Each setting below is only a default: a value already present in the
rem environment is left untouched.
if "%ACTIVEMQ_BASE%" == "" set ACTIVEMQ_BASE=%ACTIVEMQ_HOME%

if "%ACTIVEMQ_CONF%" == "" set ACTIVEMQ_CONF=%ACTIVEMQ_HOME%\conf

if "%ACTIVEMQ_DATA%" == "" set ACTIVEMQ_DATA=%ACTIVEMQ_HOME%\data

if "%ACTIVEMQ_TMP%" == "" set ACTIVEMQ_TMP=%ACTIVEMQ_DATA%\tmp

rem The property name is org.apache.activemq.UseDedicatedTaskRunner; it was
rem previously misspelled "activema", which set an unrelated property name.
if "%ACTIVEMQ_OPTS%" == "" set ACTIVEMQ_OPTS=-Xms1500M -Xmx1500M -Xss256K -Djava.util.logging.config.file=logging.properties -Dorg.apache.activemq.UseDedicatedTaskRunner=false

if "%SUNJMX%" == "" set SUNJMX=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
REM set SUNJMX=-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false

REM Uncomment to enable YourKit profiling
REM SET ACTIVEMQ_DEBUG_OPTS="-agentlib:yjpagent"

REM Uncomment to enable remote debugging
REM SET ACTIVEMQ_DEBUG_OPTS=-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005

REM Setup ActiveMQ Classpath.
REM Add instance conf dir before AMQ install conf dir to pick up instance-specific classpath entries first
set ACTIVEMQ_CLASSPATH=%ACTIVEMQ_CONF%;%ACTIVEMQ_BASE%/conf;%ACTIVEMQ_HOME%/conf;%ACTIVEMQ_CLASSPATH%

rem %* forwards every argument given to this script after the fixed "start" command.
"%_JAVACMD%" %SUNJMX% %ACTIVEMQ_DEBUG_OPTS% %ACTIVEMQ_OPTS% %SSL_OPTS% -Dactivemq.classpath="%ACTIVEMQ_CLASSPATH%" -Dactivemq.home="%ACTIVEMQ_HOME%" -Dactivemq.base="%ACTIVEMQ_BASE%" -Dactivemq.conf="%ACTIVEMQ_CONF%" -Dactivemq.data="%ACTIVEMQ_DATA%" -Djava.io.tmpdir="%ACTIVEMQ_TMP%" -jar "%ACTIVEMQ_HOME%/bin/activemq.jar" start %*

goto end

:end
set _JAVACMD=
if "%OS%"=="Windows_NT" @endlocal

:mainEnd
if exist "%HOME%\activemqrc_post.bat" call "%HOME%\activemqrc_post.bat"
activemq.xml:
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <!--
            For better performances use VM cursor and small memory limit.
            For more information, see:

            http://activemq.apache.org/message-cursors.html

            Also, if your producer is "hanging", it's probably due to producer flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true">
                        <!--
                            The constantPendingMessageLimitStrategy is used to prevent
                            slow topic consumers to block producers and affect other consumers
                            by limiting the number of messages that are retained
                            For more information, see:

                            http://activemq.apache.org/slow-consumer-handling.html
                        -->
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                    <!-- Queues: producer flow control is switched off and each queue is given a 20mb memoryLimit. -->
                    <policyEntry queue=">" producerFlowControl="false" memoryLimit="20mb" >
                        <!--
                            Use VM cursor for better latency
                            For more information, see:

                            http://activemq.apache.org/message-cursors.html

                            <pendingQueuePolicy>
                                <vmQueueCursor/>
                            </pendingQueuePolicy>
                        -->
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
            <!--<amqPersistenceAdapter directory="${activemq.data}/activemq-data" /> -->
        </persistenceAdapter>

        <!--
            The systemUsage controls the maximum amount of space the broker will
            use before slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
            If using ActiveMQ embedded - the following limits could safely be used:

            <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage limit="20 mb"/>
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="1 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="100 mb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
        -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="1400 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <!--
                The query-string separator must be written as an escaped ampersand
                inside an XML attribute value; a bare ampersand makes this file
                not well-formed and it cannot be parsed.
                NOTE(review): the option prefix is spelled "wireformat." here but
                "wireFormat." in the client connection URI; confirm whether the
                lowercase spelling is honored by the broker version in use.
            -->
            <transportConnector name="openwire" uri="nio://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->
Producer for java:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.87:61616?wireFormat.maxInactivityDuration=0"); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(qName); ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue); // Default DeliveryMode.PERSISTENT //producer.setPriority(3); // Default priority 4. //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); session.createObjectMessage(); jmsMessage.setObject(message); producer.send(jmsMessage);
Consumer for java:
// Consumer: subscribes to queue "Test.foo" and prints every message it receives.
// The "?consumer.prefetchSize=100" suffix is passed as part of the destination name.
final String qName = "Test.foo?consumer.prefetchSize=100";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.87:61616");
Connection connection = connectionFactory.createConnection();
connection.start();

// Non-transacted session with automatic acknowledgement.
final Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(qName);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
System.out.println(consumer.getPrefetchNumber());

// Listener style: the callback below is invoked for each delivered message.
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message msg) {
        //TODO something....
        // `counter` is defined by the surrounding code (not shown here).
        // Nothing in this statement throws the checked JMSException, so it is not
        // wrapped in try/catch (catching it here would not compile). Add a
        // try/catch once the handler calls Message accessors that declare it.
        System.out.println("收到消息:"+counter.incrementAndGet()+","+msg);
    }
});
服务器设备配置:
os:windows 2003 r2
vm:4G
cpu:2 core 2.8GHz
硬盘:SCSI 80G 10000rpm
内网百兆网卡
客户端设备配置:
普通PC, windows xp系统
测得结果:
1、消息大小1K、非事务、持久化消息,服务器端采用NIO:
单个连接 500个消息/sec,2个连接 1000个消息/sec,服务器端系统资源很稳定。
2、消息大小1K、非事务、非持久化消息,服务器端采用NIO:
单个连接 2000+个消息/sec,CPU占用较高,内存回收稳定;当受理的消息占用的内存超过JVM给定的堆内存后,AMQ既不报异常也不会终止程序,但会导致消息丢失,且内存回收(GC)频率升高。
控制台:
http://192.168.0.87:8161/admin/queues.jsp
参考资源:
http://activemq.apache.org/cms/configuring.html
http://www.cnblogs.com/zhishan/archive/2013/04/01/2993334.html
http://activemq.apache.org/maven/apidocs/index.html