0%

rocketmq 客户端相关概念及使用

一、消息中间件的作用

  • 应用解耦

    AB应用不在互相依赖

  • 流量削峰

    流量达到高峰的时候,通常使用限流算法来控制流量涌入系统,避免系统被击瘫,但是这种方式损失了一部分请求

    此时可以使用消息中间件来缓冲大量的请求,匀速消费,当消息队列中堆积消息过多时,我们可以动态上线增加消费端,来保证不丢失重要请求。

  • 大数据处理

    消息中间件可以把各个模块中产生的管理员操作日志、用户行为、系统状态等数据文件作为消息收集到主题中

    数据使用方可以订阅自己感兴趣的数据内容互不影响,进行消费

  • 异构系统

    跨语言

二、消息收发模型

参考阿里云官方文档图片

三、核心概念

  • Topic:消息主题,一级消息类型,生产者向其发送消息。
  • 生产者:也称为消息发布者,负责生产并发送消息至Topic。
  • 消费者:也称为消息订阅者,负责从Topic接收并消费消息。
  • 消息:生产者向Topic发送并最终传送给消费者的数据和(可选)属性的组合。
  • 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

具体名词解释参考阿里云文档

四、消息使用

消息消费模式

消息消费模式由消费者来决定,可以由消费者设置MessageModel来决定消息模式。

默认情况下为集群模式

集群模式(默认)

1
consumer.setMessageModel(MessageModel.CLUSTERING);

集群消息是指集群化部署消费者

当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。

特点

  • 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
  • 在消息重投时,不能保证路由到同一台机器上
  • 消费状态由broker维护

广播模式

1
consumer.setMessageModel(MessageModel.BROADCASTING);

当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。

特点

  • 消费进度由consumer维护

  • 保证每个消费者消费一次消息

  • 消费失败的消息不会重投

发送方式

同步消息(单条、批量)

​ 同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
DefaultMQProducer producer = new DefaultMQProducer("group01");
//nameServer地址
producer.setNamesrvAddr("192.168.1.18:9876");
producer.start();

//同步单条普通消息
Message message = new Message("topic01","msg_body".getBytes());
SendResult send = producer.send(message);

//同步多条普通消息发送
List<Message> msgList = new ArrayList<>();
msgList.add(new Message("topic01","同步普通多条01".getBytes()));
msgList.add(new Message("topic01","同步普通多条02".getBytes()));
msgList.add(new Message("topic01","同步普通多条03".getBytes()));
SendResult send = producer.send(msgList);

producer.shutdown();

异步消息

​ 异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
DefaultMQProducer producer = new DefaultMQProducer("group02");
//设置nameServer
producer.setNamesrvAddr("192.168.1.18:9876");
//启动生产者
producer.start();

Message message = new Message("topic02","异步消息".getBytes());
//异步发送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("成功回调:" + sendResult);
}

@Override
public void onException(Throwable throwable) {
System.out.println("失败回调:" + throwable);
}
});

//异步回调不能关闭生产者,因为无法确定回调的消息什么时候执行,会出现异常
//producer.shutdown();

单向消息

​ 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

1
2
3
4
5
6
7
8
9
10
DefaultMQProducer producer = new DefaultMQProducer("group02");
//设置nameServer
producer.setNamesrvAddr("192.168.1.18:9876");
//启动
producer.start();
Message message = new Message("topic02","单向消息".getBytes());
//发送单向消息
producer.sendOneway(message);

producer.shutdown();

消息过滤

tag

二级消息类型,用来进一步区分某个Topic下的消息分类

生产者

在消息中指定tag

1
Message message = new Message("topic04","tag04","biz_key","多属性消息".getBytes());
消费者

在订阅中指定tag过滤规则

1
2
3
4
// * 代表订阅Topic下的所有消息
//consumer.subscribe("topic04","*");
//consumer.subscribe("topic04","tag04||tag05");
consumer.subscribe("topic04","tag04");

sql表达式

消费者将收到包含TAGA或TAGB或TAGB的消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。 在这种情况下,可以使用SQL表达式筛选出消息.

配置

/rocketmq-rocketmq-all-4.8.0/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf中添加配置

1
enablePropertyFilter=true

启动broker 加载指定配置文件

1
nohup ../bin/mqbroker -n 127.0.0.1:9876 -c broker.conf  &

rocketmq-console中可以看到

生产者
1
2
3
4
5
Message message = new Message();
message.setBody("消息体123456".getBytes());
message.setTopic("topic05");
//指定额外参数
message.putUserProperty("userId","123");
消费者
1
2
3
//对额外参数进行sql过滤
MessageSelector messageSelector = MessageSelector.bySql("userId >= 2");
consumer.subscribe("topic05",messageSelector);
语法

RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.

  1. 数字比较, 像 >, >=, <, <=, BETWEEN, =;
  2. 字符比较, 像 =, <>, IN;
  3. IS NULL 或者 IS NOT NULL;
  4. 逻辑运算AND, OR, NOT;

常量类型是:

  1. 数字, 像123, 3.1415;
  2. 字符串, 像‘abc’,必须使用单引号;
  3. NULL, 特殊常数;
  4. 布尔常量, TRUEFALSE;

五、消息类型

5.1、普通消息

消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。以上例子都以普通消息为例。

5.2、定时和延时消息

允许消息生产者对指定消息进行定时(延时)投递

5.3、顺序消息

允许消息消费者按照消息发送的顺序对消息进行消费。

顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。

保证有序参与因素

  • FIFO
  • 队列内保证有序
  • 消费线程

生产者

默认一个topic下有四个queue,需要保证消息可以顺序搭放入同一个queue中,再依靠FIFO原则进行消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class OrderQueueProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("queue-group");
producer.setNamesrvAddr("192.168.1.18:9876");
producer.start();

for (int i = 0; i < 20; i++) {
Message message = new Message("topic01",("队列1消息" + i).getBytes());
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//默认有4个队列
//可以通过发送消息时,指定的arg进行规则判断,选取同个queue进行发送,常见的有hash取模
return mqs.get(0);
}
},"arg");
}
for (int i = 0; i < 20; i++) {
Message message = new Message("topic01",("队列2消息" + i).getBytes());
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//默认有4个队列
//可以通过发送消息时,指定的arg进行规则判断,选取同个queue进行发送,常见的有hash取模
return mqs.get(0);
}
},"arg");
}
producer.shutdown();
System.out.println("producer关闭");
}
}

消费者

使用顺序消息消费监听器,可以保证同个队列的消息以FIFO方式进行消费;

但是多个queue的消息同时消费时仍无法保证,此时需要指定消费的线程数为1,当消费完一个queue之后再去消费另一个queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-group");
consumer.setNamesrvAddr("192.168.1.18:9876");
//指定订阅规则
consumer.subscribe("topic01","*");

//最大开启消费线程数
consumer.setConsumeThreadMax(1);
//最小线程数
consumer.setConsumeThreadMin(1);

//注册顺序消息消费监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt messageExt : list) {
System.out.println(new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();
System.out.println("order consumer start");
}
}

6.4、事务消息

事务消息发送步骤如下:

  1. 发送方将半事务消息发送至消息队列RocketMQ版服务端。
  2. 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

实现类似XA或Open XA的分布事务功能,以达到事务最终一致性状态。

  • Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中

  • 检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。

  • 超时:如果超过回查次数,默认回滚消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
TransactionMQProducer producer = new TransactionMQProducer("t-group");
producer.setNamesrvAddr("192.168.1.18:9876");

//设置事务监听器
producer.setTransactionListener(new TransactionListener() {
//处理本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
//暂时为未知状态,等待broker调用checkLocalTransaction回查
//return LocalTransactionState.UNKNOW;
try {
System.out.println("处理本地事务message:" + message);
System.out.println("处理本地事务arg:" + arg);
} catch (Exception e) {
//回滚消息,broker端会删除半消息
e.printStackTrace();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//执行事务成功,确认提交
return LocalTransactionState.COMMIT_MESSAGE;
}

//检查本地事务
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

System.out.println("检查本地事务body:" + messageExt.getBody());
System.out.println("检查本地事务msgId:" + messageExt.getMsgId());
System.out.println("检查本地事务tid:" + messageExt.getTransactionId());
//执行事务成功,确认提交
//return LocalTransactionState.COMMIT_MESSAGE;
//回滚消息,broker端会删除半消息
//return LocalTransactionState.ROLLBACK_MESSAGE;
//暂时为未知状态,等待broker调用checkLocalTransaction回查
return LocalTransactionState.UNKNOW;
}
});

producer.start();
Message message = new Message("topic01", "消息体".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(message, "arg");
System.out.println(result);

六、重试机制

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 发送的默认超时时间
* Timeout for sending messages.
*/
private int sendMsgTimeout = 3000;

// 异步发送时 重试次数,默认 2
producer.setRetryTimesWhenSendAsyncFailed(1);
// 同步发送时 重试次数,默认 2
producer.setRetryTimesWhenSendFailed(1);

// 当消息发送失败时,是否向其他broker发送请求 默认false
producer.setRetryAnotherBrokerWhenNotStoreOK(true);

Consumer

1
2
3
4
5
//消费超时,单位分钟,默认当前线程阻塞15分钟
consumer.setConsumeTimeout()

//发送ack,消费失败,消费监听器返回稍后重试
RECONSUME_LATER

Broker

只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试

重投使用messageDelayLevel

默认值为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h