springboot + rocketmq实现简单消息队列

1、先安装rocketmq , 配置环境变量 , 这里就不写怎么安装了 , cmd命令行进入bin目录 , 运行name-server和broker,分别用如下两个命令行
start mqnamesrv.cmdstart mqbroker.cmd -n localhost:9876不要关闭命令行窗口 , 当然也可以用后台运行的方式运行这两个文件
2、创建springboot项目 , 下面是依赖包
org.springframework.bootspring-boot-devtoolsruntimetrueorg.projectlomboklomboktruecom.alibaba.rocketmqrocketmq-client3.2.6org.springframework.bootspring-boot-starter-testtestorg.junit.vintagejunit-vintage-engineorg.springframework.bootspring-boot-starter-webRELEASE3、配置文件 application.properties
###producer#该应用是否启用生产者rocketmq.producer.isOnOff=on#发送同一类消息的设置为同一个group , 保证唯一,默认不需要设置 , rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示rocketmq.producer.groupName=hpGroup#mq的nameServer地址rocketmq.producer.namesrvAddr=127.0.0.1:9876#消息最大长度 默认1024*4(4M)rocketmq.producer.maxMessageSize=4096#发送消息超时时间,默认3000rocketmq.producer.sendMsgTimeout=3000#发送消息失败重试次数 , 默认2rocketmq.producer.retryTimesWhenSendFailed=2###consumer##该应用是否启用消费者rocketmq.consumer.isOnOff=onrocketmq.consumer.groupName=hpGroup#mq的nameServer地址rocketmq.consumer.namesrvAddr=127.0.0.1:9876#该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;rocketmq.consumer.topics=rocketTopic~*rocketmq.consumer.consumeThreadMin=20rocketmq.consumer.consumeThreadMax=64#设置一次消费消息的条数 , 默认为1条rocketmq.consumer.consumeMessageBatchMaxSize=1rocket.group=rocketGrouprocket.topic=rocketTopicrocket.tag=rocketTag注:要保证rocketmq.consumer.topics去除 ~ 之后的值和rocket.group的值一致 , ~只是分隔符 , 也可以选择其他分隔符 。 4、生产者配置类代码:
package com.hp.rocket.rocket;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ProducerConfig {private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;@Value("${rocketmq.producer.groupName}")private String groupName;@Value("${rocketmq.producer.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.producer.maxMessageSize}")private Integer maxMessageSize ;@Value("${rocketmq.producer.sendMsgTimeout}")private Integer sendMsgTimeout;@Value("${rocketmq.producer.retryTimesWhenSendFailed}")private Integer retryTimesWhenSendFailed;@Beanpublic DefaultMQProducer getRocketMQProducer() {DefaultMQProducer producer;producer = new DefaultMQProducer(this.groupName);producer.setNamesrvAddr(this.namesrvAddr);//如果需要同一个jvm中不同的producer往不同的mq集群发送消息 , 需要设置不同的instanceNameif(this.maxMessageSize!=null){producer.setMaxMessageSize(this.maxMessageSize);}if(this.sendMsgTimeout!=null){producer.setSendMsgTimeout(this.sendMsgTimeout);}//如果发送消息失败 , 设置重试次数 , 默认为2次if(this.retryTimesWhenSendFailed!=null){producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);}try {producer.start();} catch (MQClientException e) {e.printStackTrace();}return producer;}}5、消费者配置类代码
package com.hp.rocket.rocket;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.hp.rocket.common.CodeMsg;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.SpringBootConfiguration;import org.springframework.context.annotation.Bean;import org.springframework.util.StringUtils;@Slf4j@SpringBootConfigurationpublic class ConsumerConfig {@Value("${rocketmq.consumer.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.consumer.groupName}")private String groupName;@Value("${rocketmq.consumer.consumeThreadMin}")private int consumeThreadMin;@Value("${rocketmq.consumer.consumeThreadMax}")private int consumeThreadMax;@Value("${rocketmq.consumer.topics}")private String topics;@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")private int consumeMessageBatchMaxSize;@Autowiredprivate MQConsumeMsgListenerProcessor mqMessageListenerProcessor;@Beanpublic DefaultMQPushConsumer getRocketMQConsumer() throws Exception {if (StringUtils.isEmpty(groupName)){throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());}if (StringUtils.isEmpty(namesrvAddr)){throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());}if(StringUtils.isEmpty(topics)){throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());}DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(namesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.registerMessageListener(mqMessageListenerProcessor);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动 , 那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);/*** 设置消费模型 , 集群还是广播 , 默认为集群*///consumer.setMessageModel(MessageModel.CLUSTERING);/*** 设置一次消费消息的条数 , 默认为1条*/consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);try {/*** 设置该消费者订阅的主题和tag , 如果是订阅该主题下的所有tag , 则tag使用*;如果需要指定订阅该主题下的某些tag , 则使用||分割 , 例如tag1||tag2||tag3*/String[] topicTagsArr = topics.split(";");for (String topicTags : topicTagsArr) {String[] topicTag = topicTags.split("~");consumer.subscribe(topicTag[0],topicTag[1]);}consumer.start();log.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);}catch (MQClientException e){log.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);throw new Exception(e);}return consumer;}}