0%

rabbitmq系列-amqp

一、概念

​ AMQP 是 Advanced Message Queuing Protocol 的简称,它是一个面向消息中间件的开放式标准应用层协议。

二、spring boot的应用

1.消息生产者

1)导入maven依赖:
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)配置application.yml
1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
virtual-host: yrl_test
username: test
password: test123
host: 127.0.0.1
port: 5672
publisher-confirms: true # 后续消息确认使用
publisher-returns: true # 后续消息确认使用
3)controller发送消息

​ 实际使用,应抽取rabbitTemplate注入序列化器,不应该每个方法设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;


/**
* 指定exchange、routing key发送消息
*/
@PostMapping("sendUser")
public void sendMsgUser(@RequestBody Map<String,String> param){
User user = new User();
user.setName(param.get("msg"));
user.setBirthday(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
//设置消息序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(param.get("exchange"),param.get("key"),user);
}

/**
* 不指定exchange,根据routing key发送消息
*/
@PostMapping("sendDefault")
public void sendDefault(@RequestBody Map<String,String> param){
Map<String,Object> msg = new HashMap<>();
msg.put("message",param.get("msg"));
msg.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
//设置消息序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(param.get("key"),msg);
}

/**
* 指定exchange,不指定routing key
*/
@PostMapping("sendHeader")
public void sendHeader(@RequestBody Map<String,String> param){
Map<String,Object> msg = new HashMap<>();
msg.put("message",param.get("msg"));
msg.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
//设置消息序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(param.get("exchange"),"",msg,message -> {
MessageProperties properties = message.getMessageProperties();
properties.setHeader("key-one", "1");
properties.setHeader("key-two", "2");
return message;
});
}

/**
* 发送延迟消息
*/
@PostMapping("sendDelay")
public void sendDelay(@RequestBody Map<String,String> param){
Map<String,Object> msg = new HashMap<>();
msg.put("message",param.get("msg"));
msg.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
//设置消息序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(param.get("exchange"),param.get("key"),msg,message -> {
MessageProperties properties = message.getMessageProperties();
properties.setHeader("x-delay", 5000);
return message;
});
}
}

2. 消息消费者

1) 配置
a. 第一种方式,通过实体配置,进行消费者与queue、exchange之间的绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 创建队列
* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
* return new Queue("TestDirectQueue",true,true,false);
*
* 一般设置一下队列的持久化就好,其余两个就是默认false
*/
@Bean
public Queue testDirectQueue(){
return new Queue("testDirectQueue",true);
}

/**
* 创建直连交换器
*/
@Bean
public DirectExchange testDirectExchange(){
return new DirectExchange("testDirectExchange",true,false);
}

/**
* 将exchange和queue进行绑定,并设置匹配的key
*/
@Bean
public Binding bindingDirect(){
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("testDirectRouting");
}

/**
* 创建监听器容器工厂,设置连接工厂、序列化方式(解决实体序列化问题)
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//开启手动Ack(后续使用配置消息确认使用)
//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}

/**
* 声明一个交换机
*/
@Bean
CustomExchange delayExchange() {

Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayExchangeCus", "x-delayed-message", true, false, args);
}

/**
* 声明一个延迟队列
*/
@Bean
Queue delayQueue() {
return QueueBuilder.durable("delayCusQueue").build();
}

/**
* 绑定
*/
@Bean
Binding queueBinding(Queue delayQueue, CustomExchange delayExchange){
return BindingBuilder.bind(delayQueue).to(delayExchange).with("cusDelayKey").noargs();

}
b. 第二种方式,通过注解配置,进行消费者与queue、exchange之间的绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

/**
* 默认交换器(名称为空字符串的直连交换机),一个queue若不指定binding的交换机,就被绑定到默认交换机上,routingKey为queue的名称
* @param param 消息内容,当只有一个参数的时候可以不加@Payload注解
*/
@RabbitListener(queuesToDeclare = @Queue("myDefaultQueue"))
@RabbitHandler
public void defaultDirectReceiver(Map param){
System.out.println("默认交换器,消费者接收到消息:" + param);
}

/**
* 直连交换器1
* @param param 消息内容,当只有一个参数的时候可以不加@Payload注解
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("testDirectQueue1"),
exchange = @Exchange(value = "myExchange",type = ExchangeTypes.DIRECT),
key = "myDirectRouting.1")
)

@RabbitHandler
public void direct1Receiver(Map param){
System.out.println("直连交换器1,消费者接收到消息:" + param);
}

/**
* 直连交换器2
* @param param 消息内容,当只有一个参数的时候可以不加@Payload注解
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("testDirectQueue2"),
exchange = @Exchange(value = "myExchange",type = ExchangeTypes.DIRECT),
key = "myDirectRouting.2")
)

@RabbitHandler
public void direct2Receiver(Map param){
System.out.println("直连交换器2,消费者接收到消息:" + param);
}

/**
* 直连交换器3
* @param param 消息内容,当只有一个参数的时候可以不加@Payload注解
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("testDirectQueue3"),
exchange = @Exchange(value = "myExchange",type = ExchangeTypes.DIRECT),
key = "myDirectRouting.3")
)

@RabbitHandler
public void direct3Receiver(User param){
System.out.println("直连交换器3,消费者接收到消息:" + param);
}


/**
* 主题交换器1
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topicQueue1"),
exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),
key = "topicRouting.1")
)
@RabbitHandler
public void topic1Receiver(Map param){
System.out.println("主题交换器1,消费者接收到消息:" + param);
}

/**
* 主题交换器2
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topicQueue2"),
exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),
key = "topicRouting.*")
)
@RabbitHandler
public void topic2Receiver(Map param){
System.out.println("主题交换器2,消费者接收到消息:" + param);
}


/**
* 扇形交换器1
* key不起作用
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanoutQueue1"),
exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT),
key = "fanoutRouting")
)
@RabbitHandler
public void fanout1Receiver(Map param){
System.out.println("扇形交换器1,消费者接收到消息:" + param);
}

/**
* 扇形交换器2
* key不起作用
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanoutQueue2"),
exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT),
key = "fanoutRouting")
)
@RabbitHandler
public void fanout2Receiver(Map param){
System.out.println("扇形交换器2,消费者接收到消息:" + param);
}

/**
* headers交换器,任意匹配
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("headerQueue1"),
exchange = @Exchange(value = "headerExchange",type = ExchangeTypes.HEADERS),
arguments = {@Argument(name = "x-match",value = "any"),
@Argument(name = "key-one",value = "1"),
@Argument(name = "key-three",value = "3")
})
)
@RabbitHandler
public void anyMatchReceiver(@Payload Map param, @Headers Map headers){
System.out.println("header交换器,任意匹配消费者接收到消息:" + param + ",头部信息:" + headers);
}
/**
* headers交换器,全匹配
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("headerQueue2"),
exchange = @Exchange(value = "headerExchange",type = ExchangeTypes.HEADERS),
arguments = {@Argument(name = "x-match",value = "all"),
@Argument(name = "key-one",value = "1"),
@Argument(name = "key-two",value = "2")
})
)
@RabbitHandler
public void allMatchReceiver(@Payload Map param, @Headers Map headers){
System.out.println("header交换器,全匹配消费者接收到消息:" + param + ",头部信息:" + headers);
}



@RabbitHandler
@RabbitListener(queues = "delayCusQueue")
public void delayReceiver(@Payload Map param, @Headers Map headers){
System.out.println("延迟消息交换器,"+ LocalDateTime.now()+"接收到消息:" + param + ",头部信息:" + headers);
}

@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = "delayExchangeCus2",delayed = "true" ,type = ExchangeTypes.DIRECT,arguments = @Argument(name = "x-delayed-type",value="direct")),
value = @Queue(value = "delayCusQueue2",durable = "true"),
key = "cusDelayKey2"
)
)
@RabbitHandler
public void delay2Receiver(@Payload Map param, Channel channel, @Headers Map headers){
Long amqpDeliveryTag = (Long) headers.get("amqp_deliveryTag");
System.out.println("延迟消息交换器2,"+ LocalDateTime.now()+"接收到消息:" + param + ",头部信息:" + headers);

}
2) 开启手动应答模式
a. 添加application.yml配置
1
2
3
4
spring:
rabbitmq:
publisher-confirms: true
publisher-returns: true
b. 连接工厂设置参数
1
2
//开启手动Ack
new SimpleRabbitListenerContainerFactory().setAcknowledgeMode(AcknowledgeMode.MANUAL);
c. 监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = "delayExchangeCus2",delayed = "true" ,type = ExchangeTypes.DIRECT,arguments = @Argument(name = "x-delayed-type",value="direct")),
value = @Queue(value = "delayCusQueue2",durable = "true"),
key = "cusDelayKey2"
)
)
@RabbitHandler
public void delay2Receiver(@Payload Map param, Channel channel, @Headers Map headers){
Long amqpDeliveryTag = (Long) headers.get("amqp_deliveryTag");
System.out.println("延迟消息交换器2,"+ LocalDateTime.now()+"接收到消息:" + param + ",头部信息:" + headers);
try {
String message = (String) param.get("message");
if("延迟消息1".equals(message)){
channel.basicReject(amqpDeliveryTag,false);
System.out.println("延迟消息1拒绝");
}else if("延迟消息2".equals(message)){
channel.basicReject(amqpDeliveryTag,true);
System.out.println("延迟消息2拒绝,重新放入队列");
}else {
channel.basicAck(amqpDeliveryTag,false);
System.out.println("正常提交");
}
} catch (IOException e) {
e.printStackTrace();
}
}