rabbitMQ实现消息缓冲

RabbitMQ主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口5672`

rabbitMQ的message model实际上消息不直接发送到queue中,中间有一个exchange是做消息分发,生产者producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。

  • Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 根据key全文匹配去寻找队列

  • Topic:按规则转发消息(最灵活)转发消息主要是根据通配符

  • Headers:设置 header attribute 参数类型的交换机

  • Fanout:转发消息到所有绑定队列(广播方式)

    rabbitMQ原理

下面介绍4种交换机与springboot的集成

添加依赖包amqp,统一配置application.properties

1
2
3
4
5
6
<!--rabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
##rabbitMQ
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#消费者数量
spring.rabbitmq.listener.simple.concurrency= 10
spring.rabbitmq.listener.simple.max-concurrency= 10
#消费者每次从队列获取的消息数量
spring.rabbitmq.listener.simple.prefetch= 1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
#消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected= true
#启用发送重试
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3

1. Direct交换机制

一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。 这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。

direct原理

1.1 编写config
1
2
3
4
5
6
7
8
9
10

@Configuration
public class MQconfig {
public static final String QUEUE ="queue";

@Bean
public Queue queue(){
return new Queue(QUEUE,true);
}
}
1.2 创建消息发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class MQsender {
@Autowired
AmqpTemplate amqpTemplate;
@Autowired
RedisService redisService;
private static Logger log = LoggerFactory.getLogger(MQreceiver.class);

public void send(Object message){
String msg = redisService.beanToString(message);
log.info("send message"+msg);
amqpTemplate.convertAndSend(MQconfig.Queue,message);
}
}
1.3 创建消息接收者
1
2
3
4
5
6
7
8
9
10
@Service
public class MQreceiver {
private static Logger log = LoggerFactory.getLogger(MQreceiver.class);
//监听了queue的队列

@RabbitListener(queues = MQconfig.Queue)
public void receive(String message){
log.info("receive message"+message);
}
}
1.4 编写controller测试
1
2
3
4
5
6
7
8
9
10
11
12
@Controller

public class HelloController { @Autowired
MQsender sender;
@RequestMapping("/mq")
@ResponseBody
public Result<Boolean> mq(){
sender.send("hello");

return Result.success(true);
}
}

2. Fanout交换机制

  • 扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

    Fanout原理

2.1 编写config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class MQconfig {
public static final String FANOUT_EXCHANGE ="fanoutxchage";

@Bean

public FanoutExchange fanoutExchange(){

return new FanoutExchange(FANOUT_EXCHANGE);

}

@Bean

public Binding fanoutBinding(){

return
BindingBuilder.bind(topticQueue2()).to(fanoutExchange());

}
2.2 创建消息发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class MQsender {
@Autowired
AmqpTemplate amqpTemplate;
@Autowired
RedisService redisService;
public void sendFanout(Object message){

String msg = redisService.beanToString(message);
log.info("send message"+msg);
amqpTemplate.convertAndSend(MQconfig.FANOUT_EXCHANGE,"",msg);
}
}
2.3 创建消息接收者
1
2
3
4
@Service
public class MQreceiver {
//fanout 广播模式都接受故不需要listener
}
2.4 编写controller测试
1
2
3
4
5
6
7
8
9
@Controller
public class HelloController {
@RequestMapping("/mq/fanout")
@ResponseBody
public Result<String> fanout(){
sender.sendFanout("hellommxx");
return Result.success("hello xuanzi");
}
}

3. topic交换机制

通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a..c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

topic原理

1.1 编写config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
public class MQconfig {
public static final String TOPIC_QUEUE1 ="topic.queue1";

public static final String TOPIC_QUEUE2 ="topic.queue2";
public static final String TOPIC_EXCHANGE ="topicExchange"

@Bean
public Queue topticQueue1(){
return new Queue(TOPIC_QUEUE1,true);
}
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE2,true);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topticQueue1()).to(topicExchange()).with("topic.key1");
}
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topticQueue2()).to(topicExchange()).with("topic.#");
}

}
1.2 创建消息发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class MQsender {
public void sendTopic(Object message){
String msg = redisService.beanToString(message);
log.info("send message"+msg);
amqpTemplate.convertAndSend(MQconfig.TOPIC_EXCHANGE,
"topic.key1",msg+"1");

amqpTemplate.convertAndSend(MQconfig.TOPIC_EXCHANGE,
"topic.key2",msg+"2");

}
}
1.3 创建消息接收者
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class MQreceiver {
@RabbitListener(queues = MQconfig.TOPIC_QUEUE1)

public void receiveTopic1(String message){
log.info("receive topic queue1 message"+message);
}

@RabbitListener(queues = MQconfig.TOPIC_QUEUE2)
public void receiveTopic2(String message){
log.info("receive topic queue2 message"+message);
}
}
1.4 编写controller测试
1
2
3
4
5
6
7
8
@Controller
public class HelloController {
@RequestMapping("/mq/topic")
@ResponseBody
public Result<String> topic(){
sender.sendTopic("hellommxx");
return Result.success("hello xuanzi");
}

4. header交换机制

header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。
主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值

1.1 编写config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class MQconfig {
public static final String HEADER_QUEUE ="header.queue";

@Bean

public HeadersExchange headersExchange(){

return new HeadersExchange(HEADERS_EXCHANGE);

}

@Bean

public Queue headerQueue(){

return new Queue(HEADER_QUEUE,true);

}

@Bean

public Binding headerBinding(){

Map<String,Object> map =new HashMap<String,Object>();

map.put("header1","value1");

map.put("header2","value2");

return
BindingBuilder.bind(headerQueue()).to(headersExchange())
.whereAll(map).match();

}
}
1.2 创建消息发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class MQsender {
public void sendheader(Object message){

String msg = redisService.beanToString(message);
log.info("send message"+msg);
MessageProperties properties = new MessageProperties();
properties.setHeader("header1","value1");
properties.setHeader("header2","value2");
Message obj = new Message(msg.getBytes(),properties);
amqpTemplate.convertAndSend(MQconfig.HEADERS_EXCHANGE,"",obj);
}
}
1.3 创建消息接收者
1
2
3
4
5
6
7
8
9
@Service
public class MQreceiver {
@RabbitListener(queues = MQconfig.HEADER_QUEUE)
public void receiveHeader(byte[] message){

log.info("receive header queue message"+message);

}
}
1.4 编写controller测试
1
2
3
4
5
6
7
8
9
10
11
12
13
@Controller
public class HelloController {
@RequestMapping("/mq/header")
@ResponseBody

public Result<String> header(){

sender.sendheader("hellommxx");

return Result.success("hello xuanzi");

}
}
文章目录
  1. 1. 下面介绍4种交换机与springboot的集成
    1. 1.1. 1. Direct交换机制
      1. 1.1.1. 1.1 编写config
      2. 1.1.2. 1.2 创建消息发送者
      3. 1.1.3. 1.3 创建消息接收者
      4. 1.1.4. 1.4 编写controller测试
    2. 1.2. 2. Fanout交换机制
      1. 1.2.1. 2.1 编写config
      2. 1.2.2. 2.2 创建消息发送者
      3. 1.2.3. 2.3 创建消息接收者
      4. 1.2.4. 2.4 编写controller测试
    3. 1.3. 3. topic交换机制
      1. 1.3.1. 1.1 编写config
      2. 1.3.2. 1.2 创建消息发送者
      3. 1.3.3. 1.3 创建消息接收者
      4. 1.3.4. 1.4 编写controller测试
    4. 1.4. 4. header交换机制
      1. 1.4.1. 1.1 编写config
      2. 1.4.2. 1.2 创建消息发送者
      3. 1.4.3. 1.3 创建消息接收者
      4. 1.4.4. 1.4 编写controller测试
| 139.6k