SpringBoot集成应用RocketMQ的集成用法(上)
1。RocketMQ集成介绍
在金融互联网领域广泛应用,在阿里双11活动经历过多次考验,经过严苛的生产验证,有比较高的可靠性,在数据处理上有比较高的稳定性,能从最大程度上保证消息不易丢失,如果业务上有一定的规模,且对数据的一致性,稳定性要求严苛,那么可以采用RocketMQ,比如金融互联网领域,支付场景、交易场景等。如果有借助消息队列实现分布式事务,RocketMQ可以作为首选。
SpringBoot官方提供了springbootstarteractivemq对ActiveMQ的支持,但并没有提供对RocketMQ的支持,这不代表SpringBoot本身不支持,RocketMQ官方给我们提供了RocketMQSpring框架,整合了RocketMQ与SpringBoot,主要提供3个特性:使用RocketMQTemplate用来统一发送消息,包括同步、异步发送消息和事务消息RocketMQTransactionListener注解用来处理事务消息的监听和回查RocketMQMessageListener注解用来消费消息2。RocketMQ安装说明
简要安装说明,详情请参考官方文档:
1、下载RocketMQ安装文件
采用其他镜像下载:https:mirrors。cloud。tencent。comapacherocketmq4。3。2rocketmqall4。3。2binrelease。zip
2、启动NameServernohupshbinmqnamesrvtailflogsrocketmqlogsnamesrv。log
3、启动Brokernohupshbinmqbrokernlocalhost:9876tailflogsrocketmqlogsbroker。log3。RocketMQ集成配置
采用RocketMQ官方提供得rocketmqspringbootstarter作为集成组件。
1、创建springbootmqrocket父级工程
MAVEN依赖:propertiesrocketmqspringbootstarterversion2。0。3rocketmqspringbootstarterversionpropertiesdependencies!RocketMq与SpringBoot集成组件依赖dependencygroupIdorg。apache。rocketmqgroupIdrocketmqspringbootstarterartifactIdversion{rocketmqspringbootstarterversion}versiondependencydependencies
2、创建rocketmqbasic工程
工程依赖直接继承父级依赖,无须添加其他依赖组件。
工程配置:
application。yml文件:server:port:12613spring:application:name:rocketmqbasicRocketMQ配置rocketmq:nameserver:10。10。20。15:9876producer:group:basicgroup
配置填写RocketMQ地址信息,如果是集群,多个以逗号分割。
3、创建启动类
com。mirson。spring。boot。mq。rocket。basic。startup。RocketMqBasicApplicationSpringBootApplicationComponentScan(basePackages{com。mirson})publicclassRocketMqBasicApplication{publicstaticvoidmain(String〔〕args){SpringApplication。run(RocketMqBasicApplication。class,args);}}
扫描包含com。mirson包下所有路径。4。RocketMQ集成之普通消息处理
1、定义监听器
com。mirson。spring。boot。mq。rocket。basic。consume。StringConsumer:ServiceRocketMQMessageListener(topicRabbitMqConfig。TOPIC,consumerGroupRabbitMqConfig。CONSUMEGROUPSTRING)Log4j2publicclassStringConsumerimplementsRocketMQListenerString{OverridepublicvoidonMessage(Stringmessage){log。info(StringConsumerreceive:message);}}订阅的主题为RabbitMqConfig。TOPIC,订阅的分组为RabbitMqConfig。CONSUMEGROUPSTRING。实现RocketMQListener接口,将接收的消息通过日志打印。
2、提供接口
com。mirson。spring。boot。mq。rocket。basic。provider。RocketMqProviderContorllerRestControllerLog4j2publicclassRocketMqProviderContorller{ResourceprivateRocketMQTemplaterocketMQTemplate;生产者发送字符类型消息returnGetMapping(sendString)publicStringsendString(){Stringmsgrandomnumber:RandomUtils。nextInt(0,100);SendstringSendResultsendResultrocketMQTemplate。syncSend(RabbitMqConfig。TOPIC,msg);log。info(sendresult:sendResult。getSendStatus());returnmsg;}。。。}
提供sendString接口,每次请求发送一个随机数,通过RocketMQTemplate的syncSend同步方法发送数据。
如果发送成功,会返回状态:SENDOK。
3、调用验证访问接口地址:http:127。0。0。1:12613sendString
查看监听器日志
可以看到,String类型的普通消息监听器,正常接收到消息。
5。RocketMQ集成之原生消息处理
RocketMQ原生消息,除了发送的数据,还可以获取RocketMQ内置的系统信息,比如消息ID,主机名称,时间戳,队列信息等。
1、定义监听器
com。mirson。spring。boot。mq。rocket。basic。consume。MessageExtConsumerServiceRocketMQMessageListener(topicRabbitMqConfig。TOPICEXT,selectorExpressiontag1,consumerGroupRabbitMqConfig。CONSUMEGROUPEXT)Log4j2publicclassMessageExtConsumerimplementsRocketMQListenerMessageExt,RocketMQPushConsumerLifecycleListener{OverridepublicvoidonMessage(MessageExtmessage){log。info(MessageExtConsumerreceivemsgId:{},msgData:{},message。getMsgId(),newString(message。getBody()));}自定义消费者的开始位置,这里设置的是当前时间paramconsumerOverridepublicvoidprepareStart(DefaultMQPushConsumerconsumer){setconsumerconsumemessagefromnowconsumer。setConsumeFromWhere(ConsumeFromWhere。CONSUMEFROMTIMESTAMP);consumer。setConsumeTimestamp(UtilAll。timeMillisToHumanString3(System。currentTimeMillis()));}}订阅的主题为RabbitMqConfig。TOPICEXT,订阅的Group为RabbitMqConfig。CONSUMEGROUPEXT。打印接收到的消息ID与数据。实现RocketMQPushConsumerLifecycleListener接口,可以自定义消费者的开始消息位置,这里设置的是当前时间。
2、提供发送接口
com。mirson。spring。boot。mq。rocket。basic。provider。RocketMqProviderContorller,增加接口:发送RocketMQ原生消息returnGetMapping(sendStringExt)publicStringsendStringExt(){Stringmsgrandomnumber:RandomUtils。nextInt(0,100);try{SendResultresultrocketMQTemplate。syncSend(RabbitMqConfig。TOPICEXT:tag1,msg);log。info(result:result。getSendStatus());}catch(Exceptione){log。error(e。getMessage(),e);}SendStringExtMessagereturnmsg;}发送一个随机数,增加了一个tag1标记,与上面RocketMQMessageListener注解中的selectorExpression需保持一致,如不匹配,不能收到对应消息。与正常发送方式没有差异,不需做额外处理,仍采用同步方式发送。
3、测试验证访问接口:
http:127。0。0。1:12613sendStringExt
查看监听器日志
6。RocketMQ集成之SpringMessage消息
SpringMessage是一种消息传输规范,RocketMQ可以支持,在SpringCloudStream中采用的就是SpringMessage作为消息传输规范,这是一个用于构建基于消息的微服务应用框架。
1、定义传输对象
在实际消息交互当中,不会传输简单的数据结构,一般传递的是业务对象,这里定义一个订单对象:
com。mirson。spring。boot。mq。rocket。basic。bo。OrderDatapublicclassOrderimplementsSerializable{privatestaticfinallongserialVersionUID1L;订单IDprivateStringorderId;创建时间privateDatecreateDate;}
消息交互当中,默认会通过序列化传递,需要实现序列化接口。
2、定义监听器
com。mirson。spring。boot。mq。rocket。basic。consume。OrderSpringMessageConsumerServiceRocketMQMessageListener(topicRabbitMqConfig。TOPICSPRINGMESSAGE,consumerGroupRabbitMqConfig。CONSUMEGROUPSPRINGMESSAGE)Log4j2publicclassOrderSpringMessageConsumerimplementsRocketMQListenerOrder{OverridepublicvoidonMessage(Orderorder){log。info(OrderSpringMessageConsumerreceiveorder:order);}}定义不同的主题以区分,这里订阅的主题为RabbitMqConfig。TOPICSPRINGMESSAGE,组别为RabbitMqConfig。CONSUMEGROUPSPRINGMESSAGE。实现RocketMQListener接口,泛型为Order;打印接收到的订单数据。
3、定义发送接口
com。mirson。spring。boot。mq。rocket。basic。provider。RocketMqProviderContorller发送RocketMQSpringMessage封装消息returnGetMapping(sendSpringMessage)publicStringsendSpringMessage(){Stringmsgrandomnumber:RandomUtils。nextInt(0,100);OrderordernewOrder();order。setOrderId(UUID。randomUUID()。toString());order。setCreateDate(newDate());SendSpringMessageWithOrderSendResultresultrocketMQTemplate。syncSend(RabbitMqConfig。TOPICSPRINGMESSAGE,MessageBuilder。withPayload(order)。build());log。info(sendresult:result。getSendStatus());returnmsg;}创建一个订单对象,生成UUID作为订单ID,设置订单创建时间。采用同步方式发送,指定主题RabbitMqConfig。TOPICSPRINGMESSAGE,注意,SpringMessage封装采用MessageBuilder,将订单放入playload包体里面,调用build方法进行序列化。
4、测试验证调用发送接口
http:127。0。0。1:12613sendSpringMessage
查看监听器日志
能够正常接收并打印出完整的订单数据。