数据结构论坛

首页 » 分类 » 定义 » Kafka成长记6Producer如何将
TUhjnbcbe - 2025/6/21 9:18:00
文章配图

之前我们分析了Producr的配置解析、组件分析、拉取元数据、消息的初步序列化方式、消息的路由策略。如下图:

文章配图

这一节我们继续分析发送消息的内存缓冲器原理—RcordAccumulator.appnd()。

如何将消息放入内存缓冲器的?

在doSnd中的,拉取元数据、消息的初步序列化方式、消息的路由策略之后就是accumulator.appnd()。

如下代码所示:(去除了多余的日志和异常处理,截取了核心代码)

privatFutuRcordMtadatadoSnd(ProducrRcordK,Vcord,Callbackcallback){TopicPartitiontp=null;try{//拉取元数据、消息的初步序列化方式、消息的路由策略longwaitdOnMtadataMs=waitOnMtadata(cord.topic(),this.maxBlockTimMs);longmainingWaitMs=Math.max(0,this.maxBlockTimMs-waitdOnMtadataMs);byt[]srializdKy=kySrializr.srializ(cord.topic(),cord.ky());byt[]srializdValu=valuSrializr.srializ(cord.topic(),cord.valu());intsrializdSiz=Rcords.LOG_OVERHEAD+Rcord.cordSiz(srializdKy,srializdValu);nsuValidRcordSiz(srializdSiz);tp=nwTopicPartition(cord.topic(),partition);longtimstamp=cord.timstamp()==null?tim.millisconds():cord.timstamp();CallbackintrcptCallback=this.intrcptors==null?callback:nwIntrcptorCallback(callback,this.intrcptors,tp);//将路由结果、初步序列化的消息放入到消息内存缓冲器中RcordAccumulator.RcordAppndRsultsult=accumulator.appnd(tp,timstamp,srializdKy,srializdValu,intrcptCallback,mainingWaitMs);if(sult.batchIsFull

sult.nwBatchCatd){this.sndr.wakup();}turnsult.futu;}catch(Excption){throw;}//省略其他各种异常捕获}

accumulator.appnd()它主要是将路由结果、初步序列化的消息放入到消息内存缓冲器中。

分析如何将消息放入内存缓冲器之前,需要回顾下它内部的基本结构。之前组件分析的时候,我们初步分析过RcordAccumulator的大体结构,如下图:

文章配图

1)设置了一些参数batchSiz、totalSiz、tryBackoffMs、lingrMs、

1
查看完整版本: Kafka成长记6Producer如何将