ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] 服务端启动时,配置生成一个队列quene和Exchange,通过routing_key绑定,所以没启动一个服务,本地都会有一个队列和交换机对应到rabbitmq上。 如果rabbitmq没有对应的队列,就会生成,对应到服务上去 ![](https://img.kancloud.cn/a5/a7/a5a70a79496968c329dca2a4a03ff66e_1213x254.png) ![](https://img.kancloud.cn/a7/13/a7138b14292455bc22c3c53dcc49cbd7_1217x189.png) 同时启动两个服务,会均摊消费信息,并且都可以生产消息 # 1. 消息推送流程 不同的交换机通过routing_key,利用不同的策略将消息发送到队列 ![](https://img.kancloud.cn/3f/16/3f16bcf8966b70873c86849b4f7a040f_2011x773.png) * 队列与交换机绑定 1. 生命队列(name)和交换机(name) 2. 通过routingkey进行绑定 * 产生消息 1. 生产者向交换机发送消息(指定交换机名称和routing_key) 2. 信息到达交换机,交换机根据routing_key找到绑定的队列,并把消息发送到队列 * 消费消息 1. 消费者根据队列名称绑定监听队列 2. 当队列有消息时,就可以接收到 ## 1.1 Direct Exchange  1. 要求通过`routing_key`将一个或多个队列绑定到交换机上 2. 消息通过发送到交换机,通过对应的routing_key发送到对应的所有队列上 3. 一个队列可以有多个消费者,但是消息只能被一个消费者消费(轮训) 直连型交换机,根据消息携带的路由键将消息投递给对应队列。 大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。 然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,**交换机就会根据这个路由值X去寻找绑定值也是X的队列。** 实例: #### **1. 配置队列与交换机的绑定** ~~~ @Configuration public class MQConfig { //创建三个队列1,2,3 //Queue的第一个参数为队列名称,第二个参数为是否持久存在 @Bean public Queue directQueue1() { return new Queue("queue1", true); } @Bean public Queue directQueue2() { return new Queue("queue2", false); } @Bean public Queue directQueue3() { return new Queue("queue3", true); } //创建直连交换机,参数为交换机的名称 @Bean public DirectExchange directExchange() { return new DirectExchange("DIRECT_EXCHANGE"); //交换机名称 } //将三个队列都与该直连交换机绑定起来,并赋予上面说的binding_key(也可以说是routing_key) @Bean public Binding bindingDirectExchange1() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with("key.1"); } @Bean public Binding bindingDirectExchange2() { return BindingBuilder.bind(directQueue2()).to(directExchange()).with("key.1"); } @Bean public Binding bindingDirectExchange3() { return BindingBuilder.bind(directQueue3()).to(directExchange()).with("key.2"); } } ~~~ #### 2. 发送者 通过routing_key发送到对应的所有队列 ~~~ @Autowired MQSender mqSender; @GetMapping("/directExchange/{message}") public String directExchange(@PathVariable("message") String message) { mqSender.send(message); return "success"; } ~~~ ~~~ @Service public class MQSender { //注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作 @Autowired private AmqpTemplate amqpTemplate; public void send(String message) { //第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key,第三个参数为发送的具体消息 amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.1", message); } } ~~~ #### 4. 接收者 监听队列`@RabbitListener(queues = "queue1")` ~~~ @Service public class MQReceiver { private static final Logger logger = LoggerFactory.getLogger(MQReceiver.class); //此注解表示监听某个队列,参数为队列名 @RabbitListener(queues = "queue1") public void receive1(String message) { logger.info("queue1 receive : fanout message {}", message); } @RabbitListener(queues = "queue1") public void receive1_1(String message) { logger.info("queue1_1 receive : fanout message {}", message); } @RabbitListener(queues = "queue2") public void receive2(String message) { logger.info("queue2 receive : fanout message {}", message); } @RabbitListener(queues = "queue3") public void receive3(String message) { logger.info("receive : fanout message {}", message); } } ~~~ ![](https://img.kancloud.cn/94/49/9449fff4dc4951290555a4b7f938f69c_716x116.png) quene1绑定了两个消费者,轮询着消费 ![](https://img.kancloud.cn/26/6e/266e370088137d0d16ee391cc450fed0_1527x286.png) ## 1.2 Fanout Exchange 广播消息 ![](https://img.kancloud.cn/ee/c4/eec4629a6e7f1746810912dbe491a1f6_739x460.png) 1. 扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 2. **这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。** 1. config ~~~ @Configuration public class FanoutMQConfig { //创建三个队列1,2,3 //Queue的第一个参数为队列名称,第二个参数为是否持久存在 @Bean public Queue fanoutQueue1() { return new Queue("queue1", true); } @Bean public Queue fanoutQueue2() { return new Queue("queue2", false); } @Bean public Queue fanoutQueue3() { return new Queue("queue3", true); } //创建扇形交换机,参数为交换机的名称 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("FANOUT_EXCHANGE"); } //将三个队列都与该交换机绑定起来,无需binding_key @Bean public Binding bindingFanoutExchange1() { return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } @Bean public Binding bindingFanoutExchange2() { return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } @Bean public Binding bindingFanoutExchange3() { return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange()); } } ~~~ 2. controller ~~~ @GetMapping("/fanoutExchange/{message}") public String fanoutExchange(@PathVariable("message") String message) { fanoutMQSender.send(message); return "success"; } ~~~ 3. sender ~~~ //扇形交换机 public void fanoutSend(String message) { //第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key,第三个参数为发送的具体消息 amqpTemplate.convertAndSend("FANOUT_EXCHANGE", "", message); } ~~~ 4. received不变,还是监听队列 所以说队列不变,只是不同的交换机将信息发送给队列,以下三个队列都接收到了消息。 ``` 2021-05-11 19:14:34.479 INFO 17996 --- [ntContainer#1-1] c.t.s.service.FanoutMQReceiver : queue1_1 receive : fanout message hello-directexchange 2021-05-11 19:14:34.479 INFO 17996 --- [ntContainer#0-1] c.t.s.service.FanoutMQReceiver : receive : fanout message hello-directexchange 2021-05-11 19:14:34.479 INFO 17996 --- [ntContainer#2-1] c.t.s.service.FanoutMQReceiver : queue2 receive : fanout message hello-directexchange ``` ## 1.3 Topic Exchange 直连交换机的`routing_key`方法非常简单,如果希望将一条消息发送给多个队列,那么这个交换机需要绑定非常多的`routing_key`,这样的话消息的管理就会非常的困难。 所以根据一定的规则绑定队列方便很多 简单地介绍下规则: > *  (星号) 用来表示一个单词 (必须出现的) > #  (井号) 用来表示任意数量(零个或多个)单词 > 通配的绑定键是跟队列进行绑定的,举个小例子 队列Q1 绑定键为 `*.TT.*`          队列Q2绑定键为  TT.# 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到; 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到; 主题交换机是非常强大的,为啥这么膨胀? 当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。 所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。 另外还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机,这几个该篇暂不做讲述 1. config ~~~ @Configuration public class TopicMQConfig { @Bean public Queue topicQueue1() { return new Queue("TOPIC_QUEUE1", true); } @Bean public Queue topicQueue2() { return new Queue("TOPIC_QUEUE2", true); } @Bean public TopicExchange topicExchange() { return new TopicExchange("TOPIC_EXCHANGE"); } //将topicQueue1与topicExchange交换机绑定 @Bean public Binding bindQueue1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1"); } //将topicQueue2与topicExchange交换机绑定,可以接收topic. 开头的routing_key @Bean public Binding bindQueue2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#"); } } ~~~ 队列2可以接收`topic.`开头的routing_key消息 2. send ~~~ public void send(String message) { //第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key,第三个参数为发送的具体消息 amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key1", message); amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key2", message); } ~~~ 3. receive ~~~ @Service public class TopicMQReceiver { private static final Logger logger = LoggerFactory.getLogger(TopicMQReceiver.class); @RabbitListener(queues = "TOPIC_QUEUE1") public void receiveQueue1(String message) { logger.info("receive : TOPIC_QUEUE1 {}", message); } @RabbitListener(queues ="TOPIC_QUEUE2") public void receiveQueue2(String message) { logger.info("receive : TOPIC_QUEUE2 {}", message); } } ~~~ ![](https://img.kancloud.cn/04/5f/045fc5d4032bf8bf48887e1f5c0c3363_1297x265.png) ## 1.4 Headers exchange * 首部交换机: 通过构建消息时,加入“header”来区分队列 1. config ~~~ @Configuration public class HeaderMQConfig { @Bean public Queue headersQueue() { return new Queue("HEADERS_QUEUE"); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange("HEADERS_EXCHANGE"); } //将headersQueue与HeadersExchange交换机绑定 @Bean public Binding bingHeadersQueue() { //map为绑定的规则 Map<String, Object> map = new HashMap<>(); map.put("headers1", "value1"); map.put("headers2", "value2"); //whereAll表示需要满足所有条件 return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match(); } } ~~~ 2. send ~~~ public void send(String message) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("headers1", "value1"); messageProperties.setHeader("headers2", "value2"); //要发送的消息,第一个参数为具体的消息字节数组,第二个参数为消息规则 Message msg = new Message(message.getBytes(), messageProperties); //加入类似header的信息 amqpTemplate.convertAndSend("HEADERS_EXCHANGE", "", msg); } ~~~ 3. receive ~~~ @RabbitListener(queues = "HEADERS_QUEUE") public void receiveHeadersQueue(byte[] message) { logger.info("receive : HeadersQueue {}", new String(message)); } ~~~ http://localhost:8080/mq/headerExchange/hello-directexchange ![](https://img.kancloud.cn/33/04/33048668684ca2703a758b7aa7f87f18_1343x191.png) # 2. 队列属性 ## 2.1 quene name > 1. 队列名称,队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。 > 2. 如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么将会抛出一个406通道级异常。 > ## 2.2 durable Boolean 默认false 队列的声明默认是存放到内存中的,称为暂存队列,消息代理重启会丢失。如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。但是队列持久化并不意味着消息持久化当消息代理重启后消息依旧会丢失。 exclusive :是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的。 autoDelete :当最后一个消费者断开连接之后队列是否自动被删除。