springboot + rocketmq实现简单消息队列( 二 )
7、消费者的监听器代码
package com.hp.rocket.rocket;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.common.message.MessageExt;import com.hp.rocket.entity.MessageBack;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.BeanUtils;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;import java.util.List;@Slf4j@Componentpublic class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List
8、通用配置参数
package com.hp.rocket.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Spring-managed holder for the RocketMQ settings used by this demo.
 *
 * <p>Each field is populated from the property placeholder named in its
 * {@code @Value} annotation and is read directly (as a public field) by
 * other beans.
 */
@Service
public class ParamConfigService {

    /** Injected from the {@code rocket.group} property. */
    @Value("${rocket.group}")
    public String rocketGroup;

    /** Injected from the {@code rocket.topic} property. */
    @Value("${rocket.topic}")
    public String rocketTopic;

    /** Injected from the {@code rocket.tag} property. */
    @Value("${rocket.tag}")
    public String rocketTag;
}
9、service层
package com.hp.rocket.service;

import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.hp.rocket.entity.MessageBack;

import java.util.List;

/**
 * Service-layer contract for publishing messages to RocketMQ.
 */
public interface RocketMqService {

    /**
     * Publishes the given text as a message.
     *
     * @param msgInfo the message payload as text
     * @return the {@link SendResult} reported by the producer
     */
    SendResult openAccountMsg(String msgInfo);
}
impl实现类
package com.hp.rocket.service;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * {@link RocketMqService} implementation that publishes messages through the
 * injected {@link DefaultMQProducer}, using the topic and tag supplied by
 * {@link ParamConfigService}.
 */
@Service
@Slf4j
public class RocketMqServiceImpl implements RocketMqService {

    /** Message key attached to every message sent by {@link #openAccountMsg(String)}. */
    private static final String OPEN_ACCOUNT_KEY = "open_account_key";

    @Resource
    private DefaultMQProducer defaultMQProducer;

    @Resource
    private ParamConfigService paramConfigService;

    /**
     * Sends {@code msgInfo} to the configured topic/tag and returns the producer's result.
     *
     * <p>Failures are logged rather than propagated, so callers must be prepared
     * for a {@code null} return value.
     *
     * @param msgInfo the message payload as text; a {@code null} value is treated
     *                as a failed send
     * @return the send result, or {@code null} if the message could not be sent
     */
    @Override
    public SendResult openAccountMsg(String msgInfo) {
        // Optional: the group configured in the producer's Config class does not have to be used.
        // NOTE(review): this is re-applied on every call; confirm whether it has any
        // effect once the producer has been started.
        defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);
        log.info("开始发送消息:{}", msgInfo);
        try {
            // Encode explicitly as UTF-8 so the payload bytes do not depend on the
            // platform default charset. NOTE(review): consumers must decode as UTF-8 too.
            Message sendMsg = new Message(
                    paramConfigService.rocketTopic,
                    paramConfigService.rocketTag,
                    OPEN_ACCOUNT_KEY,
                    msgInfo.getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = defaultMQProducer.send(sendMsg);
            log.info("消息发送响应信息:{}", sendResult);
            return sendResult;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            log.error("消息发送被中断:{}", msgInfo, e);
            return null;
        } catch (Exception e) {
            // Keep the original best-effort contract (null on failure), but route the
            // stack trace through the logger instead of printStackTrace().
            log.error("消息发送失败:{}", msgInfo, e);
            return null;
        }
    }
}
10、controller层
为了方便在浏览器中直接测试,这里使用 GET 方法
package com.hp.rocket.controller;import com.alibaba.fastjson.JSONObject;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.SendResult;import com.hp.rocket.common.HPResponse;import com.hp.rocket.service.RocketMqService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;@RequestMapping("rocket")@RestControllerpublic class RocketController {@AutowiredRocketMqService rocketMqService;@GetMapping(value = "http://kandian.youth.cn/index/getResult/{msg}")public HPResponse getResult(@PathVariable("msg") String msg){if(StringUtils.isEmpty(msg)){return HPResponse.error(CodeMsg.SYSTEM_ERROR.fillArgs("参数不能为空"));}SendResult result = rocketMqService.openAccountMsg(msg);return HPResponse.success(result);}}
11、统一的 response 返回类也贴上来,方便新手直接拿来用
package com.hp.rocket.common;public class HPResponse {private int code;private String msg;private T data;public staticHPResponse success(T data){return new HPResponse(data);}public staticHPResponse success(){return new HPResponse();}public staticHPResponse error(CodeMsg codeMsg){return newHPResponse(codeMsg);}private HPResponse(T data) {this.code = 200;this.msg = "success";this.data = http://kandian.youth.cn/index/data;}private HPResponse() {this.code = 200;this.msg ="success";}private HPResponse(CodeMsg codeMsg) {if(codeMsg == null) {return;}this.code = codeMsg.getCode();this.msg = codeMsg.getMsg();}public int getCode() {return code;}public String getMsg() {return msg;}public T getData() {return data;}}
- 与用户|掌握好这4个步骤,实现了规模性的盈利
- 落地|“电竞之都”争夺战中,城市们该怎样实现产业落地?
- 美好生活|以人为本实现万物互融,中国视频社会化时代开启
- 手机|女神的自拍秘密,只需一部vivo S7便可以实现
- 自动任务|赶在三星 S21 发布之前实现语音解锁
- 产业|新主导力量来了,上海如何实现一次“革命性重塑”?
- Mate40Pro|华为Mate40Pro前置镜头有多强实现的这些功能国产机没人做到
- 突破|再传喜讯国产8英寸石墨烯晶圆亮相,中国芯再次实现新突破
- 如何基于Python实现自动化控制鼠标和键盘操作
- 小天才电话手表立体定位技术,真正实现无死角定位