您现在的位置是:首页 > 后台技术 > JavaJava
rocketMQ实现消息集群消费(图文)
第十三双眼睛2021-06-26【Java】人已围观
简介本节记录如何实现rocketMQ下消息的集群消费
在公司上班有一次遇到一个问题,rocketMQ消息太多,单个应用消费速度慢,这时就应该增加消费者来提高消息消费速度了。特意记录一下此次的过程:
数据由一个生产者定时产生,每秒产生一条,弄两个消费者去消费,让消费者线程睡一会,模拟处理时间,项目结构如下:
一个消费者
项目为springboo项目,pom文件如下:
生产者:
定时产生消息:
消费者项目结构:
先后启动生产者,消费者1,消费者2
生产者发送10条消息:
消费者1消费的消息
消费者2消费的消息
可以看到生产者产生的消息由消费者1和消费者2共同消费,这样,就可以提供消费速度,消息不会再积压了。
数据由一个生产者定时产生,每秒产生一条,弄两个消费者去消费,让消费者线程睡一会,模拟处理时间,项目结构如下:
一个消费者
项目为springboo项目,pom文件如下:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.5.RELEASE</version> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!-- <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> --> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.0.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.6</version> </dependency> <!-- mqtt --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> <!-- rocket mq --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.7.9.Final</version> </dependency> <!-- mqtt 管控API--> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <!-- <optional>true</optional> --> <version>4.3.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-ons</artifactId> <version>3.1.0</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> <scope>provided</scope> </dependency> </dependencies> |
package com.xinchen.producer.mqtt; import java.util.Properties; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.xinchen.producer.message.MessageTest; @Component public class MQTTProducer { @Value("${aliyun.mq.topic}") private String topic; @Value("${aliyun.mq.group}") private String group; @Value("${aliyun.mqtt.group}") private String mqttGroup; @Value("${aliyun.mq.ak}") private String ak; @Value("${aliyun.mq.sk}") private String sk; @Value("${aliyun.mq.nameserver}") private String nameserver; @Value("${aliyun.mq.regionId}") private String regionId; @Value("${aliyun.mq.instanceId}") private String instanceId; private Producer producer; @PostConstruct public void initProducer() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, group); properties.put(PropertyKeyConst.AccessKey, ak); properties.put(PropertyKeyConst.SecretKey, sk); properties.put(PropertyKeyConst.NAMESRV_ADDR, nameserver); properties.put(PropertyKeyConst.MqttQOS, 1); producer = ONSFactory.createProducer(properties); try { producer.start(); System.out.println("producer 已启动"); } catch (Exception e) { e.printStackTrace(); } } public SendResult send(MessageTest messageTest) { SendResult sendResult = null; try { String clientId = mqttGroup + "@@@" + messageTest.getClientId(); Message msg = new Message(topic,"MQTT_TAG",messageTest.getDesc().getBytes()); msg.setKey("UNIQUE_KEY_" + clientId); msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId + "/" + messageTest.getOperate()); sendResult = producer.send(msg); } catch (ONSClientException e) { e.printStackTrace(); } return sendResult; } } |
@Scheduled(cron = "0/1 * * * * *") public void testList() { while(true) { MessageTest message = new MessageTest(); message.setClientId("producer"); message.setOperate("open"); message.setDesc("producer"); SendResult sendResult = producer.send(message); System.out.println(sendResult.getMessageId()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } count ++; System.out.println("发送消息:" + count); } } |
package com.xinchen.consumer.mqtt; import java.util.Properties; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.PropertyValueConst; @Component public class MQTTConsumer { @Value("${aliyun.mq.topic}") private String topic; @Value("${aliyun.mq.group}") private String group; @Value("${aliyun.mqtt.group}") private String mqttGroup; @Value("${aliyun.mq.ak}") private String ak; @Value("${aliyun.mq.sk}") private String sk; @Value("${aliyun.mq.nameserver}") private String nameserver; private Consumer consumer; @Autowired private MQTTCallBack mqttCallBack; @PostConstruct public void initConsumer() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, group); properties.put(PropertyKeyConst.AccessKey, ak); properties.put(PropertyKeyConst.SecretKey, sk); properties.put(PropertyKeyConst.NAMESRV_ADDR, nameserver); properties.put(PropertyKeyConst.ConsumeThreadNums, 10); properties.put(PropertyKeyConst.MaxReconsumeTimes, 10); properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, "*", mqttCallBack); try { consumer.start(); System.out.println("Consumer启动成功"); } catch (Exception e) { e.printStackTrace(); } } } |
生产者发送10条消息:
C0A800645D5401D16E9385296F9B0000 发送消息:1 C0A800645D5401D16E93852974D90003 发送消息:2 C0A800645D5401D16E93852978E30006 发送消息:3 C0A800645D5401D16E9385297CEC0009 发送消息:4 C0A800645D5401D16E93852980FE000C 发送消息:5 C0A800645D5401D16E938529850D000F 发送消息:6 C0A800645D5401D16E938529891B0012 发送消息:7 C0A800645D5401D16E9385298D2F0015 发送消息:8 C0A800645D5401D16E93852991620018 发送消息:9 C0A800645D5401D16E938529956C001B 发送消息:10 |
消费者1处理中...... 消费者11111消费消息:1 消费者1处理中...... 消费者11111消费消息:2 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E93852978E30006 body:producer 消费者1处理中...... 消费者11111消费消息:3 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E9385297CEC0009 body:producer 消费者1处理中...... 消费者11111消费消息:4 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E93852980FE000C body:producer 消费者1处理中...... 消费者11111消费消息:5 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E938529850D000F body:producer 消费者1处理中...... 消费者11111消费消息:6 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E938529891B0012 body:producer 消费者1处理中...... 消费者11111消费消息:7 |
消费者2处理中...... 消费者2222消费消息:1 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E93852991620018 body:producer 消费者2处理中...... 消费者2222消费消息:2 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E938529956C001B body:producer 消费者2处理中...... 消费者2222消费消息:3 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E938529997A001E body:producer 消费者2处理中...... 消费者2222消费消息:4 |
Tags:
很赞哦! ()
相关文章
随机图文
-
JWT的使用(图文)
Json web token (JWT), 是为了在网络应用环境间传递声明而执行的一种基于JSON的开放标准((RFC 7519).该token被设计为紧凑且安全的,特别适用于分布式站点的单点登录(SSO)场景。JWT的声明一般被用来在身份提供者和服务提供者间传递被认证的用户身份信息,以便于从资源服务器获取资源,也可以增加一些额外的其它业务逻辑所必须的声明信息,该token也可直接被用于认证,也可被加密 -
java实现websocket(图文)
WebSocket是HTML5开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。 在WebSocket API中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。 -
java 实现MQTT客户端(图文)
前端时间,总结了以下java使用MQTT客户端的使用方法,特地记录一下,方便以后使用 -
微服务springcloud(图文)
微服务就是由一系列围绕自己业务开发的微小服务构成,他们独立部署运行在自己的进程里,基于分布式的管理 微服务是一种架构方式: 将单一应用基于业务拆分多个微小服务,他们独立运行 独立部署 每个服务运行在自己计算机进程里面 对于这些服务都是分布式管理。 这种架构是将单个的整体应用程序分割成更小的项目关联的独立的服务。一个服务通常实现一组独立的特性或功能,包含自己的业务逻辑和适配器。各个微服务之间的关联通过暴露api来实现(http)。这些独立的微服务不需要部署在同一个虚拟机,同一个系统和同一个应用服务器中