1. Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.2.3</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
2. Logging

Place log4j.properties in the resources directory:
log4j.rootLogger = info,console
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n
3. Managing Topics

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
KafkaAdminClient adminClient = (KafkaAdminClient) KafkaAdminClient.create(properties);

// list topics
KafkaFuture<Set<String>> names = adminClient.listTopics().names();
for (String name : names.get()) {
    System.out.println("topic:" + name);
}

// create a topic with 3 partitions and a replication factor of 2
adminClient.createTopics(Collections.singletonList(new NewTopic("topic02", 3, (short) 2)));

// describe a topic
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList("topic01"));
KafkaFuture<Map<String, TopicDescription>> mapKafkaFuture = describeTopicsResult.allTopicNames();
for (Map.Entry<String, TopicDescription> descriptionEntry : mapKafkaFuture.get().entrySet()) {
    System.out.println("DescribeTopicsResult,key:" + descriptionEntry.getKey() + ",value:" + descriptionEntry.getValue());
}

adminClient.close();
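The Admin API can also delete topics. A minimal sketch, assuming the same `properties` as above and the topic02 created there (any existing topic works):

// delete a topic; all().get() blocks until the brokers acknowledge the request
AdminClient admin = AdminClient.create(properties);
admin.deleteTopics(Collections.singletonList("topic02")).all().get();
admin.close();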
4. Producer

4.1 Sending Messages

The topic does not have to be created in advance; it is created automatically on the first send, with 1 partition and a replication factor of 1 by default.
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key-" + i, "value-" + i);
    producer.send(record);
}
producer.close();
4.2 Specifying the Partitioning Rule

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());
To define a custom partitioning rule, implement the org.apache.kafka.clients.producer.Partitioner interface and override its partition method (see the sketch after the list below); alternatively, specify the partition explicitly when building the record.
DefaultPartitioner (the default) works as follows:
- If the ProducerRecord specifies a partition, that partition is used.
- If no partition is specified but a key is present, the partition is the key's hash modulo the number of partitions.
- If neither a partition nor a key is given, the available partitions are considered:
  - fewer than 1 available partition: a random value modulo the total number of partitions;
  - exactly 1 available partition: that partition is used;
  - more than 1 available partition: a random value modulo the number of available partitions is used as an index into the available partitions.
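A minimal custom partitioner sketch. The class name and the rule (routing by key length) are made up purely for illustration; any deterministic mapping to a valid partition index works:

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// hypothetical example: route records by the length of their key
public class KeyLengthPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // no key: pick a random partition (the built-in partitioner is smarter here)
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // example rule: key length modulo the partition count
        return keyBytes.length % numPartitions;
    }

    @Override
    public void close() { }
}

Register it on the producer with properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyLengthPartitioner.class.getName());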
5. Consumer

5.1 Consuming Messages

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Pattern.compile("^topic01$"));

while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
    while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        int partition = record.partition();
        long offset = record.offset();
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        System.out.println("partition:" + partition + ",offset:" + offset + ",topic:" + topic
                + ",key:" + key + ",value:" + value);
    }
}
Output from a single consumer:
partition:2,offset:3,topic:topic01,key:key-2,value:value-2
partition:2,offset:4,topic:topic01,key:key-3,value:value-3
partition:2,offset:5,topic:topic01,key:key-5,value:value-5
partition:2,offset:6,topic:topic01,key:key-6,value:value-6
partition:1,offset:1,topic:topic01,key:key-0,value:value-0
partition:1,offset:2,topic:topic01,key:key-7,value:value-7
partition:1,offset:3,topic:topic01,key:key-8,value:value-8
partition:0,offset:1,topic:topic01,key:key-1,value:value-1
partition:0,offset:2,topic:topic01,key:key-4,value:value-4
partition:0,offset:3,topic:topic01,key:key-9,value:value-9
Messages are ordered within a partition, but there is no ordering guarantee across partitions.
5.2 Partition Assignment

topic01 currently has 3 partitions and a replication factor of 3. The log excerpts below show how partitions are reassigned as consumers join the group (a rebalance-listener sketch follows the excerpts).
When the first consumer comes online, it is assigned all partitions: topic01-0, topic01-1, topic01-2

Notifying assignor about the new Assignment(partitions=[topic01-0, topic01-1, topic01-2])
Adding newly assigned partitions: topic01-0, topic01-1, topic01-2
When the second consumer comes online, it is assigned: topic01-2

Notifying assignor about the new Assignment(partitions=[topic01-2])
Adding newly assigned partitions: topic01-2
The first consumer is reassigned: topic01-0, topic01-1

(Re-)joining group
...
Notifying assignor about the new Assignment(partitions=[topic01-0, topic01-1])
Adding newly assigned partitions: topic01-0, topic01-1
When the third consumer comes online, it is assigned: topic01-0

(Re-)joining group
...
Notifying assignor about the new Assignment(partitions=[topic01-0])
Adding newly assigned partitions: topic01-0
The first consumer is reassigned: topic01-1

(Re-)joining group
...
Notifying assignor about the new Assignment(partitions=[topic01-1])
Adding newly assigned partitions: topic01-1
The second consumer is reassigned: topic01-2

(Re-)joining group
...
Notifying assignor about the new Assignment(partitions=[topic01-2])
Adding newly assigned partitions: topic01-2
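To observe these reassignments from application code, a ConsumerRebalanceListener can be passed to subscribe. A minimal sketch reusing the kafkaConsumer from 5.1 (the print statements are just for illustration):

import java.util.Collection;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

kafkaConsumer.subscribe(Pattern.compile("^topic01$"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before a rebalance takes partitions away from this consumer
        System.out.println("revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after the new assignment has been applied
        System.out.println("assigned: " + partitions);
    }
});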
6. Serialization

6.1 Producer

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
To use a custom serializer, implement the org.apache.kafka.common.serialization.Serializer interface and override the serialize method:
public class ObjectSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("configure");
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return SerializationUtils.serialize((Serializable) data);
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}
6.2 Consumer

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
To use a custom deserializer, implement the org.apache.kafka.common.serialization.Deserializer interface and override the deserialize method:
public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("configure");
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}
7. Interceptors

Implement the org.apache.kafka.clients.producer.ProducerInterceptor interface:
public class UserDefineProducerInterceptor implements ProducerInterceptor {

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        ProducerRecord wrapRecord = new ProducerRecord(record.topic(), record.key() + "#", record.value() + "#");
        wrapRecord.headers().add("user", "test".getBytes());
        return wrapRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("metadata:" + metadata + ",exception:" + exception);
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("configure");
    }
}

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, UserDefineProducerInterceptor.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key-" + i, "value-" + i);
            producer.send(record);
        }
        producer.close();
    }
}

public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Pattern.compile("^topic01$"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> record = iterator.next();
                int partition = record.partition();
                long offset = record.offset();
                String topic = record.topic();
                String key = record.key();
                String value = record.value();
                Headers headers = record.headers();
                System.out.println("partition:" + partition + ",offset:" + offset + ",topic:" + topic
                        + ",key:" + key + ",value:" + value + ",headers:" + headers);
            }
        }
    }
}
partition:2,offset:19,topic:topic01,key:key-1#,value:value-1#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:2,offset:20,topic:topic01,key:key-5#,value:value-5#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:0,offset:18,topic:topic01,key:key-0#,value:value-0#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:0,offset:19,topic:topic01,key:key-3#,value:value-3#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:0,offset:20,topic:topic01,key:key-7#,value:value-7#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:0,offset:21,topic:topic01,key:key-9#,value:value-9#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:1,offset:18,topic:topic01,key:key-2#,value:value-2#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:1,offset:19,topic:topic01,key:key-4#,value:value-4#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:1,offset:20,topic:topic01,key:key-6#,value:value-6#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
partition:1,offset:21,topic:topic01,key:key-8#,value:value-8#,headers:RecordHeaders(headers = [RecordHeader(key = user, value = [116, 101, 115, 116])], isReadOnly = false)
8. Offsets

8.1 First-Consumption Strategy (auto.offset.reset)

latest (default): automatically reset the offset to the latest offset
earliest: automatically reset the offset to the earliest offset
none: throw an exception if no previous offset is found for the consumer group

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
8.2 Auto Commit

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
If you want to manage offsets yourself, disable auto commit and commit the offsets manually. Note that the offset you commit must always be the offset of the record just consumed plus 1, because the committed offset is the position from which the consumer will fetch next.
8.3 Manual Commit

The manually committed offset must be incremented by 1; otherwise the consumer will receive the last record again after a restart.
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Pattern.compile("^topic04$"));
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
    while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        int partition = record.partition();
        long offset = record.offset();
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        Headers headers = record.headers();
        System.out.println("partition:" + partition + ",offset:" + offset + ",topic:" + topic
                + ",key:" + key + ",value:" + value + ",headers:" + headers);

        // commit the position of the next record to fetch: current offset + 1
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(record.topic(), partition), new OffsetAndMetadata(offset + 1));
        kafkaConsumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                System.out.println("committed offset " + offset);
            }
        });
    }
}
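An alternative, sketched under the same assumptions as the code above (same consumer, topic, and group), is to commit once per poll batch with commitSync, again committing the last processed offset + 1 for each partition:

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        // ... process the record ...
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    if (!offsets.isEmpty()) {
        kafkaConsumer.commitSync(offsets);   // blocks until the commit succeeds or fails
    }
}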
9. acks and retries

After sending a message, the Kafka producer requires the broker to acknowledge it within a configured time. If no acknowledgement arrives in time, the producer retries sending the message up to the configured number of times.

acks=1: the leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader fails right after acknowledging the record but before the followers have replicated it, the record is lost. (This was the default for older clients; since Kafka 3.0 the producer default is acks=all.)

acks=0: the producer does not wait for any acknowledgement from the server. The record is added to the socket buffer and considered sent immediately. There is no guarantee the server received it.

acks=all: the leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees the record is not lost as long as at least one in-sync replica remains alive. It is the strongest guarantee and is equivalent to acks=-1.

If the producer does not receive the leader's acknowledgement within the configured time, the retries mechanism can be enabled to resend the message.
9.1 Producer

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// deliberately tiny timeout (1 ms) so every request times out and is retried
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 1; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
    kafkaProducer.send(record);
}
Thread.sleep(500);
kafkaProducer.close();
9.2 Consumer

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Pattern.compile("^topic01$"));
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
    while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        int partition = record.partition();
        long offset = record.offset();
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        Headers headers = record.headers();
        System.out.println("partition:" + partition + ",offset:" + offset + ",topic:" + topic
                + ",key:" + key + ",value:" + value + ",headers:" + headers);
    }
}
partition:1,offset:39,topic:topic01,key:key0,value:value0,headers:RecordHeaders(headers = [], isReadOnly = false)
partition:1,offset:40,topic:topic01,key:key0,value:value0,headers:RecordHeaders(headers = [], isReadOnly = false)
partition:1,offset:41,topic:topic01,key:key0,value:value0,headers:RecordHeaders(headers = [], isReadOnly = false)
partition:1,offset:42,topic:topic01,key:key0,value:value0,headers:RecordHeaders(headers = [], isReadOnly = false)
9.3 Idempotence

With acks and retries enabled, the broker may receive duplicate records. Idempotence can be enabled with properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true).
To implement idempotence, Kafka introduces a ProducerID and a SequenceNumber in its underlying design:

ProducerID: each new producer is assigned a unique ProducerID during initialization; it is not visible to the client user.
SequenceNumber: for each ProducerID, every topic-partition the producer writes to has a SequenceNumber that starts at 0 and increases monotonically.

With idempotence enabled, each message carries the PID (ProducerID) and SequenceNumber. The broker partition checks whether the message's SequenceNumber is exactly one greater than the one it last accepted; if so the message is accepted, otherwise it is treated as a duplicate.

When using idempotence, retries must be enabled (greater than 0) and acks must be set to all, as in the sketch below.
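A minimal producer configuration sketch with idempotence enabled (broker addresses and topic are the same ones assumed throughout this post):

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// idempotence requires acks=all and retries > 0
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>("topic01", "key0", "value0"));
producer.close();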
10. Transactions

Kafka's idempotence only guarantees atomicity for a single record written to a single partition; consistency across multiple records (and multiple partitions) requires Kafka transactions.

By default consumers read at the read_uncommitted isolation level. With transactions enabled, consumers must set the isolation level to read_committed, and producers must specify a transactional.id. The transactional.id must be unique, and only one producer may use a given id at a time.
10.1 Producer-Only Transactions

Multiple records sent by the producer are grouped into one transaction; if any record fails, the whole batch is rolled back.

10.1.1 Producer

Enable idempotence, acks, and the transaction settings:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txid" + UUID.randomUUID());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20);

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.initTransactions();
try {
    kafkaProducer.beginTransaction();
    for (int i = 0; i < 10; i++) {
        if (i == 8) {
            // simulate a failure in the middle of the batch
            System.out.println(i / 0);
        }
        ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
        kafkaProducer.send(record);
    }
    kafkaProducer.flush();
    kafkaProducer.commitTransaction();
} catch (Exception e) {
    // abort on any failure so the whole batch is rolled back
    e.printStackTrace();
    kafkaProducer.abortTransaction();
}
kafkaProducer.close();
10.1.2 Consumer

Set the consumer isolation level to read_committed:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Pattern.compile("^topic01$"));
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
    while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        int partition = record.partition();
        long offset = record.offset();
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        Headers headers = record.headers();
        System.out.println("partition:" + partition + ",offset:" + offset + ",topic:" + topic
                + ",key:" + key + ",value:" + value + ",headers:" + headers);
    }
}
10.2 Consumer & Producer Transactions

When a consumer processes a message and then needs to send a new message, the consume-and-produce steps can be wrapped in a single transaction: if processing fails, the transaction is rolled back, guaranteeing that the outgoing message is never visible to read_committed consumers.
10.2.1 Producer A

Sends messages to topic01:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txid" + UUID.randomUUID());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20);

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.initTransactions();
try {
    kafkaProducer.beginTransaction();
    for (int i = 0; i < 10; i++) {
        if (i == 8) {
            // simulate a failure in the middle of the batch
            System.out.println(i / 0);
        }
        ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
        kafkaProducer.send(record);
    }
    kafkaProducer.flush();
    kafkaProducer.commitTransaction();
} catch (Exception e) {
    // abort on any failure so the whole batch is rolled back
    e.printStackTrace();
    kafkaProducer.abortTransaction();
}
kafkaProducer.close();
10.2.2 Consumer A & Producer B

Consumer A consumes Producer A's records; after processing, Producer B sends the results to topic02:
private static KafkaConsumer<String, String> buildKafkaConsumer(String groupId) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new KafkaConsumer<String, String>(properties);
}

private static KafkaProducer<String, String> buildKafkaProducer() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txid-" + UUID.randomUUID());
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20000);
    return new KafkaProducer<String, String>(properties);
}

public static void main(String[] args) {
    KafkaProducer<String, String> producer = buildKafkaProducer();
    KafkaConsumer<String, String> consumer = buildKafkaConsumer("group02");
    producer.initTransactions();
    consumer.subscribe(Collections.singleton("topic01"));
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
        producer.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
        try {
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> record = iterator.next();
                // record the offset to commit as part of the transaction (consumed offset + 1)
                offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("topic02", record.key(), record.value() + "-again");
                producer.send(producerRecord);
            }
            // commit the consumed offsets and the produced records atomically
            producer.sendOffsetsToTransaction(offsetMap, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (Exception e) {
            e.printStackTrace();
            producer.abortTransaction();
        }
    }
}
10.2.3 Consumer B

Consumer B consumes the records from topic02:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Pattern.compile("^topic02$"));
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
    while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        int partition = record.partition();
        long offset = record.offset();
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        Headers headers = record.headers();
        System.out.println("partition:" + partition + ",offset:" + offset + ",topic:" + topic
                + ",key:" + key + ",value:" + value + ",headers:" + headers);
    }
}