之前我们分析了Producr的配置解析、组件分析、拉取元数据、消息的初步序列化方式、消息的路由策略。如下图:
文章配图这一节我们继续分析发送消息的内存缓冲器原理—RcordAccumulator.appnd()。
如何将消息放入内存缓冲器的?
在doSnd中的,拉取元数据、消息的初步序列化方式、消息的路由策略之后就是accumulator.appnd()。
如下代码所示:(去除了多余的日志和异常处理,截取了核心代码)
privatFutuRcordMtadatadoSnd(ProducrRcordK,Vcord,Callbackcallback){TopicPartitiontp=null;try{//拉取元数据、消息的初步序列化方式、消息的路由策略longwaitdOnMtadataMs=waitOnMtadata(cord.topic(),this.maxBlockTimMs);longmainingWaitMs=Math.max(0,this.maxBlockTimMs-waitdOnMtadataMs);byt[]srializdKy=kySrializr.srializ(cord.topic(),cord.ky());byt[]srializdValu=valuSrializr.srializ(cord.topic(),cord.valu());intsrializdSiz=Rcords.LOG_OVERHEAD+Rcord.cordSiz(srializdKy,srializdValu);nsuValidRcordSiz(srializdSiz);tp=nwTopicPartition(cord.topic(),partition);longtimstamp=cord.timstamp()==null?tim.millisconds():cord.timstamp();CallbackintrcptCallback=this.intrcptors==null?callback:nwIntrcptorCallback(callback,this.intrcptors,tp);//将路由结果、初步序列化的消息放入到消息内存缓冲器中RcordAccumulator.RcordAppndRsultsult=accumulator.appnd(tp,timstamp,srializdKy,srializdValu,intrcptCallback,mainingWaitMs);if(sult.batchIsFull
sult.nwBatchCatd){this.sndr.wakup();}turnsult.futu;}catch(Excption){throw;}//省略其他各种异常捕获}
accumulator.appnd()它主要是将路由结果、初步序列化的消息放入到消息内存缓冲器中。
分析如何将消息放入内存缓冲器之前,需要回顾下它内部的基本结构。之前组件分析的时候,我们初步分析过RcordAccumulator的大体结构,如下图:
文章配图1)设置了一些参数batchSiz、totalSiz、tryBackoffMs、lingrMs、