- 浏览: 2144570 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (1240)
- mac/IOS (287)
- flutter (1)
- J2EE (115)
- android基础知识 (582)
- android中级知识 (55)
- android组件(Widget)开发 (18)
- android 错误 (21)
- javascript (18)
- linux (70)
- 树莓派 (18)
- gwt/gxt (1)
- 工具(IDE)/包(jar) (18)
- web前端 (17)
- java 算法 (8)
- 其它 (5)
- chrome (7)
- 数据库 (8)
- 经济/金融 (0)
- english (2)
- HTML5 (7)
- 网络安全 (14)
- 设计欣赏/设计窗 (8)
- 汇编/C (8)
- 工具类 (4)
- 游戏 (5)
- 开发频道 (5)
- Android OpenGL (1)
- 科学 (4)
- 运维 (0)
- 好东西 (6)
- 美食 (1)
最新评论
-
liangzai_cool:
请教一下,文中,shell、C、Python三种方式控制led ...
树莓派 - MAX7219 -
jiazimo:
...
Kafka源码分析-序列5 -Producer -RecordAccumulator队列分析 -
hp321:
Windows该命令是不是需要安装什么软件才可以?我试过不行( ...
ImageIO读jpg的时候出现javax.imageio.IIOException: Unsupported Image Type -
hp321:
Chenzh_758 写道其实直接用一下代码就可以解决了:JP ...
ImageIO读jpg的时候出现javax.imageio.IIOException: Unsupported Image Type -
huanghonhpeng:
大哥你真强什么都会,研究研究。。。。小弟在这里学到了很多知识。 ...
android 浏览器
Kafka源码分析-序列5 -Producer -RecordAccumulator队列分析
- 博客分类:
- J2EE
在Kafka源码分析-序列2中,我们提到了整个Producer client的架构图,如下所示:
其它几个组件我们在前面都讲过了,今天讲述最后一个组件RecordAccumulator.
Batch发送
在以前的kafka client中,每条消息称为 “Message”,而在Java版client中,称之为”Record”,同时又因为有批量发送累积功能,所以称之为RecordAccumulator.
RecordAccumulator最大的一个特性就是batch消息,扔到队列中的多个消息,可能组成一个RecordBatch,然后由Sender一次性发送出去。
每个TopicPartition一个队列
下面是RecordAccumulator的内部结构,可以看到,每个TopicPartition对应一个消息队列,只有同一个TopicPartition的消息,才可能被batch。
batch的策略
那什么时候,消息会被batch,什么时候不会呢?下面从KafkaProducer的send方法看起:
从上面代码可以看到,batch逻辑,都在accumulator.append函数里面:
从上面代码我们可以看出Batch的策略:
1。如果是同步发送,每次去队列取,RecordBatch都会为空。这个时候,消息就不会batch,一个Record形成一个RecordBatch
2。Producer 入队速率 < Sender出队速率 && lingerMs = 0 ,消息也不会被batch
3。Producer 入队速率 > Sender出对速率, 消息会被batch
4。lingerMs > 0,这个时候Sender会等待,直到lingerMs > 0 或者 队列满了,或者超过了一个RecordBatch的最大值,就会发送。这个逻辑在RecordAccumulator的ready函数里面。
为什么是Deque?
在上面我们看到,消息队列用的是一个“双端队列“,而不是普通的队列。
一端生产,一端消费,用一个普通的队列不就可以吗,为什么要“双端“呢?
这其实是为了处理“发送失败,重试“的问题:当消息发送失败,要重发的时候,需要把消息优先放入队列头部重新发送,这就需要用到双端队列,在头部,而不是尾部加入。
当然,即使如此,该消息发出去的顺序,还是和Producer放进去的顺序不一致了。
其它几个组件我们在前面都讲过了,今天讲述最后一个组件RecordAccumulator.
Batch发送
在以前的kafka client中,每条消息称为 “Message”,而在Java版client中,称之为”Record”,同时又因为有批量发送累积功能,所以称之为RecordAccumulator.
RecordAccumulator最大的一个特性就是batch消息,扔到队列中的多个消息,可能组成一个RecordBatch,然后由Sender一次性发送出去。
每个TopicPartition一个队列
下面是RecordAccumulator的内部结构,可以看到,每个TopicPartition对应一个消息队列,只有同一个TopicPartition的消息,才可能被batch。
public final class RecordAccumulator { private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; ... }
batch的策略
那什么时候,消息会被batch,什么时候不会呢?下面从KafkaProducer的send方法看起:
//KafkaProducer public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { try { // first make sure the metadata for the topic is available long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); ... RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs); //核心函数:把消息放入队列 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;
从上面代码可以看到,batch逻辑,都在accumulator.append函数里面:
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { appendsInProgress.incrementAndGet(); try { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); Deque<RecordBatch> dq = dequeFor(tp); //找到该topicPartiton对应的消息队列 synchronized (dq) { RecordBatch last = dq.peekLast(); //拿出队列的最后1个元素 if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); //最后一个元素, 即RecordBatch不为空,把该Record加入该RecordBatch if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } //队列里面没有RecordBatch,建一个新的,然后把Record放进去 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); } } finally { appendsInProgress.decrementAndGet(); } } private Deque<RecordBatch> dequeFor(TopicPartition tp) { Deque<RecordBatch> d = this.batches.get(tp); if (d != null) return d; d = new ArrayDeque<>(); Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d); if (previous == null) return d; else return previous; }
从上面代码我们可以看出Batch的策略:
1。如果是同步发送,每次去队列取,RecordBatch都会为空。这个时候,消息就不会batch,一个Record形成一个RecordBatch
2。Producer 入队速率 < Sender出队速率 && lingerMs = 0 ,消息也不会被batch
3。Producer 入队速率 > Sender出对速率, 消息会被batch
4。lingerMs > 0,这个时候Sender会等待,直到lingerMs > 0 或者 队列满了,或者超过了一个RecordBatch的最大值,就会发送。这个逻辑在RecordAccumulator的ready函数里面。
public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<Node>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; boolean unknownLeadersExist = false; boolean exhausted = this.free.queued() > 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { TopicPartition part = entry.getKey(); Deque<RecordBatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); if (leader == null) { unknownLeadersExist = true; } else if (!readyNodes.contains(leader)) { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; long waitedTimeMs = nowMs - batch.lastAttemptMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed || flushInProgress(); //关键的一句话 if (sendable && !backingOff) { readyNodes.add(leader); } else { nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist); }
为什么是Deque?
在上面我们看到,消息队列用的是一个“双端队列“,而不是普通的队列。
一端生产,一端消费,用一个普通的队列不就可以吗,为什么要“双端“呢?
这其实是为了处理“发送失败,重试“的问题:当消息发送失败,要重发的时候,需要把消息优先放入队列头部重新发送,这就需要用到双端队列,在头部,而不是尾部加入。
当然,即使如此,该消息发出去的顺序,还是和Producer放进去的顺序不一致了。
发表评论
-
小程序textarea完美填坑
2020-07-07 16:09 457相信做微信小程序的码友们都被textarea这个原生组件坑过 ... -
Nginx+Https自己敲命令生成证书
2020-05-18 09:35 895一、准备 环境:centos6.8 ... -
https证书生成环境搭建配置(基于Tomcat和Nginx)
2020-04-24 11:06 777一、基于Tomcat、JDK内置密钥工具: 1、生成服务端证 ... -
史上最强Tomcat8性能优化
2019-11-01 21:41 729授人以鱼不如授人以渔 ... -
SpringBoot配置HTTPS,并实现HTTP访问自动转HTTPS访问
2019-10-07 09:13 5181.使用jdk自带的 keytools 创建证书 打开cmd ... -
Spring Boot工程集成全局唯一ID生成器 UidGenerator
2019-09-16 09:04 805概述 流水号生成器(全局唯一 ID生成器)是服务化系统的基础 ... -
CentOS7下Redis的安装与使用
2019-08-17 11:45 554一、手动安装过程 1、准备工作(安装gcc依赖) yum ... -
Nginx与tomcat组合的简单使用
2019-08-17 10:05 365配置tomcat跳转 请求http出现400的时候在这里配置 ... -
linux下lvs+keepalived安装配置
2019-07-10 14:20 429keepalived主机:192.168.174. ... -
使用Docker搭建Tomcat运行环境
2019-02-08 21:32 4451 准备宿主系统 准备一 ... -
Netty笔记-GlobalEventExecutor
2019-02-06 23:00 5771.概念 /** * Single-thread si ... -
Netty4转发服务的实现方案
2019-02-06 15:03 1040如果用Netty做转发服务(不需要同步应答),Netty中有一 ... -
java手机号归属地查询
2018-12-25 17:16 702所需的包:carrier-1.75.jar 、geocoder ... -
基于Netty4的HttpServer和HttpClient的简单实现
2018-10-17 20:02 626Http 消息格式: Http request: Met ... -
javafx : 支持使用微调(spinner)控制的数字的文本框(NemberTextField)
2018-10-16 00:00 992最近花了一些时间学习javaFX, 要更深入地理解新GUI包, ... -
我的Java(定制你的Java/JavaFX Runtime)
2018-10-12 23:29 629最新的JDK 11发布了,撒花 新版本的JDK终于有了ope ... -
javaFX的几个新特性,让swing彻底过时
2018-10-12 22:42 605首先声明,Java的GUI曾经 ... -
mac os系统用install4j把jar包生成app
2018-10-05 23:02 1345install4j有windows版也有mac版 mac电脑 ... -
JavaFX Alert对话框
2018-10-05 22:01 22401. 标准对话框 消息对话框 Alert alert = ... -
IDEA Properties中文unicode转码问题
2017-02-17 19:54 926摘要: 如何让IDEA的properties中的中文进行uni ...
相关推荐
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 [root@hadoop102 module]$ unzip kafka-manager-1.3.3.22.zip 4)进入到/opt/module/kafka-manager-1.3.3.22/...
赠送jar包:flink-connector-kafka_2.12-1.14.3.jar 赠送原API文档:flink-connector-kafka_2.12-1.14.3-javadoc.jar 赠送源代码:flink-connector-kafka_2.12-1.14.3-sources.jar 包含翻译后的API文档:flink-...
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...
kafka_2.11-2.0.0.tgz, kafka_2.11-2.0.1.tgz, kafka_2.11-2.1.0.tgz, kafka_2.11-2.1.1.tgz, kafka_2.11-2.2.0.tgz, kafka_2.11-2.2.1.tgz, kafka_2.11-2.2.2.tgz, kafka_2.11-2.3.0.tgz, kafka_2.11-2.3.1.tgz, ...
赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...
kafka-schema-registry-client-3.2.0.jar包,亲测可用,在aliyun仓库内找不到,可以下载此jar包来进行手动安装
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
spark-streaming-kafka-0-10_2.11-2.4.0-cdh6.1.1.jar
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
spark-streaming-kafka-0-8_2.11-2.4.0.jar
kafka-eagle-bin-2.1.0.tar.gz 2022年7月份下载,最新版
kafka-schema-registry-client-3.3.1.jar包,在aliyun 仓库内无法下载,可以下载此jar包然后手动安装
大数据监控工具kafka监控工具kafka-eagle-bin-1.4.2.tar.gz,比较简单好用。
spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1kafka_2.10-0.8.1.1...
说明:kafka-manager 自己下载编译速度巨慢,此资源是编译好的 kafka-manager,版本是:kafka-manager-1.3.3.7(适用于较新的版本,kafka版本是kafka_2.11-2.0.1)。 安装配置说明: 1. 里头有个自己写的启动脚本,...
kafka-manager.jar 配置application.conf中的zk地址后可直接启动 bin/kafka-manager -Dconfig.file=/kafka-manager-2.0.0.2/conf/application.conf -Dhttp.port=8888