springboot + rocketmq实现简单消息队列( 二 )

7、消费者的监听器代码
package com.hp.rocket.rocket;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.common.message.MessageExt;import com.hp.rocket.entity.MessageBack;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.BeanUtils;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;import java.util.List;@Slf4j@Componentpublic class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if(CollectionUtils.isEmpty(list)){log.info("接受到的消息为空 , 不处理 , 直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = list.get(0);log.info("接受到的消息为:"+new String(messageExt.getBody()));log.info("接受到的消息为:"+messageExt.toString());//MessageBack messageBack = new MessageBack();//messageBack.setMsg(new String(messageExt.getBody()));//messageBack.setId(messageExt.getMsgId());if(messageExt.getTopic().equals("你的Topic")){if(messageExt.getTags().equals("你的Tag")){//判断该消息是否重复消费int reconsume = messageExt.getReconsumeTimes();if(reconsume ==3){//消息已经重试了3次 , 如果不需要再次消费 , 则返回成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//TODO 处理对应的业务逻辑}}// 如果没有return success, consumer会重新消费该消息 , 直到return successreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}8、通用配置参数
package com.hp.rocket.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Holder for the RocketMQ settings injected from application properties.
 *
 * <p>The fields are public and are populated by Spring through {@code @Value}.
 */
@Service
public class ParamConfigService {

    /** Value of the {@code rocket.group} property. */
    @Value("${rocket.group}")
    public String rocketGroup;

    /** Value of the {@code rocket.topic} property. */
    @Value("${rocket.topic}")
    public String rocketTopic;

    /** Value of the {@code rocket.tag} property. */
    @Value("${rocket.tag}")
    public String rocketTag;
}

9、service层
package com.hp.rocket.service;

import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.hp.rocket.entity.MessageBack;

import java.util.List;

/**
 * Service contract for publishing messages to RocketMQ.
 */
public interface RocketMqService {

    /**
     * Publishes the given text as a message.
     *
     * @param msgInfo message payload
     * @return the send result reported by the producer
     */
    SendResult openAccountMsg(String msgInfo);
}

impl实现类
package com.hp.rocket.service;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service@Slf4jpublic class RocketMqServiceImpl implements RocketMqService{@Resourceprivate DefaultMQProducer defaultMQProducer;@Resourceprivate ParamConfigService paramConfigService ;@Overridepublic SendResult openAccountMsg(String msgInfo) {// 可以不使用Config中的GroupdefaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);log.info("开始发送消息:"+msgInfo);SendResult sendResult = null;try {Message sendMsg = new Message(paramConfigService.rocketTopic,paramConfigService.rocketTag,"open_account_key", msgInfo.getBytes());sendResult = defaultMQProducer.send(sendMsg);log.info("消息发送响应信息:"+sendResult.toString());} catch (Exception e) {e.printStackTrace();}return sendResult ;}}10、controller层
方便浏览器测试 , 用的get方法
package com.hp.rocket.controller;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.hp.rocket.common.CodeMsg;
import com.hp.rocket.common.HPResponse;
import com.hp.rocket.service.RocketMqService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * REST endpoint under {@code rocket} that publishes a message through {@link RocketMqService}.
 */
@RequestMapping("rocket")
@RestController
public class RocketController {

    @Autowired
    RocketMqService rocketMqService;

    /**
     * Sends the path variable {@code msg} as a message and returns the send result.
     *
     * @param msg message text taken from the URL path
     * @return an error response when {@code msg} is null or empty, otherwise a success
     *         response wrapping the send result
     */
    @GetMapping(value = "/getResult/{msg}")
    public HPResponse getResult(@PathVariable("msg") String msg) {
        // Plain null/empty check; no utility class is imported for this in the file.
        if (msg == null || msg.isEmpty()) {
            return HPResponse.error(CodeMsg.SYSTEM_ERROR.fillArgs("参数不能为空"));
        }
        SendResult result = rocketMqService.openAccountMsg(msg);
        return HPResponse.success(result);
    }
}

11、response返回类也贴上来 , 方便新手直接拿来用
package com.hp.rocket.common;public class HPResponse {private int code;private String msg;private T data;public staticHPResponse success(T data){return new HPResponse(data);}public staticHPResponse success(){return new HPResponse();}public staticHPResponse error(CodeMsg codeMsg){return newHPResponse(codeMsg);}private HPResponse(T data) {this.code = 200;this.msg = "success";this.data = http://kandian.youth.cn/index/data;}private HPResponse() {this.code = 200;this.msg ="success";}private HPResponse(CodeMsg codeMsg) {if(codeMsg == null) {return;}this.code = codeMsg.getCode();this.msg = codeMsg.getMsg();}public int getCode() {return code;}public String getMsg() {return msg;}public T getData() {return data;}}