SpringCloudStream使用详解及部分重点源码分析
环境:Springboot2。3。12。RELEASESpringCloudHoxton。SR12RabbitMQ3。8。12简介
SpringCloudStream是一个框架,用于构建与MQ连接的高度可伸缩的事件驱动微服务。其目的是为了简化消息在SpringCloud应用程序中的开发。屏蔽了各种MQ之间的差异,使得在更换MQ的时候不需要修改代码。
SpringCloudStream支持多种绑定器实现,如下:RabbitMQApacheKafkaKafkaStreamsAmazonKinesisGooglePubSub(partnermaintained)SolacePubSub(partnermaintained)AzureEventHubs(partnermaintained)AWSSQS(partnermaintained)AWSSNS(partnermaintained)ApacheRocketMQ(partnermaintained)
详细查看官方文档,对应每一个MQ都有一个Github地址。
SpringCloudStream的核心构建块是:目标绑定器(DestinationBinders):负责与MQ集成的组件。目标绑定(DestinationBindings):MQ中间件与最终用户提供的应用程序代码(生产者消费者)之间的桥梁。消息(Message):生产者和消费者用来与目标绑定器(以及通过MQ与其他应用程序)通信的规范数据结构。
Stream核心组件关系图快速入门
依赖:propertiesspringcloud。versionHoxton。SR12springcloud。versionpropertiesdependenciesdependencygroupIdorg。springframework。bootgroupIdspringbootstarteramqpartifactIddependencydependencygroupIdorg。springframework。cloudgroupIdspringcloudstarterstreamrabbitartifactIddependencydependenciesdependencyManagementdependenciesdependencygroupIdorg。springframework。cloudgroupIdspringclouddependenciesartifactIdversion{springcloud。version}versiontypepomtypescopeimportscopedependencydependenciesdependencyManagement
应用配置:spring:rabbitmq:host:localhostvirtualhost:busport:5672username:xxxpassword:xxxspring:cloud:stream:bindings:自定义输入输出myInput:指定输入通道对应的主题名destination:demomyOutput:destination:demo
创建消息通道绑定的接口:publicinterfaceStreamBinding{StringINPUTmyInput;StringOUTPUTmyOutput;Input(StreamBinding。INPUT)SubscribableChannelinput();Output(StreamBinding。OUTPUT)MessageChanneloutput();}
通过Input和Output注解定义输入通道和输出通道名称,这里的名称与上面配置文件中的是对应的。
当定义输出通道的时候,需要返回MessageChannel接口对象,该接口定义了向消息通道发送消息的方法;定义输入通道时,需要返回SubscribableChannel接口对象,该接口集成自MessageChannel接口,它定义了维护消息通道订阅者的方法。
这里的Input,Output两个方法容器会分别创建一个Bean对象
创建消费者:ComponentEnableBinding(value{StreamBinding。class})publicclassStreamReceiver{privateLoggerloggerLoggerFactory。getLogger(StreamReceiver。class);StreamListener(StreamBinding。INPUT)publicvoidreceive(Stringmessage){logger。info(接收到消息:{},message);}}
EnableBinding注解用来指定一个或多个定义了Input或Output注解的接口,以此实现对消息通道(Channel)的绑定。上面我们通过EnableBinding(value{StreamClient。class})绑定了StreamClient接口,该接口是我们自己实现的对输入输出消息通道绑定的定义
StreamListener,主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。上面我们将receive方法注册为myInput消息通道的监听处理器,当我们往这个消息通道发送信息的时候,receiver方法会执行。
消息发送接口:ResourceprivateStreamBindingstreamBinding;GetMapping(send)publicvoidsend(){streamBinding。output()。send(MessageBuilder。withPayload(FirstMessage。。。)。build());}
启动服务:
查看RabbitMQ
自动为我们创建了一个队列,队列的名称是以我们在配置文件中配置的开头,后面是随机生成的。这个队列会自动删除AD,服务关闭后就自动删除队列;Excl:排他的,存在该队列就不会在创建了。
修改端口后,再启动一个服务:
创建了2个队列,使用其中一个发送消息:
两个服务都收到了消息。消费者组
上面启动了2个服务都能收到消息,在集群的环境下这样肯定会带来问题,如果是业务方面的就会出现重复数据,这时候我们可以通过设置分组的解决此问题。修改配置:spring:cloud:stream:bindings:myInput:指定输入通道对应的主题名destination:demo指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列多个实例会轮询的接收消息group:gtestmyOutput:destination:demo
再次启动服务后,两个服务会轮询的接收到消息。
启动服务后,两个服务都同时监听同一个队列。队列也不是随机生成的了,并且队列是持久化的,服务断开后队列也不会自动删除。消息分区
通过消费组的设置,虽然能保证同一消息只被一个消费者进行接收和处理,但是对于特殊业务情况,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能被同一个实例消费,这个就可以使用SpringCloudStream提供的消息分区功能。修改配置spring:cloud:stream:bindings:myInput:指定输入通道对应的主题名destination:demo指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列多个实例会轮询的接收消息group:gtestconsumer:通过该参数开启消费者分区功能partitioned:truemyOutput:destination:demoproducer:这里的配置也可以是SpEL表达式,比如:headers〔partition〕通过消息header获取属性这里会通过表达式及消息对象进行计算得到一个Key,然后获取key的hashCode得到hashCode以后会与partitionCount进行取模运算得到具体的分区partitionKeyExpression:1我这里给的值就是对应的instanceIndex的值,你希望谁接收就设置谁配置的值即可partitionCount:2实例总数instanceCount:2该参数设置了当前实例的索引号,从0开始instanceIndex:0
计算分区源码:
最后得到分区信息后会在消息头中放入一个scstpartition为key,partition为值的头信息。
启动多个实例后,测试发现所有的消息都只是同一个实例收到消息
交换机分别与每一个服务进行绑定使用不同的RoutingKey这样在发送消息的时候就可以根据计算处理的分区进行定向发送消息了。
通过源码查看:
这里通过我们的配置交换机为demo。接着是获取路由key了
这里会从消息header中获取keyscstpartition的头信息。
这样针对使用RabbitMQ的中间件发送消息所需要的交换机及路由key就确定下来了。
完毕!!!
在SpringCloud中你还在使用Ribbon快来试试LoadBalancer
SpringCloudGateway应用详解1之谓词
SpringCloudBus使用说明详解
SpringCloudNacos整合feign
SpringCloudAlibaba之Nacos服务
SpringCloudSentinel整合Feign
SpringCloudHystrix实现资源隔离应用
SpringCloudzuul动态网关配置
SpringCloud微服务日志收集管理ElasticStack完整详细版
SpringCloudFeign实现原理源分析
SpringCloudSentinel热点参数限流