Kafka's consumption model has always been: a topic has multiple partitions, and within a consumer group each partition is consumed by at most one consumer, so the partition count caps consumer parallelism. KIP-932 proposes a queue model (share groups), first introduced in 4.0.0 and available as a preview in 4.1.0.
We use Docker to start a broker with the preview feature enabled. This post uses the native image (apache/kafka-native):
docker run -d \
  -p 9092:9092 \
  --name kafka410 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  -e KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer,share \
  -e KAFKA_UNSTABLE_API_VERSIONS_ENABLE=true \
  -e KAFKA_GROUP_SHARE_ENABLE=true \
  -e KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka-native:4.1.1-rc1
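With the broker running, it helps to see the shape of the underlying KIP-932 client API before adding Spring. Below is a minimal sketch, assuming the KafkaShareConsumer and AcknowledgeType names defined by KIP-932 as shipped in the 4.1.x preview (verify against the 4.1 javadoc); the topic and group id match the Spring example that follows.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PlainShareConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // For a share consumer, group.id names the share group, not a classic consumer group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "image-processors");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(List.of("image-processing"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key=" + record.key() + " value=" + record.value());
                    // ACCEPT marks the record as done; RELEASE would make it eligible for redelivery.
                    consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                }
                consumer.commitSync();
            }
        }
    }
}

Unlike a classic consumer, a share consumer acknowledges individual records: per KIP-932, RELEASE returns a record for redelivery and REJECT discards it. The rest of this post uses spring-kafka's share-consumer support, which wraps this API in a listener container.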
package cc.sofast.practice.kafkagroupconsumer;

import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultShareConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ShareConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;

@Configuration
public class ShareConsumerConfig {

    private static final Logger log = LoggerFactory.getLogger(ShareConsumerConfig.class);

    @Bean
    public ShareConsumerFactory<String, String> shareConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultShareConsumerFactory<>(props);
    }

    @Bean
    public ShareKafkaMessageListenerContainer<String, String> imageProcessingContainer(
            ShareConsumerFactory<String, String> shareConsumerFactory) {
        // The group id names the share group; all container threads share it.
        ContainerProperties containerProps = new ContainerProperties("image-processing");
        containerProps.setGroupId("image-processors");

        ShareKafkaMessageListenerContainer<String, String> container =
                new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
        // 10 concurrent consumers on a 3-partition topic: a classic group would cap out at 3,
        // but a share group delivers records to all of them.
        container.setConcurrency(10);
        container.setupMessageListener((MessageListener<String, String>) record ->
                log.info("Received on {}: key={} value={}",
                        Thread.currentThread().getName(), record.key(), record.value()));
        return container;
    }

    @Bean
    public CommandLineRunner commandLineRunner(KafkaTemplate<String, String> template) {
        // Produce a test record every second (after a 3s delay) so the share group has work.
        return args -> new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                template.send("image-processing", UUID.randomUUID().toString(),
                        "image-2-" + UUID.randomUUID());
            }
        }, 3000, 1000);
    }
}
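KIP-932 also defines share-group-level dynamic configs such as share.auto.offset.reset. A hedged sketch of setting it through the Admin client, assuming ConfigResource.Type.GROUP accepts share-group configs as the KIP describes (the group name matches the container above):

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ShareGroupConfig {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // GROUP-scoped config resource, targeting the share group by name.
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "image-processors");
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("share.auto.offset.reset", "earliest"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
        }
    }
}

Running the application, the records produced every second are spread across the 10 container threads even though the topic has only 3 partitions, which is exactly the queue behaviour the share-group preview adds.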