数据结构论坛

首页 » 分类 » 定义 » Tendermint节点启动源码分析
TUhjnbcbe - 2025/4/10 17:21:00
有哪些白癜风医院 https://jbk.39.net/yiyuanfengcai/zn_bjzkbdfyy/

本文以官方示例公链Basecoin的basecoindstart命令为入口,结合日志与源码分析Tendermint节点创建、启动及生成区块等过程。

文中代码细节较多,但限于篇幅没有太深入某些细节,比如共识过程。如只想快速了解整体过程,可只阅读有basecoindstart日志部分的内容。

start命令入口

执行basecoindinit命令初始化genesis配置、priv-validator文件及pp-node文件后,在命令行执行basecoindstart启动节点。在不带有任何参数时,Tendermint会与ABCI应用一起执行:

日志:StartingABCIwithTendermintmodule=main

func(cmd*cobra.Command,args[]string)error{  if!viper.GetBool(flagWithTendermint){    ctx.Logger.Info("StartingABCIwithoutTendermint")    turnstartStandAlone(ctx,appCator)  }  ctx.Logger.Info("StartingABCIwithTendermint")  turnstartInProcess(ctx,appCator)}

创建节点

这一节把NewNode()

创建三条连接的客户端

startInProcess会创建Tendermint节点,然后启动节点。

创建节点时Tendermint与ABCI应用会创建建立三条连接所需的客户端:即query、mempool以及consensus连接。

**注意:**所有要启动服务都是通过BaseService.Start来执行的。

日志:StartingmultiAppConnmodule=proxyimpl=multiAppConn

NewNode()中相关代码:

proxyApp:=proxy.NewAppConns(clientCator,handshaker)//实际执行的是multiAppConn.multiAppConn()iferr:=proxyApp.Start();err!=nil{    turnnil,fmt.Errorf("Errorstartingproxyappconnections:%v",err)  }

三条连接的客户端的建立在multiAppConn.OnStart方法中完成,在这里三条连接的客户端都是localClient结构,由于localClient没有实现OnStart方法,它们的Start方法调用的都是cmn.BaseService的OnStart方法,也就是querycli.Start()(另两个也一样)除了打印日志外,什么都不做。

接下来分别把localClient封装成了appConnQuery、appConnMempool和appConnConsensus结构。

日志:StartinglocalClientmodule=abci-clientconnection=queryimpl=localClient

multiAppConn.OnStart()中相关代码:

//在创建节点时传入的clientCator的具体实现是localClientCator结构,//这里最终返回的是localClient结构querycli,err:=app.clientCator.NewABCIClient()//执行的是cmn.BaseService的OnStart方法,啥都没做iferr:=querycli.Start();err!=nil{    turnerrors.Wrap(err,"ErrorstartingABCIclient(queryconnection)")  }

其它两个连接的客户端与query连接客户端的创建相同。

握手同步

在建立完毕以上三条连接的客户端后,会执行app.handshaker.Handshake(app)来握手,确保Tendermint节点与应用程序的状态是同步的。

日志:

ABCIHandshakemodule=consensusappHeight=appHash=DC1ED0D0D1EE0CCBD7AEEAE7EABCIReplayBlocksmodule=consensusappHeight=stoHeight=stateHeight=CompletedABCIHandshake-TendermintandAppasyncedmodule=consensusappHeight=appHash=DC1ED0D0D1EE0CCBD7AEEAE7E

在query连接上通过ABCIInfo查询ABCI应用的blocksto中的最新状态,然后将Tendermint节点的区块重放到此状态:

multiAppConn.OnStart()中相关代码:

ifapp.handshaker!=nil{turnapp.handshaker.Handshake(app)}

Handshaker.Handshake()中相关代码:

//从ABCI应用的blocksto中获取最新状态s,err:=proxyApp.Query().InfoSync(abci.RequestInfo{version.Version})//重放所有区块到最新状态_,err=h.ReplayBlocks(h.initialState,appHash,blockHeight,proxyApp)

重放完毕后,Tendermint节点已经和ABCI应用同步到了相同的区块高度。

快速同步设置及验证人节点确认

在进入代码细节之前,先了解一下快速同步的概念。

快速同步(FastSync):在当前节点落后于区块链的最新状态时,需要进行节点间同步,快速同步只下载区块并检查验证人的默克尔树,比运行实时一致性八卦协议快得多。一旦追上其它节点的状态,守护进程将切换出快速同步并进入正常共识模式。在运行一段时间后,如果此节点至少有一个peer,并且其区块高度至少与最大的报告的peer高度一样高,则认为该节点已追上区块链最新状态(即caughtup)。

现在回到NewNode()源码,当节点同步到ABCI应用的最新状态后,会检查当前状态的验证人集合中是否只有当前节点一个验证人,如果是,则无需快速同步。

//重新从数据库加载状态,因为可能在握手时有更新state=sm.LoadState(stateDB)//Decidewhethertofast-syncornot//Wedontfast-syncwhentheonlyvalidatorisus.//此字段用来指定当此节点在区块链末端有许多区块需要同步时,是否启用快速同步功能,默认为truefastSync:=config.FastSyncifstate.Validators.Size()==1{addr,_:=state.Validators.GetByIndex(0)ifbytes.Equal(privValidator.GetAddss(),addr){fastSync=false}}

日志:

Thisnodeisavalidatormodule=consensusaddr=FFAFDCDBFA87DDEC5F0E1pubKey=PubKeyEd{76CA1AE9AFB1FDCBE1A00CFE5EEDBEBDCC05CD1A76AEFB}

NewNode()中相关代码,显示当前节点是否是验证人:

//Logwhetherthisnodeisavalidatororanobserverifstate.Validators.HasAddss(privValidator.GetAddss()){consensusLogger.Info("Thisnodeisavalidator","addr",privValidator.GetAddss(),"pubKey",privValidator.GetPubKey())}else{consensusLogger.Info("Thisnodeisnotavalidator","addr",privValidator.GetAddss(),"pubKey",privValidator.GetPubKey())}

创建各种Reactor

Reactor是处理各类传入消息的结构,一共有5种类型,通过将其添加到Switch中来实现。先熟悉一下Switch的结构:

Switch处理peer连接,并暴露一个API以在各类Reactor上接收传入的消息。每个Reactor负责处理一个或多个“Channels”的传入消息。因此,发送传出消息通常在peer执行,传入的消息在Reactor上接收。

typeSwitchstruct{cmn.BaseServiceconfig*config.PPConfiglisteners[]Listener//添加的Reactor都存在这里actorsmap[string]ReactorchDescs[]*conn.ChannelDescriptoractorsByChmap[byte]Reactorpeers*PeerSetdialing*cmn.CMapconnecting*cmn.CMapnodeInfoNodeInfo//ournodeinfonodeKey*NodeKey//ournodeprivkeyaddrBookAddrBookfilterConnByAddrfunc(net.Addr)errorfilterConnByIDfunc(ID)errorrng*cmn.Rand//seedforrandomizingdialtimesandorders}

MempoolReactor

Mempool是一个有序的内存池,交易在被共识提议之前会存储在这里,而在存储到这里之前会通过ABCI应用的CheckTx方法检查其合法性。

先看NewNode()中相关代码:

mempoolLogger:=logger.With("module","mempool")//创建Mempoolmempool:=mempl.NewMempool(config.Mempool,proxyApp.Mempool(),state.LastBlockHeight)//初始化Mempool的write-aheadlog(确保可以从任何形式的崩溃中恢复过来)mempool.InitWAL()//noneedtohavethemempoolwalduringtestsmempool.SetLogger(mempoolLogger)//创建MempoolReactormempoolReactor:=mempl.NewMempoolReactor(config.Mempool,mempool)mempoolReactor.SetLogger(mempoolLogger)//这里根据配置,判断是否要等待有交易时才生成新区块ifconfig.Consensus.WaitForTxs(){mempool.EnableTxsAvailable()}

MempoolReactor用来在peer之间对mempool交易进行广播。看一下它的数据结构:

typeMempoolReactorstruct{pp.BaseReactorconfig*cfg.MempoolConfigMempool*Mempool}funcNewMempoolReactor(config*cfg.MempoolConfig,mempool*Mempool)*MempoolReactor{  memR:=MempoolReactor{    config:config,    Mempool:mempool,  }  memR.BaseReactor=*pp.NewBaseReactor("MempoolReactor",memR)  turnmemR}

EvidenceReactor

Evidence是一个接口,表示验证人的任何可证明的恶意活动,主要有DuplicateVoteEvidence(包含验证人签署两个相互矛盾的投票的证据。)这种实现。

NewNode()中相关代码:

evidenceDB,err:=dbProvider(DBContext{"evidence",config})iferr!=nil{turnnil,err}evidenceLogger:=logger.With("module","evidence")//EvidenceSto用来存储见过的所有Evidence,包括已提交的、已经过验证但没有广播的以及已经广播但未提交的evidenceSto:=evidence.NewEvidenceSto(evidenceDB)//EvidencePool在EvidenceSto中维护一组有效的EvidenceevidencePool:=evidence.NewEvidencePool(stateDB,evidenceSto)evidencePool.SetLogger(evidenceLogger)//创建EvidenceReactovidenceReactor:=evidence.NewEvidenceReactor(evidencePool)evidenceReactor.SetLogger(evidenceLogger)

EvidenceReactor用来在peer间对EvidencePool中Evidence进行广播。

typeEvidenceReactorstruct{pp.BaseReactorevpool*EvidencePooleventBus*types.EventBus}

BlockchainReactor

NewNode()中相关代码:

blockExecLogger:=logger.With("module","state")//BlockExecutor用来处理区块执行和状态更新。//它暴露一个ApplyBlock()方法,用来验证并执行区块、更新状态和ABCI应答,然后以原子方式提交//并更新mempool,最后保存状态。blockExec:=sm.NewBlockExecutor(stateDB,blockExecLogger,proxyApp.Consensus(),mempool,evidencePool)//创建BlockchainReactorbcReactor:=bc.NewBlockchainReactor(state.Copy(),blockExec,blockSto,fastSync)bcReactor.SetLogger(logger.With("module","blockchain"))

BlockchainReactor用来处理长期的catchup同步。

typeBlockchainReactorstruct{  pp.BaseReactor  //immutable  initialStatesm.State  blockExec*sm.BlockExecutor//区块的底层存储。主要存储三种类型的信息:BlockMeta、Blockpart和Commit  sto*BlockSto//当加入到BlockPool时,peer自己报告它们的高度。//从当前节点最新的pool.height开始,从报告的高于我们高度的peer顺序请求区块。//节点经常问peer他们当前的高度,这样我们就可以继续前进。//不断请求更高的区块直到到达限制。如果大多数请求没有可用的peer,并且没有处在peer限制,可以//切换到consensusactor  pool*BlockPool  fastSyncbool  questsCh-chanBlockRequest  errorsCh-chanpeerError}

ConsensusReactor

NewNode()中相关代码:

ConsensusState用来处理共识算法的执行。它处理投票和提案,一旦达成一致,将区块提交给区块链并针对ABCI应用执行它们。内部状态机从peer、内部验证人和定时器接收输入。

//创建ConsensusStateconsensusState:=cs.NewConsensusState(config.Consensus,state.Copy(),blockExec,blockSto,mempool,evidencePool)consensusState.SetLogger(consensusLogger)ifprivValidator!=nil{//设置PrivValidator用来投票consensusState.SetPrivValidator(privValidator)}//创建ConsensusReactorconsensusReactor:=cs.NewConsensusReactor(consensusState,fastSync)consensusReactor.SetLogger(consensusLogger)

ConsensusReactor用于共识服务。

typeConsensusReactorstruct{pp.BaseReactor//BaseService+pp.SwitchconS*ConsensusStatemtxsync.RWMutexfastSyncbooleventBus*types.EventBus}

把actor添加到Switch

NewNode()中相关代码:

把上面创建的四个Reactor添加到Switch中。

ppLogger:=logger.With("module","pp")sw:=pp.NewSwitch(config.PP)sw.SetLogger(ppLogger)sw.AddReactor("MEMPOOL",mempoolReactor)sw.AddReactor("BLOCKCHAIN",bcReactor)sw.AddReactor("CONSENSUS",consensusReactor)sw.AddReactor("EVIDENCE",evidenceReactor)

PEXReactor(可选的)

看代码之前先了解几个概念:

seeds:启动节点时可通过--pp.seeds标签来指定种子节点,可以从中获得许多其它peer的地址。

tendermintnode--pp.seeds"f9baeaa15fedf5e1ef78dd60f6c01f1a9e9c

1...:,d7a8e0fcf10aaf18c51d6a1d0df1bd

5.6.7.8:"

persistent_peers:可指定与当前节点保持持久连接的一组节点。或使用RPC端点/dial_peers来指定而无需停止Tendermint实例。

tendermintnode--pp.persistent_peers"9fcfbf58d77eacdd

10.11.1.1:,add0d7b9d17dcbafc69

10.11.1.1:"curllocalhost:/dial_peers?persistent=truepeers=\["9fcfbf58d77eacdd

10.11.1.1:","add0d7b9d17dcbafc69

10.11.1.1:"\]

PEX:peer-exchange协议的缩写,默认是开启的,在第一次启动后通常不需要种子。peer之间将传播已知的peer并形成一个网络。peer地址存储在addrbook中。

如果PEX模式是打开的,它应该处理种子的拨号,否则将由Switch来处理。

NewNode()中相关代码:

//创建addrBookaddrBook:=pex.NewAddrBook(config.PP.AddrBookFile(),config.PP.AddrBookStrict)addrBook.SetLogger(ppLogger.With("book",config.PP.AddrBookFile()))//如果开启了PEX模式ifconfig.PP.PexReactor{//创建PEXactorpexReactor:=pex.NewPEXReactor(addrBook,pex.PEXReactorConfig{Seeds:cmn.SplitAndTrim(config.PP.Seeds,",",""),SeedMode:config.PP.SeedMode,PrivatePeerIDs:cmn.SplitAndTrim(config.PP.PrivatePeerIDs,",","")})pexReactor.SetLogger(ppLogger)//添加到Switchsw.AddReactor("PEX",pexReactor)}sw.SetAddrBook(addrBook)

PEXReactor处理PEX并保证足够数量的peer连接到Switch。用AddrBook存储peer的NetAddss。为防止滥用,只接受来自peer的pexAddrsMsg,我们也发送了相应的pexRequestMsg。每个defaultEnsuPeersPeriod时间段内只接收一个pexRequestMsg。

FilterPeers

配置中的config.FilterPeers字段用来指定当连接到一个新peer时是否要查询ABCI应用,由应用用来决定是否要保持连接。

使用ABCI查询通过addr或pubkey来过滤peer,如果返回OK则添加peer。

NewNode()中相关代码:

ifconfig.FilterPeers{//设置两种类型的Filtersw.SetAddrFilter(func(addrnet.Addr)error{sQuery,err:=proxyApp.Query().QuerySync(abci.RequestQuery{Path:cmn.Fmt("/pp/filter/addr/%s",addr.String())})iferr!=nil{turnerr}ifsQuery.IsErr(){turnfmt.Errorf("Errorqueryingabciapp:%v",sQuery)}turnnil})sw.SetIDFilter(func(idpp.ID)error{sQuery,err:=proxyApp.Query().QuerySync(abci.RequestQuery{Path:cmn.Fmt("/pp/filter/pubkey/%s",id)})iferr!=nil{turnerr}ifsQuery.IsErr(){turnfmt.Errorf("Errorqueryingabciapp:%v",sQuery)}turnnil})}

设置EventBus

EventBus是一个通过此系统的所有事件的事件总线,所有的调用都代理到底层的pubsub服务器。所有事件都必须用EventBus来发布,以保证正确的数据类型。

typeEventBusstruct{  cmn.BaseService//Server允许client订阅/取消订阅消息,带或不带tag发布消息,并管理内部状态  pubsub*tmpubsub.Server}

NewNode()中相关代码:

eventBus:=types.NewEventBus()eventBus.SetLogger(logger.With("module","events"))//将要发布和/或订阅消息(事件)的服务//consensusReactor将在consensusState和blockExecutor上设置eventBusconsensusReactor.SetEventBus(eventBus)

设置TxIndexer

TxIndexer是一个接口,定义了索引和搜索交易的方法。它有两种实现,一个是kv.TxIndex,由键值存储支持(levelDB),另一个是null.TxIndex,即不设置索引(默认的)。

IndexerService会把TxIndexer和EventBus连接在一起,以对来自EventBus的交易进行索引。

typeIndexerServicestruct{  cmn.BaseService  idrTxIndexer  eventBus*types.EventBus}

NewNode()中相关代码:

vartxIndexertxindex.TxIndexerswitchconfig.TxIndex.Indexer{//键值存储索引case"kv"://创建DBsto,err:=dbProvider(DBContext{"tx_index",config})iferr!=nil{turnnil,err}//有指定要索引的标签列表(以逗号分隔)ifconfig.TxIndex.IndexTags!=""{txIndexer=kv.NewTxIndex(sto,kv.IndexTags(cmn.SplitAndTrim(config.TxIndex.IndexTags,",","")))//要索引所有标签}elseifconfig.TxIndex.IndexAllTags{txIndexer=kv.NewTxIndex(sto,kv.IndexAllTags())//不索引标签}else{txIndexer=kv.NewTxIndex(sto)}default:txIndexer=null.TxIndex{}}//创建IndexerServiceindexerService:=txindex.NewIndexerService(txIndexer,eventBus)indexerService.SetLogger(logger.With("module","txindex"))

创建节点的BaseService

节点所需的所有字段内容已在以上部分创建完毕,在创建BaseService后,就可以启动节点了。

node.BaseService=*cmn.NewBaseService(logger,"Node",node)

创建节点总结

总结一下NewNode函数都做了哪些事情:

创建Tendermint与ABCI应用建立mempool、consensus和query连接所需的客户端。Tendermint节点与应用程序执行握手,确保其状态是同步的。根据配置config.FastSync及privValidator.GetAddss()方法,判断是否需要快速同步,是否是验证人节点。创建并在Switch中设置Reactor,即MempoolReactor、EvidenceReactor、BlockchainReactor、ConsensusReactor以及PEXReactor这五种。用来在peer上接收不同类型的消息。根据配置config.FilterPeers判断是否要用ABCI查询通过addr或pubkey来过滤要新连接的peer,如果返回OK则添加peer。创建并设置EventBus,订阅/发布事件。设置TxIndexer,对交易进行索引。

启动节点

回到startInProcess函数,在这里创建完毕节点后,执行Start方法,实际执行的是节点的OnStart方法。

接下来就看这个方法:

//日志:StartingNodemodule=nodeimpl=Nodefunc(n*Node)OnStart()error{//日志:StartingEventBusmodule=eventsimpl=EventBus  err:=n.eventBus.Start()  iferr!=nil{    turnerr  }//监听PP连接的端口  //日志:Locallistenermodule=ppip=::port=//日志:CouldnotperformUPNPdiscovermodule=pperr="writeudp0.0.0.0:-9.55.55.50::i/otimeout"//日志:StartingDefaultListenermodule=ppimpl=Listener(

.5.7.7:)  protocol,addss:=cmn.ProtocolAndAddss(n.config.PP.ListenAddss)  l:=pp.NewDefaultListener(protocol,addss,n.config.PP.SkipUPNP,n.Logger.With("module","pp"))  n.sw.AddListener(l)  //生成节点的PrivKey//日志:PPNodeIDmodule=nodeID=bddbe0bef9bfaaf5beabfile=/Users/LLLeon/.basecoind/config/node_key.json  nodeKey,err:=pp.LoadOrGenNodeKey(n.config.NodeKeyFile())  iferr!=nil{    turnerr  }  n.Logger.Info("PPNodeID","ID",nodeKey.ID(),"file",n.config.NodeKeyFile())  nodeInfo:=n.makeNodeInfo(nodeKey.ID())  n.sw.SetNodeInfo(nodeInfo)  n.sw.SetNodeKey(nodeKey)  //将自己添加到addrbook以防止连接自己//日志:Addouraddsstobookmodule=ppbook=/Users/LLLeon/.basecoind/config/addrbook.jsonaddr=bddbe0bef9bfaaf5beab

.5.7.7:  n.addrBook.AddOurAddss(nodeInfo.NetAddss())//在PP服务器之前启动RPC服务器,所以可以例如:接收第一个区块的交易//日志:StartingRPCHTTPserverontcp://0.0.0.0:module=rpc-server  ifn.config.RPC.ListenAddss!=""{    listeners,err:=n.startRPC()    iferr!=nil{      turnerr    }    n.rpcListeners=listeners  }  //启动switch(PP服务器),函数内部依次启动了各Reactor,//Switch会在端口(默认的)监听peer的连接,连接后会通过Reactor的Receive方法//接收来自peer的消息//日志:StartingPPSwitchmodule=ppimpl="PPSwitch"//日志:StartingBlockchainReactormodule=blockchainimpl=BlockchainReactor//日志:StartingConsensusReactormodule=consensusimpl=ConsensusReactor//日志:ConsensusReactormodule=consensusfastSync=false//日志:StartingConsensusStatemodule=consensusimpl=ConsensusState//日志:StartingbaseWALmodule=consensuswal=/Users/LLLeon/.basecoind/data/cs.wal/walimpl=baseWAL//日志:Catchupbyplayingconsensusmessagesmodule=consensusheight=5//日志:Replay:Donemodule=consensus//日志:StartingEvidenceReactormodule=evidenceimpl=EvidenceReactor//日志:StartingPEXReactormodule=ppimpl=PEXReactor//日志:StartingAddrBookmodule=ppbook=/Users/LLLeon/.basecoind/config/addrbook.jsonimpl=AddrBook//日志:enterNewRound(5/0).Curnt:5/0/RoundStepNewHeightmodule=consensusheight=5round=0//日志:enterPropose(5/0).Curnt:5/0/RoundStepNewRoundmodule=consensusheight=5round=0//日志:enterPropose:Ourturntoproposemodule=consensusheight=5round=0proposer=FFAFDCDBFA87DDEC5F0E1privValidator="PrivValidator{FFAFDCDBFA87DDEC5F0E1LH:,LR:0,LS:}"//日志:StartingMempoolReactormodule=mempoolimpl=MempoolReactor  err=n.sw.Start()  iferr!=nil{    turnerr  }  //始终连接到持久peers  ifn.config.PP.PersistentPeers!=""{    err=n.sw.DialPeersAsync(n.addrBook,cmn.SplitAndTrim(n.config.PP.PersistentPeers,",",""),true)    iferr!=nil{      turnerr    }  }  //启动交易的indexer//日志:StartingIndexerServicemodule=txindeximpl=IndexerService  turnn.indexerService.Start()}

至此,节点中各模块已经启动完毕,接下来进入catchup、共识协议等核心处理逻辑。

Tendermint核心处理逻辑

这部分包括区块的catchup、共识协议处理等内容。

这部分逻辑是在ConsensusReactor启动时处理的。

日志:

StartingConsensusReactormodule=consensusimpl=ConsensusReactorConsensusReactormodule=consensusfastSync=falseStartingConsensusStatemodule=consensusimpl=ConsensusStateStartingbaseWALmodule=consensuswal=/Users/LLLeon/.basecoind/data/cs.wal/walimpl=baseWALStartingTimeoutTickermodule=consensusimpl=TimeoutTickerCatchupbyplayingconsensusmessagesmodule=consensusheight=5Replay:Donemodule=consensus

先看ConsensusReactor启动的代码:

func(conR*ConsensusReactor)OnStart()error{conR.Logger.Info("ConsensusReactor","fastSync",conR.FastSync())iferr:=conR.BaseReactor.OnStart();err!=nil{turnerr}//订阅这几种类型的事件:EventNewRoundStep、EventVote和EventProposalHeartbeat,一旦接收到这些//类型的消息就会用在state中定义的pubsub广播给peerconR.subscribeToBroadcastEvents()//由于只有我们一个节点,所以会设置FastSync=false,会启动ConsensusStateif!conR.FastSync(){err:=conR.conS.Start()iferr!=nil{turnerr}}turnnil}

现在看ConsensusState的启动:

func(cs*ConsensusState)OnStart()error{  iferr:=cs.evsw.Start();err!=nil{    turnerr  }  //wemaysettheWALintestingbefocallingStart,  //soonlyOpenWALifitsstillthenilWAL  if_,ok:=cs.wal.(nilWAL);ok{    walFile:=cs.config.WalFile()//这里会启动baseWAL,它在处理消息之前会将其写入磁盘。可用于崩溃恢复和确定性重放    wal,err:=cs.OpenWAL(walFile)    iferr!=nil{      cs.Logger.Error("ErrorloadingConsensusStatewal","err",err.Error())      turnerr    }    cs.wal=wal  }  //weneedthetimeoutRoutineforplayso  //wedontblockonthetickchan.  //NOTE:wewillgetabuildupofgarbagegoroutines  //firingonthetockChanuntiltheceiveRoutineisstarted  //todealwiththem(bythatpoint,atmostonewillbevalid)//用来对每一步的超时进行控制//里面是在协程里面执行timeoutRoutine方法,内部会监听timeoutTicker.tickChan,进行到下一步时//会中止并重置旧timer,并更新timeoutInfo。tickChan上超时时间为0时,会立即转发到tockChan//日志:StartingTimeoutTickermodule=consensusimpl=TimeoutTicker  iferr:=cs.timeoutTicker.Start();err!=nil{    turnerr  }  //wemayhavelostsomevotesiftheprocesscrashed  //loadfromconsensuslogtocatchup//新建ConsensusState时已设置为true  ifcs.doWALCatchup{//catchup:仅重放自上一个区块以来的那些消息//日志:Catchupbyplayingconsensusmessagesmodule=consensusheight=5//日志:Replay:Donemodule=consensus    iferr:=cs.catchupReplay(cs.Height);err!=nil{      cs.Logger.Error("Erroroncatchupplay.ProceedingtostartConsensusStateanyway","err",err.Error())      //NOTE:ifweeverdoturnanerrorhe,      //makesutostopthetimeoutTicker    }  }//核心处理逻辑,主要的协程,详细解释见下面  gocs.ceiveRoutine(0)  //schedulethefirstround!  //useGetRoundStatesowedontracetheceiveRoutineforaccess  cs.scheduleRound0(cs.GetRoundState())  turnnil}

这个方法里面主要做了区块的catchup、在协程中启动ceiveRoutine()方法来接收并处理消息,以及执行scheduleRound0()方法来进入新回合,下面逐一说明。

catchup

在这次启动节点之前,已经运行过一段时间,区块最新高度为5。首先看节点在重启后是如何catchup到此高度的。

catchupReplay源码:

func(cs*ConsensusState)catchupReplay(csHeightint6)error{//SetplayModetotruesowedontlogsigningerrors.cs.playMode=truedeferfunc(){cs.playMode=false}()//Ensuthat#ENDHEIGHTforthisheightdoesntexist.//NOTE:Thisisjustasanitycheck.Asfarasweknowthingsworkfine//withoutit,andHandshakecoulduseConsensusStateifitwentfor//thischeck(sincewecancrashafterwriting#ENDHEIGHT).////Ignodatacorruptionerrorssincethisisasanitycheck.gr,found,err:=cs.wal.SearchForEndHeight(csHeight,WALSearchOptions{IgnoDataCorruptionErrors:true})iferr!=nil{turnerr}ifgr!=nil{iferr:=gr.Close();err!=nil{turnerr}}iffound{turnfmt.Errorf("WALshouldnotcontain#ENDHEIGHT%d",csHeight)}//Searchforlastheightmarker.////Ignodatacorruptionerrorsinpviousheightsbecauseweonlycaaboutlastheightgr,found,err=cs.wal.SearchForEndHeight(csHeight-1,WALSearchOptions{IgnoDataCorruptionErrors:true})iferr==io.EOF{cs.Logger.Error("Replay:wal.group.SearchturnedEOF","#ENDHEIGHT",csHeight-1)}elseiferr!=nil{turnerr}if!found{turnfmt.Errorf("Cannotplayheight%d.WALdoesnotcontain#ENDHEIGHTfor%d",csHeight,csHeight-1)}defergr.Close()//nolint:errcheckcs.Logger.Info("Catchupbyplayingconsensusmessages","height",csHeight)varmsg*TimedWALMessagedec:=WALDecoder{gr}for{msg,err=dec.Decode()iferr==io.EOF{bak}elseifIsDataCorruptionError(err){cs.Logger.Debug("datahasbeencorruptedinlastheightofconsensusWAL","err",err,"height",csHeight)panic(fmt.Sprintf("datahasbeencorrupted(%v)inlastheight%dofconsensusWAL",err,csHeight))}elseiferr!=nil{turnerr}//NOTE:sincetheprivkeyissetwhenthemsgsaceived//itwillattempttoegdoublesignbutwecanjustignoit//sincethevoteswillbeplayedandwellgettothenextstepiferr:=cs.adReplayMessage(msg,nil);err!=nil{turnerr}}cs.Logger.Info("Replay:Done")turnnil}

ceiveRoutine

核心处理逻辑:需要重点看一下cs.ceiveRoutine()方法。

这个方法处理可能引起状态转换的消息,参数maxSteps表示退出前要处理的消息数量,0表示永不退出。它持有RoundState并且是唯一更新它的地方。更新发生在超时、完成提案和/多数时。ConsensusState在任意内部状态更新之前必须是锁定的。

这个方法在单独的协程里面,接收并处理来自peer、节点内部或超时消息。

func(cs*ConsensusState)ceiveRoutine(maxStepsint){deferfunc(){ifr:=cover();r!=nil{cs.Logger.Error("CONSENSUSFAILURE!!!","err",r,"stack",string(debug.Stack()))}}()for{//到达设定的maxSteps时退出ifmaxSteps0{ifcs.nSteps=maxSteps{cs.Logger.Info("achedmaxsteps.exitingceiveroutine")cs.nSteps=0turn}}rs:=cs.RoundStatevarmimsgInfoselect{//TxsAvailable返回一个通道,每添加一个交易到mempool后会触发一次,//并且只有在mempool中的交易可用时才会触发。//如果没有调用EnableTxsAvailable,返回的channel可能为nil。caseheight:=-cs.mempool.TxsAvailable()://这里会执行enterPropose(),进入提议环节,完成后会执行enterPvote进入Pvote环节//日志:enterPropose(5/0).Curnt:5/0/RoundStepNewRoundmodule=consensusheight=5round=0//日志:enterPropose:Ourturntoproposemodule=consensusheight=5round=0proposer=FFAFDCDBFA87DDEC5F0E1privValidator="PrivValidator{FFAFDCDBFA87DDEC5F0E1LH:,LR:0,LS:}"cs.handleTxsAvailable(height)//接收到来自peer的消息casemi=-cs.peerMsgQueue:cs.wal.Write(mi)//handlesproposals,blockparts,votes//maygenerateinternalevents(votes,

1
查看完整版本: Tendermint节点启动源码分析