Kafka4.1.0 队列模式尝鲜

  • 时间:2025-11-11 21:03 作者: 来源: 阅读:0
  • 扫一扫,手机访问
摘要:背景kafka 一直以来的消费模型是一个 topic 下多个 partition,每个 partition 有一个消费者。其中 partition 的数量决定了消费者的并发度。在 KIP-932 的提案中提出了队列模型,在4.0.0版本引入在4.1.0版本为预览版。C 使用docker启动我们使用 docker 启动一个开启了预览版本的 broker。本文使用的 native-imagedocke

背景

kafka 一直以来的消费模型是一个 topic 下多个 partition,每个 partition 有一个消费者。其中 partition 的数量决定了消费者的并发度。在 KIP-932 的提案中提出了队列模型,在4.0.0版本引入在4.1.0版本为预览版。

C 使用docker启动

我们使用 docker 启动一个开启了预览版本的 broker。本文使用的 native-image

docker run -d  
  -p 9092:9092 
  --name kafka410 
  -e KAFKA_NODE_ID=1 
  -e KAFKA_PROCESS_ROLES=broker,controller 
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTE #技术分享XT://localhost:9092 
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER 
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT 
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 
  -e KAFKA_NUM_PARTITIONS=3 
  -e KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer,share 
  -e KAFKA_UNSTABLE_API_VERSIONS_ENABLE=true 
  -e KAFKA_GROUP_SHARE_ENABLE=true 
  -e KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR=1 
  apache/kafka-native:4.1.1-rc1
  • KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer,share 在rebalance协议中支持share协议
  • KAFKA_UNSTABLE_API_VERSIONS_ENABLE 解锁未stable的版本限制
  • KAFKA_GROUP_SHARE_ENABLE 开启共享组消费者模式
  • KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR 共享组(队列)模式使用一个topic来保存消费进度,这个是此topic的副本因子

使用spring-kafka消费

  • spring boot 使用的4.0.0-rc2
  • 并添加spring-kafka
  • implementation 'org.springframework.boot:spring-boot-starter-kafka'
package cc.sofast.practice.kafkagroupconsumer;

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ShareKafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultShareConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ShareConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;

import java.util.*;

@Configuration public class ShareConsumerConfig {

private static final Logger log = LoggerFactory.getLogger(ShareConsumerConfig.class);

@Bean public ShareConsumerFactory<String, String> shareConsumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultShareConsumerFactory<>(props); }

@Bean public ShareKafkaMessageListenerContainer<String, String> imageProcessingContainer( ShareConsumerFactory<String, String> shareConsumerFactory) {

ContainerProperties containerProps = new ContainerProperties("image-processing"); containerProps.setGroupId("image-processors");

ShareKafkaMessageListenerContainer<String, String> container = new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

container.setConcurrency(10); container.setupMessageListener(new MessageListener<String, String>() { @Override public void onMessage(ConsumerRecord<String, String> record) { System.out.println("Received on " + Thread.currentThread().getName() + ": Key: " + record.key() + " Val: " + record.value()); } });

return container; }

@Bean public CommandLineRunner commandLineRunner(KafkaTemplate<String, String> template) { return new CommandLineRunner() { @Override public void run(String... args) throws Exception { new Timer().schedule(new TimerTask() { @Override public void run() { template.send("image-processing", UUID.randomUUID().toString(), "image-2-" + UUID.randomUUID().toString()); } }, 3000, 1000); } }; } }

参考

  • strimzi.io/blog/2025/0…
  • oso.sh/blog/kafka-…
  • hub.docker.com/r/apache/ka…
  • spring.io/blog/2025/1…
  • www.instaclustr.com/blog/apache…
  • 全部评论(0)
最新发布的资讯信息
【系统环境|】最低 2 美元,这 55 款 macOS & Windows 应用一次全都入手(2025-11-11 22:01)
【系统环境|】SCI期刊对论文图片有哪些要求?(2025-11-11 22:00)
【系统环境|】论文缩写大全,拿走不谢(2025-11-11 22:00)
【系统环境|】阿甘正传高频词整理 GRE托福四六级词汇整理(2025-11-11 21:59)
【系统环境|】矢量图形编辑应用程序-WinFIG(2025-11-11 21:59)
【系统环境|】Figma上市首日暴涨250%的深层逻辑:为什么AI时代协作平台更加不可替代?(2025-11-11 21:58)
【系统环境|】FigJam是什么?一文读懂在线白板软件的方方面面!(2025-11-11 21:58)
【系统环境|】在windows上有什么好用的书写白板软件?(2025-11-11 21:57)
【系统环境|】Docker基础应用之nginx(2025-11-11 21:57)
【系统环境|】VS Code 新手必装插件清单(2025-11-11 21:56)
手机二维码手机访问领取大礼包
返回顶部