现在的位置: 首页 > 综合 > 正文

Apache ActiveMQ单点基本配置

2019年06月20日 ⁄ 综合 ⁄ 共 10376字 ⁄ 字号 评论关闭

activemq.bat:

@echo off

REM ------------------------------------------------------------------------
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements.  See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License.  You may obtain a copy of the License at
REM
REM http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.
REM ------------------------------------------------------------------------
rem Pin the JDK used to launch the broker. The assignment is unconditional, so it
rem overrides any JAVA_HOME inherited from the environment, and it runs before the
rem setlocal below, so it is outside the scope restored by the closing endlocal.
set JAVA_HOME=E:\Jason\jdk1.6.0_43
rem Optional per-user hook: run activemqrc_pre.bat from the HOME directory when present.
if exist "%HOME%\activemqrc_pre.bat" call "%HOME%\activemqrc_pre.bat"

rem On NT-family systems keep the variable changes below local to this script.
if "%OS%"=="Windows_NT" @setlocal

rem %~dp0 is expanded pathname of the current script under NT
rem The default install root is the parent of the directory holding this script.
set DEFAULT_ACTIVEMQ_HOME=%~dp0..

rem Use the default only when the caller did not provide ACTIVEMQ_HOME, then drop the temporary.
if "%ACTIVEMQ_HOME%"=="" set ACTIVEMQ_HOME=%DEFAULT_ACTIVEMQ_HOME%
set DEFAULT_ACTIVEMQ_HOME=

:doneStart
rem Locate the ActiveMQ install directory. README.txt in the candidate directory is
rem used as the marker of a valid install; on success control jumps to checkJava.
rem find ACTIVEMQ_HOME if it does not exist due to either an invalid value passed
rem by the user or the %0 problem on Windows 9x
if exist "%ACTIVEMQ_HOME%\README.txt" goto checkJava

rem check for activemq in Program Files on system drive
rem Note: this candidate is accepted on directory existence alone, without the README.txt check.
if not exist "%SystemDrive%\Program Files\activemq" goto checkSystemDrive
set ACTIVEMQ_HOME=%SystemDrive%\Program Files\activemq
goto checkJava

:checkSystemDrive
rem check for activemq in root directory of system drive
if not exist %SystemDrive%\activemq\README.txt goto checkCDrive
set ACTIVEMQ_HOME=%SystemDrive%\activemq
goto checkJava

:checkCDrive
rem check for activemq in C:\activemq for Win9X users
if not exist C:\activemq\README.txt goto noAntHome
set ACTIVEMQ_HOME=C:\activemq
goto checkJava

:noAntHome
rem No candidate matched: report the problem and leave without starting the broker.
rem NOTE(review): the label name presumably comes from the Ant launcher this script was derived from.
echo ACTIVEMQ_HOME is set incorrectly or activemq could not be located. Please set ACTIVEMQ_HOME.
goto end

:checkJava
rem Choose the launcher executable. A caller-supplied JAVACMD always wins; otherwise
rem the executable under JAVA_HOME\bin is used when it exists.
rem NOTE(review): amq_test.exe is not a file name shipped with a stock JDK. It is
rem presumably a renamed copy of java.exe placed in the JDK bin directory so the
rem broker process is easy to identify. Confirm that it exists on the target machine.
set _JAVACMD=%JAVACMD%

if "%JAVA_HOME%" == "" goto noJavaHome
if not exist "%JAVA_HOME%\bin\amq_test.exe" goto noJavaHome
if "%_JAVACMD%" == "" set _JAVACMD=%JAVA_HOME%\bin\amq_test.exe
goto runAnt

:noJavaHome
rem Fallback: rely on the executable being found through PATH. This branch is also
rem reached when JAVA_HOME is set but the executable is missing there, so the warning
rem text below can be misleading in that case.
if "%_JAVACMD%" == "" set _JAVACMD=amq_test.exe
echo.
echo Warning: JAVA_HOME environment variable is not set.
echo.

:runAnt
rem Launch the broker. Every ACTIVEMQ_* location and option below is a default that
rem applies only when the caller has not already set the variable.

if "%ACTIVEMQ_BASE%" == "" set ACTIVEMQ_BASE=%ACTIVEMQ_HOME%

if "%ACTIVEMQ_CONF%" == "" set ACTIVEMQ_CONF=%ACTIVEMQ_HOME%\conf

if "%ACTIVEMQ_DATA%" == "" set ACTIVEMQ_DATA=%ACTIVEMQ_HOME%\data

if "%ACTIVEMQ_TMP%" == "" set ACTIVEMQ_TMP=%ACTIVEMQ_DATA%\tmp

rem JVM options: fixed 1500 MB heap, 256 KB thread stacks, JUL config file, and the
rem ActiveMQ dedicated-task-runner switch. The property name must be spelled
rem org.apache.activemq.UseDedicatedTaskRunner; the previous spelling (activema) was
rem a typo, so the broker never saw the setting.
if "%ACTIVEMQ_OPTS%" == "" set ACTIVEMQ_OPTS=-Xms1500M -Xmx1500M -Xss256K -Djava.util.logging.config.file=logging.properties -Dorg.apache.activemq.UseDedicatedTaskRunner=false

rem Remote JMX on port 1099 with authentication and SSL both disabled.
if "%SUNJMX%" == "" set SUNJMX=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
REM set SUNJMX=-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false

REM Uncomment to enable YourKit profiling
REM SET ACTIVEMQ_DEBUG_OPTS="-agentlib:yjpagent"

REM Uncomment to enable remote debugging
REM SET ACTIVEMQ_DEBUG_OPTS=-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005

REM Setup ActiveMQ Classpath.
REM Add instance conf dir before AMQ install conf dir to pick up instance-specific classpath entries first
set ACTIVEMQ_CLASSPATH=%ACTIVEMQ_CONF%;%ACTIVEMQ_BASE%/conf;%ACTIVEMQ_HOME%/conf;%ACTIVEMQ_CLASSPATH%

rem Start the broker from bin/activemq.jar, passing the resolved locations as system
rem properties and forwarding any extra command-line arguments after the start command.
"%_JAVACMD%" %SUNJMX% %ACTIVEMQ_DEBUG_OPTS% %ACTIVEMQ_OPTS% %SSL_OPTS% -Dactivemq.classpath="%ACTIVEMQ_CLASSPATH%" -Dactivemq.home="%ACTIVEMQ_HOME%" -Dactivemq.base="%ACTIVEMQ_BASE%" -Dactivemq.conf="%ACTIVEMQ_CONF%" -Dactivemq.data="%ACTIVEMQ_DATA%" -Djava.io.tmpdir="%ACTIVEMQ_TMP%" -jar "%ACTIVEMQ_HOME%/bin/activemq.jar" start %*

goto end


:end
rem Common exit path: clear the launcher variable and close the setlocal scope opened at startup.
set _JAVACMD=
if "%OS%"=="Windows_NT" @endlocal

:mainEnd
rem Optional per-user hook: run activemqrc_post.bat from the HOME directory when present.
if exist "%HOME%\activemqrc_post.bat" call "%HOME%\activemqrc_post.bat"

activemq.xml:

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file.
         Also loads credentials.properties from the directory named by the activemq.conf
         property, which activemq.bat passes on the java command line. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <!--
            For better performances use VM cursor and small memory limit.
            For more information, see:

http://activemq.apache.org/message-cursors.html

            Also, if your producer is "hanging", it's probably due to producer flow control.
            For more information, see:

http://activemq.apache.org/producer-flow-control.html

        -->

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <!-- Policy for every topic (">" is the match-all destination wildcard):
                     producer flow control stays enabled. -->
                <policyEntry topic=">" producerFlowControl="true">
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers from blocking producers and affecting other
                         consumers, by limiting the number of messages that are retained
                         (here: 1000 pending messages).
                         For more information, see:

http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <!-- Policy for every queue: producer flow control is disabled and the
                     per-destination memoryLimit is set to 20mb. -->
                <policyEntry queue=">" producerFlowControl="false" memoryLimit="20mb" >
                  <!-- Use VM cursor for better latency (left disabled in this setup)
                       For more information, see:

http://activemq.apache.org/message-cursors.html

                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                  -->
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

http://activemq.apache.org/jmx.html

        -->
        <managementContext>
            <!-- createConnector="false": the broker is told not to create its own JMX connector.
                 In this setup the remote JMX endpoint is configured by the
                 com.sun.management.jmxremote system properties set through SUNJMX in activemq.bat. -->
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

http://activemq.apache.org/persistence.html

        -->
        <persistenceAdapter>
            <!-- KahaDB store under ${activemq.data}/kahadb with asynchronous index writes
                 enabled and journal disk syncs disabled.
                 NOTE(review): disabling journal disk syncs presumably trades durability on a
                 crash or power loss for throughput; confirm this is acceptable for
                 persistent messages. -->
            <kahaDB directory="${activemq.data}/kahadb"
					enableIndexWriteAsync="true"
					enableJournalDiskSyncs="false"/>
			<!--<amqPersistenceAdapter directory="${activemq.data}/activemq-data" /> -->
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before slowing down producers. For more information, see:

http://activemq.apache.org/producer-flow-control.html

            If using ActiveMQ embedded - the following limits could safely be used:

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        -->
          <!-- Limits in effect for this standalone broker: 1400 mb memory, 100 gb store, 50 gb temp.
               NOTE(review): memoryUsage is 1400 mb while activemq.bat starts the JVM with a
               1500M maximum heap, leaving roughly 100 MB of heap for everything else.
               Confirm that this headroom is sufficient. -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="1400 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

http://activemq.apache.org/configuring-transports.html

        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB
                 (104857600 bytes). OpenWire is served over NIO on port 61616 and AMQP on port 5672.
                 The query-string separator must be written as &amp; inside an XML attribute;
                 a bare ampersand makes this file ill-formed, so it cannot be parsed. -->
            <transportConnector name="openwire" uri="nio://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- Destroy the Spring context on shutdown to stop Jetty: registers
             SpringContextHook as a broker shutdown hook. -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos by importing the Jetty
        configuration into this Spring context.

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

Producer for java:

// Producer sketch: connect to the broker, open a non-transacted AUTO_ACKNOWLEDGE session
// and send one ObjectMessage to a queue.
// `qName` (queue name) and `message` (the Serializable payload) are expected to be
// provided by the surrounding code.
ActiveMQConnectionFactory connectionFactory =
	new ActiveMQConnectionFactory("tcp://192.168.0.87:61616?wireFormat.maxInactivityDuration=0");
// The connection has to be obtained from the factory before a session can be created
// (same sequence as in the consumer example).
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue(qName);
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue); // Default DeliveryMode.PERSISTENT
//producer.setPriority(3); // Default priority 4.
//producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// Keep a reference to the created message so the payload can be set and the message sent.
ObjectMessage jmsMessage = session.createObjectMessage();
jmsMessage.setObject(message);
producer.send(jmsMessage);

Consumer for java:

// Consumer sketch: connect, subscribe to the queue with a prefetch size of 100
// (set through the destination option in the queue name) and print every message
// received by an asynchronous listener.
// NOTE(review): `counter` is expected to be declared by the enclosing class, presumably
// as an AtomicInteger or AtomicLong; it is not defined in this snippet.
final String qName = "Test.foo?consumer.prefetchSize=100";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.87:61616");
Connection connection = connectionFactory.createConnection();
connection.start();

final Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(qName);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
System.out.println(consumer.getPrefetchNumber());
// Listener-based (asynchronous) consumption
consumer.setMessageListener(new MessageListener() {

	public void onMessage(Message msg) {
	    //TODO something....
	    // No try/catch here: nothing in this body throws the checked JMSException, and
	    // catching a checked exception that cannot be thrown is a compile-time error.
	    // Add a try/catch for JMSException once JMS accessors (e.g. getters on msg) are called.
	    System.out.println("收到消息:"+counter.incrementAndGet()+","+msg);
	}

});

服务器设备配置:

os:windows 2003 r2

vm:4G

cpu:2 core  2.8GHz

硬盘:SCSI 80G 10000rpm

内网百兆网卡

客户端设备配置:

普通PC, windows xp系统

测得结果:

1、消息大小1K、非事务、持久化消息,服务器端采用NIO:

单个连接 500个消息/sec,2个连接 1000个消息/sec,服务器端系统资源很稳定。

2、消息大小1K、非事务、非持久化消息,服务器端采用NIO:

单个连接2000+个消息/sec,CPU占用较高,内存回收稳定,受理的消息占用内存超过jvm给定内存后,AMQ不报异常也不会终止程序,但会导致消息丢失,内存回收频率升高。

控制台:

http://192.168.0.87:8161/admin/queues.jsp

参考资源:

http://activemq.apache.org/cms/configuring.html

http://www.cnblogs.com/zhishan/archive/2013/04/01/2993334.html

http://activemq.apache.org/maven/apidocs/index.html

抱歉!评论已关闭.