一、消息中间件的作用
应用解耦
AB应用不在互相依赖
流量削峰
流量达到高峰的时候,通常使用限流算法来控制流量涌入系统,避免系统被击瘫,但是这种方式损失了一部分请求
此时可以使用消息中间件来缓冲大量的请求,匀速消费,当消息队列中堆积消息过多时,我们可以动态上线增加消费端,来保证不丢失重要请求。
大数据处理
消息中间件可以把各个模块中产生的管理员操作日志、用户行为、系统状态等数据文件作为消息收集到主题中
数据使用方可以订阅自己感兴趣的数据内容互不影响,进行消费
异构系统
跨语言
二、消息收发模型
三、核心概念
- Topic:消息主题,一级消息类型,生产者向其发送消息。
- 生产者:也称为消息发布者,负责生产并发送消息至Topic。
- 消费者:也称为消息订阅者,负责从Topic接收并消费消息。
- 消息:生产者向Topic发送并最终传送给消费者的数据和(可选)属性的组合。
- 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
- Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
具体名词解释参考阿里云文档
四、消息使用
消息消费模式
消息消费模式由消费者来决定,可以由消费者设置MessageModel来决定消息模式。
默认情况下为集群模式
集群模式(默认)
1
| consumer.setMessageModel(MessageModel.CLUSTERING);
|
集群消息是指集群化部署消费者
当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
特点
- 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
- 在消息重投时,不能保证路由到同一台机器上
- 消费状态由broker维护
广播模式
1
| consumer.setMessageModel(MessageModel.BROADCASTING);
|
当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
特点
消费进度由consumer维护
保证每个消费者消费一次消息
消费失败的消息不会重投
发送方式
同步消息(单条、批量)
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| DefaultMQProducer producer = new DefaultMQProducer("group01");
producer.setNamesrvAddr("192.168.1.18:9876"); producer.start();
Message message = new Message("topic01","msg_body".getBytes()); SendResult send = producer.send(message);
List<Message> msgList = new ArrayList<>(); msgList.add(new Message("topic01","同步普通多条01".getBytes())); msgList.add(new Message("topic01","同步普通多条02".getBytes())); msgList.add(new Message("topic01","同步普通多条03".getBytes())); SendResult send = producer.send(msgList);
producer.shutdown();
|
异步消息
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| DefaultMQProducer producer = new DefaultMQProducer("group02");
producer.setNamesrvAddr("192.168.1.18:9876");
producer.start();
Message message = new Message("topic02","异步消息".getBytes());
producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("成功回调:" + sendResult); }
@Override public void onException(Throwable throwable) { System.out.println("失败回调:" + throwable); } });
|
单向消息
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
1 2 3 4 5 6 7 8 9 10
| DefaultMQProducer producer = new DefaultMQProducer("group02");
producer.setNamesrvAddr("192.168.1.18:9876");
producer.start(); Message message = new Message("topic02","单向消息".getBytes());
producer.sendOneway(message);
producer.shutdown();
|
消息过滤
tag
二级消息类型,用来进一步区分某个Topic下的消息分类
生产者
在消息中指定tag
1
| Message message = new Message("topic04","tag04","biz_key","多属性消息".getBytes());
|
消费者
在订阅中指定tag过滤规则
1 2 3 4
|
consumer.subscribe("topic04","tag04");
|
sql表达式
消费者将收到包含TAGA或TAGB或TAGB的消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。 在这种情况下,可以使用SQL表达式筛选出消息.
配置
在/rocketmq-rocketmq-all-4.8.0/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf
中添加配置
1
| enablePropertyFilter=true
|
启动broker 加载指定配置文件
1
| nohup ../bin/mqbroker -n 127.0.0.1:9876 -c broker.conf &
|
rocketmq-console中可以看到
生产者
1 2 3 4 5
| Message message = new Message(); message.setBody("消息体123456".getBytes()); message.setTopic("topic05");
message.putUserProperty("userId","123");
|
消费者
1 2 3
| MessageSelector messageSelector = MessageSelector.bySql("userId >= 2"); consumer.subscribe("topic05",messageSelector);
|
语法
RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.
- 数字比较, 像
>
, >=
, <
, <=
, BETWEEN
, =
;
- 字符比较, 像
=
, <>
, IN
;
IS NULL
或者 IS NOT NULL
;
- 逻辑运算
AND
, OR
, NOT
;
常量类型是:
- 数字, 像123, 3.1415;
- 字符串, 像‘abc’,必须使用单引号;
NULL
, 特殊常数;
- 布尔常量,
TRUE
或FALSE
;
五、消息类型
5.1、普通消息
消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。以上例子都以普通消息为例。
5.2、定时和延时消息
允许消息生产者对指定消息进行定时(延时)投递
5.3、顺序消息
允许消息消费者按照消息发送的顺序对消息进行消费。
顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。
保证有序参与因素
生产者
默认一个topic下有四个queue,需要保证消息可以顺序搭放入同一个queue中,再依靠FIFO原则进行消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class OrderQueueProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("queue-group"); producer.setNamesrvAddr("192.168.1.18:9876"); producer.start();
for (int i = 0; i < 20; i++) { Message message = new Message("topic01",("队列1消息" + i).getBytes()); producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get(0); } },"arg"); } for (int i = 0; i < 20; i++) { Message message = new Message("topic01",("队列2消息" + i).getBytes()); producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get(0); } },"arg"); } producer.shutdown(); System.out.println("producer关闭"); } }
|
消费者
使用顺序消息消费监听器,可以保证同个队列的消息以FIFO方式进行消费;
但是多个queue的消息同时消费时仍无法保证,此时需要指定消费的线程数为1,当消费完一个queue之后再去消费另一个queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class OrderConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-group"); consumer.setNamesrvAddr("192.168.1.18:9876"); consumer.subscribe("topic01","*");
consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMin(1);
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt messageExt : list) { System.out.println(new String(messageExt.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } });
consumer.start(); System.out.println("order consumer start"); } }
|
6.4、事务消息
事务消息发送步骤如下:
- 发送方将半事务消息发送至消息队列RocketMQ版服务端。
- 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤如下:
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
实现类似XA或Open XA的分布事务功能,以达到事务最终一致性状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| TransactionMQProducer producer = new TransactionMQProducer("t-group"); producer.setNamesrvAddr("192.168.1.18:9876");
producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { try { System.out.println("处理本地事务message:" + message); System.out.println("处理本地事务arg:" + arg); } catch (Exception e) { e.printStackTrace(); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; }
@Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("检查本地事务body:" + messageExt.getBody()); System.out.println("检查本地事务msgId:" + messageExt.getMsgId()); System.out.println("检查本地事务tid:" + messageExt.getTransactionId()); return LocalTransactionState.UNKNOW; } });
producer.start(); Message message = new Message("topic01", "消息体".getBytes()); TransactionSendResult result = producer.sendMessageInTransaction(message, "arg"); System.out.println(result);
|
六、重试机制
Producer
1 2 3 4 5 6 7 8 9 10 11 12 13
|
private int sendMsgTimeout = 3000;
producer.setRetryTimesWhenSendAsyncFailed(1);
producer.setRetryTimesWhenSendFailed(1);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
|
Consumer
1 2 3 4 5
| consumer.setConsumeTimeout()
RECONSUME_LATER
|
Broker
只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试
重投使用messageDelayLevel
默认值为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h