将向多个消费者传递一条消息。这种模式被称为发布订阅。 使用fanout的exchanger,默认会忽略routingkey的值。fanout类型的exchanger,会将所有接收到的消息发给所有绑定到此exchanger上的所有队列。一般情况下,此时我们声明的队列,都会将是临时队列,因为它不需要名称,所以可以直接使用queueDeclare()不需要传递任何的参数即可以创建一个临时队列。注意,如果在没有任何队列绑定到fanout交换器时,生产者就发出消息,则消息将直接丢弃。纯java项目生产者代码 注意,以下虽然将交换器设置为持久的,但如果它在发布数据时,没有任何的队列绑定到此交换器上,则消息依然会被直接丢弃。 packagewj。rabbitmq。fanout; importcom。rabbitmq。client。BuiltinExchangeType; importcom。rabbitmq。client。Channel; importcom。rabbitmq。client。Connection; importcom。rabbitmq。client。MessageProperties; importwj。mq。utils。ConnUtils; publicclassFanoutSender{ publicstaticvoidmain(String〔〕args)throwsException{ ConnectionconnectionConnUtils。newConnection(); Channelchannelconnection。createChannel(); 声明扇出类型的交换器 finalStringexchangeMyFanOut; channel。exchangeDeclare(exchange, BuiltinExchangeType。FANOUT, true); 直接发布信息 for(inti0;i10;i){ StringmsgCi; routingKey对fanout类型的交换器无任何意义,有或没有都相同 只有在发送时所有绑定到此fanout的queue才可以收到信息 另外,即使设置了消息的持久化,此消息也不会被持久 因为没有任何一个routingKey名为 channel。basicPublish(exchange,, MessageProperties。PERSISTENTTEXTPLAIN, msg。getBytes()); } channel。close(); connection。close(); System。err。println(信息发送完成); } } 截图: 消费者代码 packagewj。rabbitmq。fanout; importcom。rabbitmq。client。; importwj。mq。utils。ConnUtils; publicclassFanoutReciver{ publicstaticvoidmain(String〔〕args)throwsException{ ConnectionconnectionConnUtils。newConnection(); Channelchannelconnection。createChannel(); 随机声明一个队列exclusivetrue,autoDeltrue Stringqueuechannel。queueDeclare()。getQueue(); StringexchangeMyFanOut; 绑定关系 channel。queueBind(queue,exchange,); DeliverCallbackmsgCallBack(consumerTag,message){ System。err。println(收到信息:newString(message。getBody())); }; channel。basicConsume(queue,true,msgCallBack,consumerTag{ }); } } 截图: 运行效果 多次启动消费者,然后再启动生产者发送信息,多个消息者都可以接收到信息。SpringBoot项目 由于一般情况下,生产者与消费者都是分开的,所以,这儿为了模拟这个过程,声明了两个不同的配置对象,一个专门用于生产者(SenderConfig),一个专门用于消费者(ReceiverConfig)。 为了查看结果,将其中的一个消费者处理消息的能力设置的比较慢。用于查看是否所有消费者都接收到了相同的信息。开发消费者配置类 此配置文件主要功能是:配置两个队列不同的消费者使用不同的队列消费消息。配置一个交换器fanout类型的。配置两个Binding对象,用于将两个队列全部的绑定给这个fanout的交换器。 packagewj。mq。config。fanout; importjava。util。HashMap; importjava。util。Map; importorg。springframework。amqp。core。AnonymousQueue; importorg。springframework。amqp。core。Binding; importorg。springframework。amqp。core。BindingBuilder; importorg。springframework。amqp。core。FanoutExchange; importorg。springframework。amqp。core。Queue; importorg。springframework。context。annotation。Bean; importorg。springframework。context。annotation。Configuration; 接收者配置类 因为在正式的开发中,要吗有接收者配置,要吗有发布者的配置,所以将这两个配置分开 Configuration publicclassReceiverConfig{ 声明队列,因为是与fanout类型的交换机绑定,所以不需要名称 Bean publicQueuereceiverQueue1(){ MapString,ObjectargsnewHashMap(); args。put(name,Test); QueuequeuenewAnonymousQueue(args); returnqueue; } 声明第二个任意名称的队列 return Bean publicQueuereceiverQueue2(){ MapString,ObjectargsnewHashMap(); args。put(name,Test); QueuequeuenewAnonymousQueue(args); returnqueue; } 声明fanout交换机 Bean publicFanoutExchangereceiverExchange(){ returnnewFanoutExchange(FanoutExchange,true,false); } 声明绑定关系 Bean publicBindingreceiverBinding1(QueuereceiverQueue1,FanoutExchangereceiverExchange){ BindingbBindingBuilder。bind(receiverQueue1)。to(receiverExchange); returnb; } 声明绑定关系 Bean publicBindingreceiverBinding2(QueuereceiverQueue2,FanoutExchangereceiverExchange){ BindingbBindingBuilder。bind(receiverQueue2)。to(receiverExchange); returnb; } } 截图: 消费者 使用RabbitListener注解声明两个消费者 packagewj。mq。rabbitmq。pubsub; importorg。springframework。amqp。core。Message; importorg。springframework。amqp。rabbit。annotation。RabbitListener; importorg。springframework。messaging。handler。annotation。Payload; importorg。springframework。stereotype。Component; importcom。rabbitmq。client。Channel; importcn。hutool。core。thread。ThreadUtil; importlombok。extern。slf4j。Slf4j; Slf4j Component publicclassPubsubReceiver{ RabbitListener(queues{receiverQueue1。name}) publicvoidcusumer1(Stringmsg){ log。info(消费者1,接收到:{},msg); } RabbitListener(queues{receiverQueue2。name}) publicvoidcusumer2(Payload()Messagemessage,Channelchannel)throwsException{ ThreadUtil。sleep(1000); StringstrnewString(message。getBody()); log。info(消费者2,接收到:{},str); 对于扇出类型的消息,不需要确认,因为没有routingkey,且队列也是随机的 Longidmessage。getMessageProperties()。getDeliveryTag(); channel。basicAck(id,false); } } 截图: 声明生产者配置类 生产者发布消息,只需要一个交换器 packagewj。mq。config。fanout; importorg。springframework。amqp。core。Exchange; importorg。springframework。amqp。core。FanoutExchange; importorg。springframework。context。annotation。Bean; importorg。springframework。context。annotation。Configuration; Configuration publicclassSenderConfig{ Bean publicExchangesenderExchange(){ returnnewFanoutExchange(FanoutExchange,true,false); } } 截图: 声明生产者 这儿并没有使用调度功能,通过手动的调用send功能,来发布信息 packagewj。mq。rabbitmq。pubsub; importorg。springframework。amqp。core。Exchange; importorg。springframework。amqp。rabbit。core。RabbitTemplate; importorg。springframework。beans。factory。annotation。Autowired; importorg。springframework。beans。factory。annotation。Qualifier; importorg。springframework。stereotype。Component; Component publicclassPubsubSender{ Autowired privateRabbitTemplaterabbitTemplate; Autowired Qualifier(senderExchange) privateExchangesenderExchange; publicvoidsend(){ for(inti1;i10;i){ Stringmsgi; rabbitTemplate。convertAndSend(senderExchange。getName(),,msg); } } } 截图: 开发ApplicationRunner 此类用于在程序启动完成后,调用send函数,发送数据。 packagewj。mq。config。runner; importorg。springframework。beans。factory。annotation。Autowired; importorg。springframework。boot。ApplicationArguments; importorg。springframework。boot。ApplicationRunner; importorg。springframework。context。annotation。Configuration; importlombok。extern。slf4j。Slf4j; importwj。mq。rabbitmq。pubsub。PubsubSender; Slf4j Configuration publicclassAppRunnerimplementsApplicationRunner{ Autowired privatePubsubSenderpubsubSender; Override publicvoidrun(ApplicationArgumentsargs)throwsException{ log。info(程序启动完成); pubsubSender。send(); } } 截图: 启动程序 启动程序,查看后台输出的数据,虽然消费者2处理的能力比较慢,但也同样接收到了所有的信息,因为在fanout这种交换机器,会将所有消息发送给所有绑定到这个交换器的队列,且会自动忽左忽略routingkey。