[关闭]
@adamhand 2019-03-28T13:54:22.000000Z 字数 11035 阅读 1378

tendermint


简介

Tendermint是一个开源的完整的区块链实现,可以用于公链或联盟链,其官方定位是面向开发者的区块链共识引擎。尽管tendermint包含了区块链的完整实现,但它却是以SDK的形式将这些核心功能提供出来,供开发者方便地定制自己的专有区块链。

Tendermint 包含了两个主要的技术组件

Tendermint Core通过一个满足 ABCI 标准的 socket 协议与应用进行交流。此外,如果需要使用Tendermint开发区块链系统,还要实现client部分。所以,一个完整的基于tendermint的区块链系统包括三个部分:client部分,ABCI部分和tendermint core部分。如下图所示:



Tendermint Core

tendermint采用的共识机制属于一种权益证明( Proof Of Stake)算法,一组验证人(Validator)代替了矿工(Miner)的角色,依据抵押的权益比例轮流出块。

tendermint同时是拜占庭容错的(Byzantine Fault Tolerance),因此对于3f+1个验证节点组成的区块链,即使有f个节点出现拜占庭错误,也可以保证全局正确共识的达成。

下图是tendermint的状态机。



“验证人”(validator)轮流对交易区块进行提议,并对这些区块进行投票。区块会被提交到链上,每一个块占据一个“高度”(height)。

如果提交失败,协议就会开始下一轮的提交,并且一个新的验证人会继续提交那个高度的区块。

ABCI

ABCI 包含了 3 个主要的消息类型,它们由tendermint core 发送至应用,应用会对消息产生相应的回复。

一个应用可能有多个 ABCI socket 连接。Tendermint Core 给应用创建了三个 ABCI 连接:




tendermint docs


KVStore

KVStore是tendermint提供的一个例子,用来说明abci-cli client(模拟tendermint core)和abci应用服务端之间通过socket进行通信的过程。

源码分析

程序入口在tendermint\abci\cmd\abci-cli\main.go中的Execute()方法。这个方法主要做了三件事:

addGlobalFlags()

注册全局 Flags,主要包括:

程序如下所示:

  1. func addGlobalFlags() {
  2. RootCmd.PersistentFlags().StringVarP(&flagAddress, "address", "", "tcp://0.0.0.0:26658", "address of application socket")
  3. RootCmd.PersistentFlags().StringVarP(&flagAbci, "abci", "", "socket", "either socket or grpc")
  4. RootCmd.PersistentFlags().BoolVarP(&flagVerbose, "verbose", "v", false, "print the command and results as if it were a console session")
  5. RootCmd.PersistentFlags().StringVarP(&flagLogLevel, "log_level", "", "debug", "set the logger level")
  6. }

addCommands()

添加子命令到RootCmd命令,包括kvstore 和 dummy 以及 echoCmd、infoCmd、deliverTxCmd 和 commitCmd 等客户端命令。这些命令会在后面被触发执行。

  1. func addCommands() {
  2. RootCmd.AddCommand(batchCmd)
  3. RootCmd.AddCommand(consoleCmd)
  4. RootCmd.AddCommand(echoCmd)
  5. RootCmd.AddCommand(infoCmd)
  6. RootCmd.AddCommand(setOptionCmd)
  7. RootCmd.AddCommand(deliverTxCmd)
  8. RootCmd.AddCommand(checkTxCmd)
  9. RootCmd.AddCommand(commitCmd)
  10. RootCmd.AddCommand(versionCmd)
  11. RootCmd.AddCommand(testCmd)
  12. addQueryFlags()
  13. RootCmd.AddCommand(queryCmd)
  14. // examples
  15. addCounterFlags()
  16. RootCmd.AddCommand(counterCmd)
  17. addKVStoreFlags()
  18. RootCmd.AddCommand(kvstoreCmd)
  19. }

比如KVStore命令如下:

  1. var kvstoreCmd = &cobra.Command{
  2. Use: "kvstore",
  3. Short: "ABCI demo example",
  4. Long: "ABCI demo example",
  5. Args: cobra.ExactArgs(0),
  6. RunE: func(cmd *cobra.Command, args []string) error {
  7. return cmdKVStore(cmd, args)
  8. },
  9. }

当运行abci-cli kdstore的时候,就会执行这个命令,触发cmdKVStore(cmd, args)函数。这个函数具体会在后面建立abci服务端的时候分析。

RootCmd.Execute()

启动abci-cli

这个函数的作用是执行上面添加的命令,但是在执行之前首先会执行RootCmd中的PersistentPreRunE函数。从名字可以看出,这个函数是PreRun的,也就是要先执行。这个函数中比较重要的操作如下:

  1. if client == nil {
  2. var err error
  3. client, err = abcicli.NewClient(flagAddress, flagAbci, false)
  4. if err != nil {
  5. return err
  6. }
  7. client.SetLogger(logger.With("module", "abci-client"))
  8. if err := client.Start(); err != nil {
  9. return err
  10. }
  11. }

这段代码的意思是通过NewClient()函数创建abci-client。NewClient()函数有两个重要参数:

NewClient()函数逻辑如下:

  1. func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
  2. switch transport {
  3. case "socket":
  4. client = NewSocketClient(addr, mustConnect)
  5. case "grpc":
  6. client = NewGRPCClient(addr, mustConnect)
  7. default:
  8. err = fmt.Errorf("Unknown abci transport %s", transport)
  9. }
  10. return
  11. }

因为选择的是socket,所以会进入NewSocketClient(addr, mustConnect)函数,该函数的逻辑如下:

  1. func NewSocketClient(addr string, mustConnect bool) *socketClient {
  2. cli := &socketClient{
  3. reqQueue: make(chan *ReqRes, reqQueueSize),
  4. flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),
  5. mustConnect: mustConnect,
  6. addr: addr,
  7. reqSent: list.New(),
  8. resCb: nil,
  9. }
  10. cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
  11. return cli
  12. }

函数首先创建一个socketClient的结构cli,然后将其作为参数传递给cmn.NewBaseService。先看一下socketClient。

  1. type socketClient struct {
  2. cmn.BaseService
  3. reqQueue chan *ReqRes
  4. flushTimer *cmn.ThrottleTimer
  5. mustConnect bool
  6. mtx sync.Mutex
  7. addr string
  8. conn net.Conn
  9. err error
  10. reqSent *list.List
  11. resCb func(*types.Request, *types.Response) // listens to all callbacks
  12. }

这个结构实现了abci/client/Client的所有方法,所以它实现了abci/client/Client接口。同时它还有一个匿名字段cmn.BaseService,BaseService实现了Service接口,所以socketClient也间接实现了 libs/common/service/Service 接口。能够使用其中的Start()方法。

  1. func (bs *BaseService) Start() error {
  2. if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
  3. if atomic.LoadUint32(&bs.stopped) == 1 {
  4. bs.Logger.Error(fmt.Sprintf("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
  5. // revert flag
  6. atomic.StoreUint32(&bs.started, 0)
  7. return ErrAlreadyStopped
  8. }
  9. bs.Logger.Info(fmt.Sprintf("Starting %v", bs.name), "impl", bs.impl)
  10. err := bs.impl.OnStart()
  11. if err != nil {
  12. // revert flag
  13. atomic.StoreUint32(&bs.started, 0)
  14. return err
  15. }
  16. return nil
  17. }
  18. bs.Logger.Debug(fmt.Sprintf("Not starting %v -- already started", bs.name), "impl", bs.impl)
  19. return ErrAlreadyStarted
  20. }

而Start()方法会调用BaseService中的OnStart()方法,这个方法中主要做的就是连接abci服务端并启动两个协程:go cli.sendRequestsRoutine(conn)、go cli.recvResponseRoutine(conn):

  1. func (cli *socketClient) OnStart() error {
  2. if err := cli.BaseService.OnStart(); err != nil {
  3. return err
  4. }
  5. var err error
  6. var conn net.Conn
  7. RETRY_LOOP:
  8. for {
  9. conn, err = cmn.Connect(cli.addr)
  10. if err != nil {
  11. if cli.mustConnect {
  12. return err
  13. }
  14. cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr), "err", err)
  15. time.Sleep(time.Second * dialRetryIntervalSeconds)
  16. continue RETRY_LOOP
  17. }
  18. cli.conn = conn
  19. go cli.sendRequestsRoutine(conn)
  20. go cli.recvResponseRoutine(conn)
  21. return nil
  22. }
  23. }

sendRequestsRoutine(conn)用来向cli服务器发送请求,recvResponseRoutine()用来处理响应结果。sendRequestsRoutine()逻辑如下,在这个函数中reqres会等待cli.reqQueue信道传来消息,之后会先写入缓冲区中,然后在发送给cliserver。

  1. func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
  2. w := bufio.NewWriter(conn)
  3. for {
  4. select {
  5. case <-cli.flushTimer.Ch:
  6. select {
  7. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
  8. default:
  9. // Probably will fill the buffer, or retry later.
  10. }
  11. case <-cli.Quit():
  12. return
  13. case reqres := <-cli.reqQueue:
  14. cli.willSendReq(reqres)
  15. err := types.WriteMessage(reqres.Request, w)
  16. if err != nil {
  17. cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
  18. return
  19. }
  20. // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  21. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  22. err = w.Flush()
  23. if err != nil {
  24. cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
  25. return
  26. }
  27. }
  28. }
  29. }
  30. }

recvResponseRoutine处理应答的逻辑如下。先从连接中读取应答,出错的话就关闭连接,否则就调用didRecvResponse()处理应答。

  1. func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
  2. r := bufio.NewReader(conn) // Buffer reads
  3. for {
  4. var res = &types.Response{}
  5. err := types.ReadMessage(r, res)
  6. if err != nil {
  7. cli.StopForError(err)
  8. return
  9. }
  10. switch r := res.Value.(type) {
  11. case *types.Response_Exception:
  12. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  13. cli.StopForError(errors.New(r.Exception.Error))
  14. return
  15. default:
  16. // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  17. err := cli.didRecvResponse(res)
  18. if err != nil {
  19. cli.StopForError(err)
  20. return
  21. }
  22. }
  23. }
  24. }
  1. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  2. cli.mtx.Lock()
  3. defer cli.mtx.Unlock()
  4. // Get the first ReqRes
  5. next := cli.reqSent.Front()
  6. if next == nil {
  7. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
  8. }
  9. reqres := next.Value.(*ReqRes)
  10. if !resMatchesReq(reqres.Request, res) {
  11. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  12. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  13. }
  14. reqres.Response = res // Set response
  15. reqres.Done() // Release waiters
  16. cli.reqSent.Remove(next) // Pop first item from linked list
  17. // Notify reqRes listener if set
  18. if cb := reqres.GetCallback(); cb != nil {
  19. cb(res)
  20. }
  21. // Notify client listener if set
  22. if cli.resCb != nil {
  23. cli.resCb(reqres.Request, res)
  24. }
  25. return nil
  26. }

这样abci-client就启动了。

启动abci服务端

前面说了,当运行abci-cli kdstore的时候,会触发cmdKVStore(cmd, args)函数,这个函数的逻辑如下:

  1. func cmdKVStore(cmd *cobra.Command, args []string) error {
  2. logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
  3. // Create the application - in memory or persisted to disk
  4. var app types.Application
  5. if flagPersist == "" {
  6. app = kvstore.NewKVStoreApplication()
  7. } else {
  8. app = kvstore.NewPersistentKVStoreApplication(flagPersist)
  9. app.(*kvstore.PersistentKVStoreApplication).SetLogger(logger.With("module", "kvstore"))
  10. }
  11. // Start the listener
  12. srv, err := server.NewServer(flagAddress, flagAbci, app)
  13. if err != nil {
  14. return err
  15. }
  16. srv.SetLogger(logger.With("module", "abci-server"))
  17. if err := srv.Start(); err != nil {
  18. return err
  19. }
  20. // Wait forever
  21. cmn.TrapSignal(func() {
  22. // Cleanup
  23. srv.Stop()
  24. })
  25. return nil
  26. }

这个函数主要做了两件事:

NewKVStoreApplication()逻辑如下,主要是从内存存储 MemDB 结构中获取对应状态,app会以参数的形式传递给NewServer函数用来创建Server。

  1. func NewKVStoreApplication() *KVStoreApplication {
  2. state := loadState(dbm.NewMemDB())
  3. return &KVStoreApplication{state: state}
  4. }

NewServer()主要调用了NewSocketServer(),函数的逻辑如下:

  1. func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
  2. proto, addr := cmn.ProtocolAndAddress(protoAddr)
  3. s := &SocketServer{
  4. proto: proto,
  5. addr: addr,
  6. listener: nil,
  7. app: app,
  8. conns: make(map[int]net.Conn),
  9. }
  10. s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
  11. return s
  12. }

关键点在最后一行:s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)。BaseService是SocketServer中的一个匿名字段,BaseServcie接口又实现了Service接口,可以调用接口的Start()方法,而Start()方法又会调用OnStart()方法,如下:

  1. func (s *SocketServer) OnStart() error {
  2. if err := s.BaseService.OnStart(); err != nil {
  3. return err
  4. }
  5. ln, err := net.Listen(s.proto, s.addr)
  6. if err != nil {
  7. return err
  8. }
  9. s.listener = ln
  10. go s.acceptConnectionsRoutine()
  11. return nil
  12. }

关键地方在acceptConnectionsRoutine()这个协程,这个协程的作用是在连接中读取请求并向连接中写入应答,逻辑如下:

  1. func (s *SocketServer) acceptConnectionsRoutine() {
  2. for {
  3. // Accept a connection
  4. s.Logger.Info("Waiting for new connection...")
  5. conn, err := s.listener.Accept()
  6. if err != nil {
  7. if !s.IsRunning() {
  8. return // Ignore error from listener closing.
  9. }
  10. s.Logger.Error("Failed to accept connection: " + err.Error())
  11. continue
  12. }
  13. s.Logger.Info("Accepted a new connection")
  14. connID := s.addConn(conn)
  15. closeConn := make(chan error, 2) // Push to signal connection closed
  16. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  17. // Read requests from conn and deal with them
  18. go s.handleRequests(closeConn, conn, responses)
  19. // Pull responses from 'responses' and write them to conn.
  20. go s.handleResponses(closeConn, conn, responses)
  21. // Wait until signal to close connection
  22. go s.waitForClose(closeConn, connID)
  23. }
  24. }

总结

KVStore服务端启动以及和abci-cli交互的过程。


参考


添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注