绍圣--kafka之生产者(五)

  • 时间:2020-04-24 21:10 作者:绍圣 来源: 阅读:1045
  • 扫一扫,手机访问
摘要:在看很多讲kafka的文章里面都会说:kafka只保证单个partition的有序性,那么kafka是怎样保证有序的喃?使用RecordAccumulator的mutePartition和unmutePartition方法来配合实现有序性//记录tp能否还有未完成的RecordBatch,保证一个t

在看很多讲kafka的文章里面都会说:kafka只保证单个partition的有序性,那么kafka是怎样保证有序的喃?

使用RecordAccumulator的mutePartition和unmutePartition方法来配合实现有序性

//记录tp能否还有未完成的RecordBatch,保证一个tp的顺序性,当一个tp对应的RecordBatch要开始发送时,就将此tp加入到muted中,tp对应的RecordBatch发送完成后,删除muted中的tp

private final Set muted;

public void mutePartition(TopicPartition tp) { muted.add(tp); }

public void unmutePartition(TopicPartition tp) { muted.remove(tp); }

RecordAccumulator.ready方法中进行判断(伪代码)

public ReadyCheckResult ready(Cluster cluster, long nowMs) {

if (!readyNodes.contains(leader) && !muted.contains(part)) {}

}

if (!readyNodes.contains(leader) && !muted.contains(part)),假如muted中包含了这个tp,那么即便这个tp对应的leader存在,RecordBatch可以发送也不会去发送它,由于它上一个RecordBatch还没有解决完成。

RecordAccumulator.drain方法中进行判断(伪代码)

public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) {

if (!muted.contains(tp)){}

}

if (!muted.contains(tp))在对RecordAccumulator中的记录进行重新组装的时候,仍旧会判断对应的tp能否在muted中。在muted中的仍旧不会选择出来发送。

在Sender中的变量:guaranteeMessageOrder:能否保持单个partition的有序性

在KafkaProducer的构造中

this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);

public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder, int maxRequestSize, short acks, int retries, Metrics metrics, Time time, String clientId, int requestTimeout) { this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.guaranteeMessageOrder = guaranteeMessageOrder; this.maxRequestSize = maxRequestSize; this.running = true; this.acks = acks; this.retries = retries; this.time = time; this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; }

guaranteeMessageOrder=config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1

我们可以在使用的时候设置max.in.flight.requests.per.connection来设置guaranteeMessageOrder的值。

mutePartition和unmutePartition方法都是在Sender中进行调用

mutePartition在Sender.run中调用

if (guaranteeMessageOrder) {

// 记录将要发送的topicPartition到mute中

for (List batchList : batches.values()) {

for (RecordBatch batch : batchList)

this.accumulator.mutePartition(batch.topicPartition);

}

}

发送的时候,把将要提交的RecordBatch的tp加到muted中。下次再需要发送tp里的RecordBatch的时候,假如muted里面包含了此tp,就不会选择出来发送。

在解决服务端响应的时候,清理muted中的tp

if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition);

总结:要保证单partition的有序性,需要配置max.in.flight.requests.per.connection=1。

  • 全部评论(0)
最新发布的资讯信息
【系统环境|】2FA验证器 验证码如何登录(2024-04-01 20:18)
【系统环境|】怎么做才能建设好外贸网站?(2023-12-20 10:05)
【系统环境|数据库】 潮玩宇宙游戏道具收集方法(2023-12-12 16:13)
【系统环境|】遥遥领先!青否数字人直播系统5.0发布,支持真人接管实时驱动!(2023-10-12 17:31)
【系统环境|服务器应用】克隆自己的数字人形象需要几步?(2023-09-20 17:13)
【系统环境|】Tiktok登录教程(2023-02-13 14:17)
【系统环境|】ZORRO佐罗软件安装教程及一键新机使用方法详细简介(2023-02-10 21:56)
【系统环境|】阿里云 centos 云盘扩容命令(2023-01-10 16:35)
【系统环境|】补单系统搭建补单源码搭建(2022-05-18 11:35)
【系统环境|服务器应用】高端显卡再度登上热搜,竟然是因为“断崖式”的降价(2022-04-12 19:47)
手机二维码手机访问领取大礼包
返回顶部