现在的位置: 首页 > 编程语言 > 正文

javaRocketMQ快速入门基础知识

2020年02月14日 编程语言 ⁄ 共 2530字 ⁄ 字号 评论关闭

如何使用

1、引入 rocketmq-client

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.1.0-incubating</version></dependency>

2、编写Producer

DefaultMQProducer producer = new DefaultMQProducer("producer_demo");//指定NameServer地址producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的/*** Producer对象在使用之前必须要调用start初始化,初始化一次即可* 注意:切记不可以在每次发送消息时,都调用start方法*/producer.start();for (int i = 0; i < 997892; i++) {try {//构建消息Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送同步消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();

3、编写Consumer

/*** Consumer Group,非常重要的概念,后续会慢慢补充*/DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");//指定NameServer地址,多个地址以 ; 隔开consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {try {for(MessageExt msg:msgs){String msgbody = new String(msg.getBody(), "utf-8");System.out.println(" MessageBody: "+ msgbody);//输出消息内容}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功}});consumer.start();System.out.printf("Consumer Started.%n");

4、说明

各位根据自己的环境,修改NamesrvAddr的值,我的集群请参考:RocketMQ集群部署配置。稍后通过RocketMQ管控台就可以看到之前搭建的多Master多Slave模式,异步复制集群模式。

5、通过RocketMQ管控台

rocketmq-console-ng获取方式为:rocketmq-console-ng,之后通过mavne进行编译获取jar,命令如下:

mvn clean package -Dmaven.test.skip=truejava -jar target/rocketmq-console-ng-1.0.0.jar

得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根据自己的NamesrvAddr进行修改rocketmq.config.namesrvAddr的值。

直接启动:

java -jar rocketmq-console-ng-1.0.0.jar

管控台是基于springboot的,的确springboot非常方便和非常火了,所以有必要去学习下springboot了(其实还是spring系列,所以spring也必要深入学习下),稍后通过管控台进行观察运行。

6、运行观察

一个好的习惯是先运行Consumer,之后在运行Producer,之后通过rocketmq-console-ng管控台观察

运行完成之后,的确broker-a的数据加上broker-b的数据量就等于我们发送的数据量,而且slave的数量也master的数量也是一致的,效果如下:

查看发送这些数据,2台机器的磁盘情况如下:

到目前位置,关于RocketMQ快速入门就结束了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

本文标题: java RocketMQ快速入门基础知识

以上就上有关javaRocketMQ快速入门基础知识的相关介绍,要了解更多java,RocketMQ,入门内容请登录学步园。

抱歉!评论已关闭.