本文以官方示例公链Basecoin的basecoindstart命令为入口,结合日志与源码分析Tendermint节点创建、启动及生成区块等过程。
文中代码细节较多,但限于篇幅没有太深入某些细节,比如共识过程。如只想快速了解整体过程,可只阅读有basecoindstart日志部分的内容。
start命令入口
执行basecoindinit命令初始化genesis配置、priv-validator文件及pp-node文件后,在命令行执行basecoindstart启动节点。在不带有任何参数时,Tendermint会与ABCI应用一起执行:
日志:StartingABCIwithTendermintmodule=main
func(cmd*cobra.Command,args[]string)error{ if!viper.GetBool(flagWithTendermint){ ctx.Logger.Info("StartingABCIwithoutTendermint") turnstartStandAlone(ctx,appCator) } ctx.Logger.Info("StartingABCIwithTendermint") turnstartInProcess(ctx,appCator)}
创建节点
这一节把NewNode()
创建三条连接的客户端
startInProcess会创建Tendermint节点,然后启动节点。
创建节点时Tendermint与ABCI应用会创建建立三条连接所需的客户端:即query、mempool以及consensus连接。
**注意:**所有要启动服务都是通过BaseService.Start来执行的。
日志:StartingmultiAppConnmodule=proxyimpl=multiAppConn
NewNode()中相关代码:
proxyApp:=proxy.NewAppConns(clientCator,handshaker)//实际执行的是multiAppConn.multiAppConn()iferr:=proxyApp.Start();err!=nil{ turnnil,fmt.Errorf("Errorstartingproxyappconnections:%v",err) }
三条连接的客户端的建立在multiAppConn.OnStart方法中完成,在这里三条连接的客户端都是localClient结构,由于localClient没有实现OnStart方法,它们的Start方法调用的都是cmn.BaseService的OnStart方法,也就是querycli.Start()(另两个也一样)除了打印日志外,什么都不做。
接下来分别把localClient封装成了appConnQuery、appConnMempool和appConnConsensus结构。
日志:StartinglocalClientmodule=abci-clientconnection=queryimpl=localClient
multiAppConn.OnStart()中相关代码:
//在创建节点时传入的clientCator的具体实现是localClientCator结构,//这里最终返回的是localClient结构querycli,err:=app.clientCator.NewABCIClient()//执行的是cmn.BaseService的OnStart方法,啥都没做iferr:=querycli.Start();err!=nil{ turnerrors.Wrap(err,"ErrorstartingABCIclient(queryconnection)") }
其它两个连接的客户端与query连接客户端的创建相同。
握手同步
在建立完毕以上三条连接的客户端后,会执行app.handshaker.Handshake(app)来握手,确保Tendermint节点与应用程序的状态是同步的。
日志:
ABCIHandshakemodule=consensusappHeight=appHash=DC1ED0D0D1EE0CCBD7AEEAE7EABCIReplayBlocksmodule=consensusappHeight=stoHeight=stateHeight=CompletedABCIHandshake-TendermintandAppasyncedmodule=consensusappHeight=appHash=DC1ED0D0D1EE0CCBD7AEEAE7E
在query连接上通过ABCIInfo查询ABCI应用的blocksto中的最新状态,然后将Tendermint节点的区块重放到此状态:
multiAppConn.OnStart()中相关代码:
ifapp.handshaker!=nil{turnapp.handshaker.Handshake(app)}
Handshaker.Handshake()中相关代码:
//从ABCI应用的blocksto中获取最新状态s,err:=proxyApp.Query().InfoSync(abci.RequestInfo{version.Version})//重放所有区块到最新状态_,err=h.ReplayBlocks(h.initialState,appHash,blockHeight,proxyApp)
重放完毕后,Tendermint节点已经和ABCI应用同步到了相同的区块高度。
快速同步设置及验证人节点确认
在进入代码细节之前,先了解一下快速同步的概念。
快速同步(FastSync):在当前节点落后于区块链的最新状态时,需要进行节点间同步,快速同步只下载区块并检查验证人的默克尔树,比运行实时一致性八卦协议快得多。一旦追上其它节点的状态,守护进程将切换出快速同步并进入正常共识模式。在运行一段时间后,如果此节点至少有一个peer,并且其区块高度至少与最大的报告的peer高度一样高,则认为该节点已追上区块链最新状态(即caughtup)。
现在回到NewNode()源码,当节点同步到ABCI应用的最新状态后,会检查当前状态的验证人集合中是否只有当前节点一个验证人,如果是,则无需快速同步。
//重新从数据库加载状态,因为可能在握手时有更新state=sm.LoadState(stateDB)//Decidewhethertofast-syncornot//Wedontfast-syncwhentheonlyvalidatorisus.//此字段用来指定当此节点在区块链末端有许多区块需要同步时,是否启用快速同步功能,默认为truefastSync:=config.FastSyncifstate.Validators.Size()==1{addr,_:=state.Validators.GetByIndex(0)ifbytes.Equal(privValidator.GetAddss(),addr){fastSync=false}}
日志:
Thisnodeisavalidatormodule=consensusaddr=FFAFDCDBFA87DDEC5F0E1pubKey=PubKeyEd{76CA1AE9AFB1FDCBE1A00CFE5EEDBEBDCC05CD1A76AEFB}
NewNode()中相关代码,显示当前节点是否是验证人:
//Logwhetherthisnodeisavalidatororanobserverifstate.Validators.HasAddss(privValidator.GetAddss()){consensusLogger.Info("Thisnodeisavalidator","addr",privValidator.GetAddss(),"pubKey",privValidator.GetPubKey())}else{consensusLogger.Info("Thisnodeisnotavalidator","addr",privValidator.GetAddss(),"pubKey",privValidator.GetPubKey())}
创建各种Reactor
Reactor是处理各类传入消息的结构,一共有5种类型,通过将其添加到Switch中来实现。先熟悉一下Switch的结构:
Switch处理peer连接,并暴露一个API以在各类Reactor上接收传入的消息。每个Reactor负责处理一个或多个“Channels”的传入消息。因此,发送传出消息通常在peer执行,传入的消息在Reactor上接收。
typeSwitchstruct{cmn.BaseServiceconfig*config.PPConfiglisteners[]Listener//添加的Reactor都存在这里actorsmap[string]ReactorchDescs[]*conn.ChannelDescriptoractorsByChmap[byte]Reactorpeers*PeerSetdialing*cmn.CMapconnecting*cmn.CMapnodeInfoNodeInfo//ournodeinfonodeKey*NodeKey//ournodeprivkeyaddrBookAddrBookfilterConnByAddrfunc(net.Addr)errorfilterConnByIDfunc(ID)errorrng*cmn.Rand//seedforrandomizingdialtimesandorders}
MempoolReactor
Mempool是一个有序的内存池,交易在被共识提议之前会存储在这里,而在存储到这里之前会通过ABCI应用的CheckTx方法检查其合法性。
先看NewNode()中相关代码:
mempoolLogger:=logger.With("module","mempool")//创建Mempoolmempool:=mempl.NewMempool(config.Mempool,proxyApp.Mempool(),state.LastBlockHeight)//初始化Mempool的write-aheadlog(确保可以从任何形式的崩溃中恢复过来)mempool.InitWAL()//noneedtohavethemempoolwalduringtestsmempool.SetLogger(mempoolLogger)//创建MempoolReactormempoolReactor:=mempl.NewMempoolReactor(config.Mempool,mempool)mempoolReactor.SetLogger(mempoolLogger)//这里根据配置,判断是否要等待有交易时才生成新区块ifconfig.Consensus.WaitForTxs(){mempool.EnableTxsAvailable()}
MempoolReactor用来在peer之间对mempool交易进行广播。看一下它的数据结构:
typeMempoolReactorstruct{pp.BaseReactorconfig*cfg.MempoolConfigMempool*Mempool}funcNewMempoolReactor(config*cfg.MempoolConfig,mempool*Mempool)*MempoolReactor{ memR:=MempoolReactor{ config:config, Mempool:mempool, } memR.BaseReactor=*pp.NewBaseReactor("MempoolReactor",memR) turnmemR}
EvidenceReactor
Evidence是一个接口,表示验证人的任何可证明的恶意活动,主要有DuplicateVoteEvidence(包含验证人签署两个相互矛盾的投票的证据。)这种实现。
NewNode()中相关代码:
evidenceDB,err:=dbProvider(DBContext{"evidence",config})iferr!=nil{turnnil,err}evidenceLogger:=logger.With("module","evidence")//EvidenceSto用来存储见过的所有Evidence,包括已提交的、已经过验证但没有广播的以及已经广播但未提交的evidenceSto:=evidence.NewEvidenceSto(evidenceDB)//EvidencePool在EvidenceSto中维护一组有效的EvidenceevidencePool:=evidence.NewEvidencePool(stateDB,evidenceSto)evidencePool.SetLogger(evidenceLogger)//创建EvidenceReactovidenceReactor:=evidence.NewEvidenceReactor(evidencePool)evidenceReactor.SetLogger(evidenceLogger)
EvidenceReactor用来在peer间对EvidencePool中Evidence进行广播。
typeEvidenceReactorstruct{pp.BaseReactorevpool*EvidencePooleventBus*types.EventBus}
BlockchainReactor
NewNode()中相关代码:
blockExecLogger:=logger.With("module","state")//BlockExecutor用来处理区块执行和状态更新。//它暴露一个ApplyBlock()方法,用来验证并执行区块、更新状态和ABCI应答,然后以原子方式提交//并更新mempool,最后保存状态。blockExec:=sm.NewBlockExecutor(stateDB,blockExecLogger,proxyApp.Consensus(),mempool,evidencePool)//创建BlockchainReactorbcReactor:=bc.NewBlockchainReactor(state.Copy(),blockExec,blockSto,fastSync)bcReactor.SetLogger(logger.With("module","blockchain"))
BlockchainReactor用来处理长期的catchup同步。
typeBlockchainReactorstruct{ pp.BaseReactor //immutable initialStatesm.State blockExec*sm.BlockExecutor//区块的底层存储。主要存储三种类型的信息:BlockMeta、Blockpart和Commit sto*BlockSto//当加入到BlockPool时,peer自己报告它们的高度。//从当前节点最新的pool.height开始,从报告的高于我们高度的peer顺序请求区块。//节点经常问peer他们当前的高度,这样我们就可以继续前进。//不断请求更高的区块直到到达限制。如果大多数请求没有可用的peer,并且没有处在peer限制,可以//切换到consensusactor pool*BlockPool fastSyncbool questsCh-chanBlockRequest errorsCh-chanpeerError}
ConsensusReactor
NewNode()中相关代码:
ConsensusState用来处理共识算法的执行。它处理投票和提案,一旦达成一致,将区块提交给区块链并针对ABCI应用执行它们。内部状态机从peer、内部验证人和定时器接收输入。
//创建ConsensusStateconsensusState:=cs.NewConsensusState(config.Consensus,state.Copy(),blockExec,blockSto,mempool,evidencePool)consensusState.SetLogger(consensusLogger)ifprivValidator!=nil{//设置PrivValidator用来投票consensusState.SetPrivValidator(privValidator)}//创建ConsensusReactorconsensusReactor:=cs.NewConsensusReactor(consensusState,fastSync)consensusReactor.SetLogger(consensusLogger)
ConsensusReactor用于共识服务。
typeConsensusReactorstruct{pp.BaseReactor//BaseService+pp.SwitchconS*ConsensusStatemtxsync.RWMutexfastSyncbooleventBus*types.EventBus}
把actor添加到Switch
NewNode()中相关代码:
把上面创建的四个Reactor添加到Switch中。
ppLogger:=logger.With("module","pp")sw:=pp.NewSwitch(config.PP)sw.SetLogger(ppLogger)sw.AddReactor("MEMPOOL",mempoolReactor)sw.AddReactor("BLOCKCHAIN",bcReactor)sw.AddReactor("CONSENSUS",consensusReactor)sw.AddReactor("EVIDENCE",evidenceReactor)
PEXReactor(可选的)
看代码之前先了解几个概念:
seeds:启动节点时可通过--pp.seeds标签来指定种子节点,可以从中获得许多其它peer的地址。
tendermintnode--pp.seeds"f9baeaa15fedf5e1ef78dd60f6c01f1a9e9c
1...:,d7a8e0fcf10aaf18c51d6a1d0df1bd5.6.7.8:"persistent_peers:可指定与当前节点保持持久连接的一组节点。或使用RPC端点/dial_peers来指定而无需停止Tendermint实例。tendermintnode--pp.persistent_peers"9fcfbf58d77eacdd
10.11.1.1:,add0d7b9d17dcbafc6910.11.1.1:"curllocalhost:/dial_peers?persistent=truepeers=\["9fcfbf58d77eacdd10.11.1.1:","add0d7b9d17dcbafc6910.11.1.1:"\]PEX:peer-exchange协议的缩写,默认是开启的,在第一次启动后通常不需要种子。peer之间将传播已知的peer并形成一个网络。peer地址存储在addrbook中。如果PEX模式是打开的,它应该处理种子的拨号,否则将由Switch来处理。
NewNode()中相关代码:
//创建addrBookaddrBook:=pex.NewAddrBook(config.PP.AddrBookFile(),config.PP.AddrBookStrict)addrBook.SetLogger(ppLogger.With("book",config.PP.AddrBookFile()))//如果开启了PEX模式ifconfig.PP.PexReactor{//创建PEXactorpexReactor:=pex.NewPEXReactor(addrBook,pex.PEXReactorConfig{Seeds:cmn.SplitAndTrim(config.PP.Seeds,",",""),SeedMode:config.PP.SeedMode,PrivatePeerIDs:cmn.SplitAndTrim(config.PP.PrivatePeerIDs,",","")})pexReactor.SetLogger(ppLogger)//添加到Switchsw.AddReactor("PEX",pexReactor)}sw.SetAddrBook(addrBook)
PEXReactor处理PEX并保证足够数量的peer连接到Switch。用AddrBook存储peer的NetAddss。为防止滥用,只接受来自peer的pexAddrsMsg,我们也发送了相应的pexRequestMsg。每个defaultEnsuPeersPeriod时间段内只接收一个pexRequestMsg。
FilterPeers
配置中的config.FilterPeers字段用来指定当连接到一个新peer时是否要查询ABCI应用,由应用用来决定是否要保持连接。
使用ABCI查询通过addr或pubkey来过滤peer,如果返回OK则添加peer。
NewNode()中相关代码:
ifconfig.FilterPeers{//设置两种类型的Filtersw.SetAddrFilter(func(addrnet.Addr)error{sQuery,err:=proxyApp.Query().QuerySync(abci.RequestQuery{Path:cmn.Fmt("/pp/filter/addr/%s",addr.String())})iferr!=nil{turnerr}ifsQuery.IsErr(){turnfmt.Errorf("Errorqueryingabciapp:%v",sQuery)}turnnil})sw.SetIDFilter(func(idpp.ID)error{sQuery,err:=proxyApp.Query().QuerySync(abci.RequestQuery{Path:cmn.Fmt("/pp/filter/pubkey/%s",id)})iferr!=nil{turnerr}ifsQuery.IsErr(){turnfmt.Errorf("Errorqueryingabciapp:%v",sQuery)}turnnil})}
设置EventBus
EventBus是一个通过此系统的所有事件的事件总线,所有的调用都代理到底层的pubsub服务器。所有事件都必须用EventBus来发布,以保证正确的数据类型。
typeEventBusstruct{ cmn.BaseService//Server允许client订阅/取消订阅消息,带或不带tag发布消息,并管理内部状态 pubsub*tmpubsub.Server}
NewNode()中相关代码:
eventBus:=types.NewEventBus()eventBus.SetLogger(logger.With("module","events"))//将要发布和/或订阅消息(事件)的服务//consensusReactor将在consensusState和blockExecutor上设置eventBusconsensusReactor.SetEventBus(eventBus)
设置TxIndexer
TxIndexer是一个接口,定义了索引和搜索交易的方法。它有两种实现,一个是kv.TxIndex,由键值存储支持(levelDB),另一个是null.TxIndex,即不设置索引(默认的)。
IndexerService会把TxIndexer和EventBus连接在一起,以对来自EventBus的交易进行索引。
typeIndexerServicestruct{ cmn.BaseService idrTxIndexer eventBus*types.EventBus}
NewNode()中相关代码:
vartxIndexertxindex.TxIndexerswitchconfig.TxIndex.Indexer{//键值存储索引case"kv"://创建DBsto,err:=dbProvider(DBContext{"tx_index",config})iferr!=nil{turnnil,err}//有指定要索引的标签列表(以逗号分隔)ifconfig.TxIndex.IndexTags!=""{txIndexer=kv.NewTxIndex(sto,kv.IndexTags(cmn.SplitAndTrim(config.TxIndex.IndexTags,",","")))//要索引所有标签}elseifconfig.TxIndex.IndexAllTags{txIndexer=kv.NewTxIndex(sto,kv.IndexAllTags())//不索引标签}else{txIndexer=kv.NewTxIndex(sto)}default:txIndexer=null.TxIndex{}}//创建IndexerServiceindexerService:=txindex.NewIndexerService(txIndexer,eventBus)indexerService.SetLogger(logger.With("module","txindex"))
创建节点的BaseService
节点所需的所有字段内容已在以上部分创建完毕,在创建BaseService后,就可以启动节点了。
node.BaseService=*cmn.NewBaseService(logger,"Node",node)
创建节点总结
总结一下NewNode函数都做了哪些事情:
创建Tendermint与ABCI应用建立mempool、consensus和query连接所需的客户端。Tendermint节点与应用程序执行握手,确保其状态是同步的。根据配置config.FastSync及privValidator.GetAddss()方法,判断是否需要快速同步,是否是验证人节点。创建并在Switch中设置Reactor,即MempoolReactor、EvidenceReactor、BlockchainReactor、ConsensusReactor以及PEXReactor这五种。用来在peer上接收不同类型的消息。根据配置config.FilterPeers判断是否要用ABCI查询通过addr或pubkey来过滤要新连接的peer,如果返回OK则添加peer。创建并设置EventBus,订阅/发布事件。设置TxIndexer,对交易进行索引。
启动节点
回到startInProcess函数,在这里创建完毕节点后,执行Start方法,实际执行的是节点的OnStart方法。
接下来就看这个方法:
//日志:StartingNodemodule=nodeimpl=Nodefunc(n*Node)OnStart()error{//日志:StartingEventBusmodule=eventsimpl=EventBus err:=n.eventBus.Start() iferr!=nil{ turnerr }//监听PP连接的端口 //日志:Locallistenermodule=ppip=::port=//日志:CouldnotperformUPNPdiscovermodule=pperr="writeudp0.0.0.0:-9.55.55.50::i/otimeout"//日志:StartingDefaultListenermodule=ppimpl=Listener(
.5.7.7:) protocol,addss:=cmn.ProtocolAndAddss(n.config.PP.ListenAddss) l:=pp.NewDefaultListener(protocol,addss,n.config.PP.SkipUPNP,n.Logger.With("module","pp")) n.sw.AddListener(l) //生成节点的PrivKey//日志:PPNodeIDmodule=nodeID=bddbe0bef9bfaaf5beabfile=/Users/LLLeon/.basecoind/config/node_key.json nodeKey,err:=pp.LoadOrGenNodeKey(n.config.NodeKeyFile()) iferr!=nil{ turnerr } n.Logger.Info("PPNodeID","ID",nodeKey.ID(),"file",n.config.NodeKeyFile()) nodeInfo:=n.makeNodeInfo(nodeKey.ID()) n.sw.SetNodeInfo(nodeInfo) n.sw.SetNodeKey(nodeKey) //将自己添加到addrbook以防止连接自己//日志:Addouraddsstobookmodule=ppbook=/Users/LLLeon/.basecoind/config/addrbook.jsonaddr=bddbe0bef9bfaaf5beab.5.7.7: n.addrBook.AddOurAddss(nodeInfo.NetAddss())//在PP服务器之前启动RPC服务器,所以可以例如:接收第一个区块的交易//日志:StartingRPCHTTPserverontcp://0.0.0.0:module=rpc-server ifn.config.RPC.ListenAddss!=""{ listeners,err:=n.startRPC() iferr!=nil{ turnerr } n.rpcListeners=listeners } //启动switch(PP服务器),函数内部依次启动了各Reactor,//Switch会在端口(默认的)监听peer的连接,连接后会通过Reactor的Receive方法//接收来自peer的消息//日志:StartingPPSwitchmodule=ppimpl="PPSwitch"//日志:StartingBlockchainReactormodule=blockchainimpl=BlockchainReactor//日志:StartingConsensusReactormodule=consensusimpl=ConsensusReactor//日志:ConsensusReactormodule=consensusfastSync=false//日志:StartingConsensusStatemodule=consensusimpl=ConsensusState//日志:StartingbaseWALmodule=consensuswal=/Users/LLLeon/.basecoind/data/cs.wal/walimpl=baseWAL//日志:Catchupbyplayingconsensusmessagesmodule=consensusheight=5//日志:Replay:Donemodule=consensus//日志:StartingEvidenceReactormodule=evidenceimpl=EvidenceReactor//日志:StartingPEXReactormodule=ppimpl=PEXReactor//日志:StartingAddrBookmodule=ppbook=/Users/LLLeon/.basecoind/config/addrbook.jsonimpl=AddrBook//日志:enterNewRound(5/0).Curnt:5/0/RoundStepNewHeightmodule=consensusheight=5round=0//日志:enterPropose(5/0).Curnt:5/0/RoundStepNewRoundmodule=consensusheight=5round=0//日志:enterPropose:Ourturntoproposemodule=consensusheight=5round=0proposer=FFAFDCDBFA87DDEC5F0E1privValidator="PrivValidator{FFAFDCDBFA87DDEC5F0E1LH:,LR:0,LS:}"//日志:StartingMempoolReactormodule=mempoolimpl=MempoolReactor err=n.sw.Start() iferr!=nil{ turnerr } //始终连接到持久peers ifn.config.PP.PersistentPeers!=""{ err=n.sw.DialPeersAsync(n.addrBook,cmn.SplitAndTrim(n.config.PP.PersistentPeers,",",""),true) iferr!=nil{ turnerr } } //启动交易的indexer//日志:StartingIndexerServicemodule=txindeximpl=IndexerService turnn.indexerService.Start()}至此,节点中各模块已经启动完毕,接下来进入catchup、共识协议等核心处理逻辑。
Tendermint核心处理逻辑
这部分包括区块的catchup、共识协议处理等内容。
这部分逻辑是在ConsensusReactor启动时处理的。
日志:
StartingConsensusReactormodule=consensusimpl=ConsensusReactorConsensusReactormodule=consensusfastSync=falseStartingConsensusStatemodule=consensusimpl=ConsensusStateStartingbaseWALmodule=consensuswal=/Users/LLLeon/.basecoind/data/cs.wal/walimpl=baseWALStartingTimeoutTickermodule=consensusimpl=TimeoutTickerCatchupbyplayingconsensusmessagesmodule=consensusheight=5Replay:Donemodule=consensus
先看ConsensusReactor启动的代码:
func(conR*ConsensusReactor)OnStart()error{conR.Logger.Info("ConsensusReactor","fastSync",conR.FastSync())iferr:=conR.BaseReactor.OnStart();err!=nil{turnerr}//订阅这几种类型的事件:EventNewRoundStep、EventVote和EventProposalHeartbeat,一旦接收到这些//类型的消息就会用在state中定义的pubsub广播给peerconR.subscribeToBroadcastEvents()//由于只有我们一个节点,所以会设置FastSync=false,会启动ConsensusStateif!conR.FastSync(){err:=conR.conS.Start()iferr!=nil{turnerr}}turnnil}
现在看ConsensusState的启动:
func(cs*ConsensusState)OnStart()error{ iferr:=cs.evsw.Start();err!=nil{ turnerr } //wemaysettheWALintestingbefocallingStart, //soonlyOpenWALifitsstillthenilWAL if_,ok:=cs.wal.(nilWAL);ok{ walFile:=cs.config.WalFile()//这里会启动baseWAL,它在处理消息之前会将其写入磁盘。可用于崩溃恢复和确定性重放 wal,err:=cs.OpenWAL(walFile) iferr!=nil{ cs.Logger.Error("ErrorloadingConsensusStatewal","err",err.Error()) turnerr } cs.wal=wal } //weneedthetimeoutRoutineforplayso //wedontblockonthetickchan. //NOTE:wewillgetabuildupofgarbagegoroutines //firingonthetockChanuntiltheceiveRoutineisstarted //todealwiththem(bythatpoint,atmostonewillbevalid)//用来对每一步的超时进行控制//里面是在协程里面执行timeoutRoutine方法,内部会监听timeoutTicker.tickChan,进行到下一步时//会中止并重置旧timer,并更新timeoutInfo。tickChan上超时时间为0时,会立即转发到tockChan//日志:StartingTimeoutTickermodule=consensusimpl=TimeoutTicker iferr:=cs.timeoutTicker.Start();err!=nil{ turnerr } //wemayhavelostsomevotesiftheprocesscrashed //loadfromconsensuslogtocatchup//新建ConsensusState时已设置为true ifcs.doWALCatchup{//catchup:仅重放自上一个区块以来的那些消息//日志:Catchupbyplayingconsensusmessagesmodule=consensusheight=5//日志:Replay:Donemodule=consensus iferr:=cs.catchupReplay(cs.Height);err!=nil{ cs.Logger.Error("Erroroncatchupplay.ProceedingtostartConsensusStateanyway","err",err.Error()) //NOTE:ifweeverdoturnanerrorhe, //makesutostopthetimeoutTicker } }//核心处理逻辑,主要的协程,详细解释见下面 gocs.ceiveRoutine(0) //schedulethefirstround! //useGetRoundStatesowedontracetheceiveRoutineforaccess cs.scheduleRound0(cs.GetRoundState()) turnnil}
这个方法里面主要做了区块的catchup、在协程中启动ceiveRoutine()方法来接收并处理消息,以及执行scheduleRound0()方法来进入新回合,下面逐一说明。
catchup
在这次启动节点之前,已经运行过一段时间,区块最新高度为5。首先看节点在重启后是如何catchup到此高度的。
catchupReplay源码:
func(cs*ConsensusState)catchupReplay(csHeightint6)error{//SetplayModetotruesowedontlogsigningerrors.cs.playMode=truedeferfunc(){cs.playMode=false}()//Ensuthat#ENDHEIGHTforthisheightdoesntexist.//NOTE:Thisisjustasanitycheck.Asfarasweknowthingsworkfine//withoutit,andHandshakecoulduseConsensusStateifitwentfor//thischeck(sincewecancrashafterwriting#ENDHEIGHT).////Ignodatacorruptionerrorssincethisisasanitycheck.gr,found,err:=cs.wal.SearchForEndHeight(csHeight,WALSearchOptions{IgnoDataCorruptionErrors:true})iferr!=nil{turnerr}ifgr!=nil{iferr:=gr.Close();err!=nil{turnerr}}iffound{turnfmt.Errorf("WALshouldnotcontain#ENDHEIGHT%d",csHeight)}//Searchforlastheightmarker.////Ignodatacorruptionerrorsinpviousheightsbecauseweonlycaaboutlastheightgr,found,err=cs.wal.SearchForEndHeight(csHeight-1,WALSearchOptions{IgnoDataCorruptionErrors:true})iferr==io.EOF{cs.Logger.Error("Replay:wal.group.SearchturnedEOF","#ENDHEIGHT",csHeight-1)}elseiferr!=nil{turnerr}if!found{turnfmt.Errorf("Cannotplayheight%d.WALdoesnotcontain#ENDHEIGHTfor%d",csHeight,csHeight-1)}defergr.Close()//nolint:errcheckcs.Logger.Info("Catchupbyplayingconsensusmessages","height",csHeight)varmsg*TimedWALMessagedec:=WALDecoder{gr}for{msg,err=dec.Decode()iferr==io.EOF{bak}elseifIsDataCorruptionError(err){cs.Logger.Debug("datahasbeencorruptedinlastheightofconsensusWAL","err",err,"height",csHeight)panic(fmt.Sprintf("datahasbeencorrupted(%v)inlastheight%dofconsensusWAL",err,csHeight))}elseiferr!=nil{turnerr}//NOTE:sincetheprivkeyissetwhenthemsgsaceived//itwillattempttoegdoublesignbutwecanjustignoit//sincethevoteswillbeplayedandwellgettothenextstepiferr:=cs.adReplayMessage(msg,nil);err!=nil{turnerr}}cs.Logger.Info("Replay:Done")turnnil}
ceiveRoutine
核心处理逻辑:需要重点看一下cs.ceiveRoutine()方法。
这个方法处理可能引起状态转换的消息,参数maxSteps表示退出前要处理的消息数量,0表示永不退出。它持有RoundState并且是唯一更新它的地方。更新发生在超时、完成提案和/多数时。ConsensusState在任意内部状态更新之前必须是锁定的。
这个方法在单独的协程里面,接收并处理来自peer、节点内部或超时消息。
func(cs*ConsensusState)ceiveRoutine(maxStepsint){deferfunc(){ifr:=cover();r!=nil{cs.Logger.Error("CONSENSUSFAILURE!!!","err",r,"stack",string(debug.Stack()))}}()for{//到达设定的maxSteps时退出ifmaxSteps0{ifcs.nSteps=maxSteps{cs.Logger.Info("achedmaxsteps.exitingceiveroutine")cs.nSteps=0turn}}rs:=cs.RoundStatevarmimsgInfoselect{//TxsAvailable返回一个通道,每添加一个交易到mempool后会触发一次,//并且只有在mempool中的交易可用时才会触发。//如果没有调用EnableTxsAvailable,返回的channel可能为nil。caseheight:=-cs.mempool.TxsAvailable()://这里会执行enterPropose(),进入提议环节,完成后会执行enterPvote进入Pvote环节//日志:enterPropose(5/0).Curnt:5/0/RoundStepNewRoundmodule=consensusheight=5round=0//日志:enterPropose:Ourturntoproposemodule=consensusheight=5round=0proposer=FFAFDCDBFA87DDEC5F0E1privValidator="PrivValidator{FFAFDCDBFA87DDEC5F0E1LH:,LR:0,LS:}"cs.handleTxsAvailable(height)//接收到来自peer的消息casemi=-cs.peerMsgQueue:cs.wal.Write(mi)//handlesproposals,blockparts,votes//maygenerateinternalevents(votes,