【比原链】如何从比原节点拿到区块数据?

作者:freewind
在前一篇中,我们已经知道如何连上一个比原节点的p2p端口,并与对方完成身份验证。此时,双方结点已经建立起来了信任,并且连接也不会断开,下一步,两者就可以继续交换数据了。

那么,我首先想到的就是,如何才能让对方把它已有的区块数据全都发给我呢?

这其实可以分为三个问题:

  1. 我需要发给它什么样的数据?
  2. 它在内部由是如何应答的呢?
  3. 我拿到数据之后,应该怎么处理?

由于这一块的逻辑还是比较复杂的,所以在本篇我们先回答第一个问题:

我们要发送什么样的数据请求,才能让比原节点把它持有的区块数据发给我?

找到发送请求的代码

首先我们先要在代码中定位到,比原到底是在什么时候来向对方节点发送请求的。

在前一篇讲的是如何建立连接并验证身份,那么发出数据请求的操作,一定在上次的代码之后。按照这个思路,我们在SyncManager类中Switch启动之后,找到了一个叫BlockKeeper的类,相关的操作是在它里面完成的。

下面是老规矩,还是从启动开始,但是会更简化一些:

cmd/bytomd/main.go#L54

func main() {
    cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
    cmd.Execute()
}

cmd/bytomd/commands/run_node.go#L41

func runNode(cmd *cobra.Command, args []string) error {
    n := node.NewNode(config)
    if _, err := n.Start(); err != nil {
        // ...
}

node/node.go#L169

func (n *Node) OnStart() error {
    // ...
    n.syncManager.Start()
    // ...
}

netsync/handle.go#L141

func (sm *SyncManager) Start() {
    go sm.netStart()
    // ...
    go sm.syncer()
}

注意sm.netStart(),我们在一篇中建立连接并验证身份的操作,就是在它里面完成的。而这次的这个问题,是在下面的sm.syncer()中完成的。

另外注意,由于这两个函数调用都使用了goroutine,所以它们是同时进行的。

sm.syncer()的代码如下:

netsync/sync.go#L46

func (sm *SyncManager) syncer() {
    sm.fetcher.Start()
    defer sm.fetcher.Stop()

    // ...
    for {
        select {
        case <-sm.newPeerCh:
            log.Info("New peer connected.")
            // Make sure we have peers to select from, then sync
            if sm.sw.Peers().Size() < minDesiredPeerCount {
                break
            }
            go sm.synchronise()
            // ..
    }
}

这里混入了一个叫fetcher的奇怪的东西,名字看起来好像是专门去抓取数据的,我们要找的是它吗?

可惜不是,fetcher的作用是从多个peer那里拿到了区块数据之后,对数据进行整理,把有用的放到本地链上。我们在以后会研究它,所以这里不展开讨论。

接着是一个for循环,当发现通道newPeerCh有了新数据(也就是有了新的节点连接上了),会判断一下当前自己连着的节点是否够多(大于等于minDesiredPeerCount,值为5),够多的话,就会进入sm.synchronise(),进行数据同步。

这里为什么要多等几个节点,而不是一连上就马上同步呢?我想这是希望有更多选择的机会,找到一个数据够多的节点。

sm.synchronise()还是属于SyncManager的方法。在真正调用到BlockKeeper的方法之前,它还做了一些比如清理已经断开的peer,找到最适合同步数据的peer等。其中“清理peer”的工作涉及到不同的对象持有的peer集合间的同步,略有些麻烦,但对当前问题帮助不大,所以我打算把它们放在以后的某个问题中回答(比如“当一个节点断开了,比原会有什么样的处理”),这里就先省略。

sm.synchronise()代码如下:

netsync/sync.go#L77

func (sm *SyncManager) synchronise() {
    log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List())
    // ...
    peer, bestHeight := sm.peers.BestPeer()
    // ...
    if bestHeight > sm.chain.BestBlockHeight() {
        // ...
        sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
    }
}

可以看到,首先是从众多的peers中,找到最合适的那个。什么叫Best呢?看一下BestPeer()的定义:

netsync/peer.go#L266

func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
    // ...
    for _, p := range ps.peers {
        if bestPeer == nil || p.height > bestHeight {
            bestPeer, bestHeight = p.swPeer, p.height
        }
    }
    return bestPeer, bestHeight
}

其实就是持有区块链数据最长的那个。

找到了BestPeer之后,就调用sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)方法,从这里,正式进入BlockKeeper -- 也就是本文的主角 -- 的世界。

BlockKeeper

blockKeeper.BlockRequestWorker的逻辑比较复杂,它包含了:

  1. 根据自己持有的区块数据来计算需要同步的数据
  2. 向前面找到的最佳节点发送数据请求
  3. 拿到对方发过来的区块数据
  4. 对数据进行处理
  5. 广播新状态
  6. 处理各种出错情况,等等

由于本文中只关注“发送请求”,所以一些与之关系不大的逻辑我会忽略掉,留待以后再讲。

在“发送请求”这里,实际也包含了两种情形,一种简单的,一种复杂的:

  1. 简单的:假设不存在分叉,则直接检查本地高度最高的区块,然后请求下一个区块
  2. 复杂的:考虑分叉的情况,则当前本地的区块可能就存在分叉,那么到底应该请求哪个区块,就需要慎重考虑

由于第2种情况对于本文来说过于复杂(因为需要深刻理解比原链中分叉的处理逻辑),所以在本文中将把问题简化,只考虑第1种。而分叉的处理,将放在以后讲解。

下面是把blockKeeper.BlockRequestWorker中的代码简化成了只包含第1种情况:

netsync/block_keeper.go#L72

func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
    num := bk.chain.BestBlockHeight() + 1
    reqNum := uint64(0)
    reqNum = num
    // ...
    bkPeer, ok := bk.peers.Peer(peerID)
    swPeer := bkPeer.getPeer()
    // ...
    block, err := bk.BlockRequest(peerID, reqNum)
    // ...
}

在这种情况下,我们可以认为bk.chain.BestBlockHeight()中的Best,指的是本地持有的不带分叉的区块链高度最高的那个。(需要提醒的是,如果存在分叉情况,则Best不一定是高度最高的那个)

那么我们就可以直接向最佳peer请求下一个高度的区块,它是通过bk.BlockRequest(peerID, reqNum)实现的:

netsync/block_keeper.go#L152

func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
    var block *types.Block

    if err := bk.blockRequest(peerID, height); err != nil {
        return nil, errReqBlock
    }

    // ...

    for {
        select {
        case pendingResponse := <-bk.pendingProcessCh:
            block = pendingResponse.block
            // ...
            return block, nil
        // ...
        }
    }
}

在上面简化后的代码中,主要分成了两个部分。一个是发送请求bk.blockRequest(peerID, height),这是本文的重点;它下面的for-select部分,已经是在等待并处理对方节点的返回数据了,这部分我们今天先略过不讲。

bk.blockRequest(peerID, height)这个方法,从逻辑上又可以分成两部分:

  1. 构造出请求的信息
  2. 把信息发送给对方节点

构造出请求的信息

bk.blockRequest(peerID, height)经过一连串的方法调用之后,使用height构造出了一个BlockRequestMessage对象,代码如下:

netsync/block_keeper.go#L148

func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
    return bk.peers.requestBlockByHeight(peerID, height)
}

netsync/peer.go#L332

func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
    peer, ok := ps.Peer(peerID)
    // ...
    return peer.requestBlockByHeight(height)
}

netsync/peer.go#L73

func (p *peer) requestBlockByHeight(height uint64) error {
    msg := &BlockRequestMessage{Height: height}
    p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
    return nil
}

到这里,终于构造出了所需要的BlockRequestMessage,其实主要就是把height告诉peer。

然后,通过PeerTrySend()把该信息发出去。

发送请求

TrySend中,主要是通过github.com/tendermint/go-wire库将其序列化,再发送给对方。看起来应该是很简单的操作吧,先预个警,还是挺绕的。

当我们进入TrySend()后:

p2p/peer.go#L242

func (p *Peer) TrySend(chID byte, msg interface{}) bool {
    if !p.IsRunning() {
        return false
    }
    return p.mconn.TrySend(chID, msg)
}

发现它把锅丢给了p.mconn.TrySend方法,那么mconn是什么?chID又是什么?

mconnMConnection的实例,它是从哪儿来的?它应该在之前的某个地方初始化了,否则我们没法直接调用它。所以我们先来找到它初始化的地方。

经过一番寻找,发现原来是在前一篇之后,即比原节点与另一个节点完成了身份验证之后,具体的位置在Switch类启动的地方。

我们这次直接从SwtichOnStart作为起点:

p2p/switch.go#L186

func (sw *Switch) OnStart() error {
    //...
    // Start listeners
    for _, listener := range sw.listeners {
        go sw.listenerRoutine(listener)
    }
    return nil
}

p2p/switch.go#L498

func (sw *Switch) listenerRoutine(l Listener) {
    for {
        inConn, ok := <-l.Connections()
        // ...
        err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
        // ...
    }
}

p2p/switch.go#L645

func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
    // ...
    peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
    // ...
}

p2p/peer.go#L87

func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
    return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
}

p2p/peer.go#L91

func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
    conn := rawConn
    // ...
    if config.AuthEnc {
        // ...
        conn, err = MakeSecretConnection(conn, ourNodePrivKey)
        // ...
    }

    // Key and NodeInfo are set after Handshake
    p := &Peer{
        outbound: outbound,
        conn:     conn,
        config:   config,
        Data:     cmn.NewCMap(),
    }

    p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)

    p.BaseService = *cmn.NewBaseService(nil, "Peer", p)

    return p, nil
}

终于找到了。上面方法中的MakeSecretConnection就是与对方节点交换公钥并进行身份验证的地方,下面的p.mconn = createMConnection(...)就是创建mconn的地方。

继续进去:

p2p/peer.go#L292

func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
    onReceive := func(chID byte, msgBytes []byte) {
        reactor := reactorsByCh[chID]
        if reactor == nil {
            if chID == PexChannel {
                return
            } else {
                cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
            }
        }
        reactor.Receive(chID, p, msgBytes)
    }

    onError := func(r interface{}) {
        onPeerError(p, r)
    }

    return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}

原来mconnMConnection的实例,它是通过NewMConnectionWithConfig创建的。

看了上面的代码,发现这个MConnectionWithConfig与普通的net.Conn并没有太大的区别,只不过是当收到了对方发来的数据后,会根据指定的chID调用相应的ReactorReceive方法来处理。所以它起到了将数据分发给Reactor的作用。

为什么需要这样的分发操作呢?这是因为,在比原中,节点之间交换数据,有多种不同的方式:

  1. 一种是规定了详细的数据交互协议(比如有哪些信息类型,分别代表什么意思,什么情况下发哪个,如何应答等),在ProtocolReactor中实现,它对应的chIDBlockchainChannel,值为byte(0x40)
  2. 另一种使用了与BitTorrent类似的文件共享协议,叫PEX,在PEXReactor中实现,它对应的chIDPexChannel,值为byte(0x00)

所以节点之间发送信息的时候,需要知道对方发过来的数据对应的是哪一种方式,然后转交给相应的Reactor去处理。

在比原中,前者是主要的方式,后者起到辅助作用。我们目前的文章中涉及到的都是前者,后者将在以后专门研究。

p.mconn.TrySend

当我们知道了p.mconn.TrySend中的mconn是什么,并且在什么时候初始化以后,下面就可以进入它的TrySend方法了。

p2p/connection.go#L243

func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
    // ...
    channel, ok := c.channelsIdx[chID]
    // ...
    ok = channel.trySendBytes(wire.BinaryBytes(msg))
    if ok {
        // Wake up sendRoutine if necessary
        select {
        case c.send <- struct{}{}:
        default:
        }
    }

    return ok
}

可以看到,它找到相应的channel后(在这里应该是ProtocolReactor对应的channel),调用channel的trySendBytes方法。在发送数据的时候,使用了github.com/tendermint/go-wire库,将msg序列化为二进制数组。

p2p/connection.go#L602

func (ch *Channel) trySendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    default:
        return false
    }
}

原来它是把要发送的数据,放到了该channel对应的sendQueue中,交由别人来发送。具体是由谁来发送,我们马上要就找到它。

细心的同学会发现,Channel除了trySendBytes方法外,还有一个sendBytes(在本文中没有用上):

p2p/connection.go#L589

func (ch *Channel) sendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    case <-time.After(defaultSendTimeout):
        return false
    }
}

它们两个的区别是,前者尝试把待发送数据bytes放入ch.sendQueue时,如果能放进去,则返回true,否则马上失败,返回false,所以它是非阻塞的。而后者,如果放不进去(sendQueue已满,那边还没处理完),则等待defaultSendTimeout(值为10秒),然后才会失败。另外,sendQueue的容量默认为1

到这里,我们其实已经知道比原是如何向其它节点请求区块数据,以及何时把信息发送出去。

本想在本篇中就把真正发送数据的代码也一起讲了,但是发现它的逻辑也相当复杂,所以就另开一篇讲吧。

再回到本文问题,再强调一下,我们前面说了,对于向peer请求区块数据,有两种情况:一种是简单的不考虑分叉的,另一种是复杂的考虑分叉的。在本文只考虑了简单的情况,在这种情况下,所谓的bestHeight就是指的最高的那个区块的高度,而在复杂情况下,它就不一定了。这就留待以后我们再详细讨论,本文的问题就算是回答完毕了。

原文地址:http://blog.51cto.com/13794581/2130230

时间: 2024-10-10 20:07:56

【比原链】如何从比原节点拿到区块数据?的相关文章

剥开比原看代码05:如何从比原节点拿到区块数据?

作者:freewind 比原项目仓库: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlockchain/bytom 在前一篇中,我们已经知道如何连上一个比原节点的p2p端口,并与对方完成身份验证.此时,双方结点已经建立起来了信任,并且连接也不会断开,下一步,两者就可以继续交换数据了. 那么,我首先想到的就是,如何才能让对方把它已有的区块数据全都发给我呢? 这其实可以分为三个问题: 我需要发给它什么

【比原链】如何连上一个比原节点

作者:freewind 在上一篇我们已经知道了比原是如何监听节点的p2p端口,本篇就要继续在上篇中提到的问题:我们如何成功的连接上比原的节点,并且通过身份验证,以便后续继续交换数据? 在上一篇中,我们的比原节点是以solonet这个chain_id启动的,它监听的是46658端口.我们可以使用telnet连上它: $ telnet localhost 46658 Trying 127.0.0.1... Connected to localhost. Escape character is '^]

浅析Facebook LibraBFT与比原链Bystack BBFT共识

如果说什么是区块链的灵魂,那一定是共识机制. 它是区块链的根基.无论公链或是联盟链,共识机制都从基础上限制了区块链的交易处理能力和扩展性. 2019年6月18日,Facebook 发布了自己 Libra 项目的白皮书,引发广泛关注.作为 Facebook 试图创造国际流通数字货币的重要项目,Libra 区块链采用的是 LibraBFT 共识机制,是一个为 Libra 设计的鲁棒的高效的状态复制系统.它基于一种新型的 BFT 共识算法,HotStuff. 就在 Facebook Libra 项目白

社区观点 | 理解比原链MOV链上交换协议

去中心化交换协议的发展 从Bitshare,Stellar到以太坊上的Etherdelta,Bancor,0x协议,去中心化交换协议也经过了好几代发展和很多模式的探索,每一代都通过前面的协议的痛点来进行改进和深化, 主要分为: 链上orderbook,链上结算; 链下orderbook,链上结算; 基于智能合约管理的资金池; 链上orderbook,链上结算 最早的 基于以太坊的去中心化交换协议的成功探索非Etherdelta莫属,曾一度占据去中心化交换市场的半壁江山.Etherdelta是较为

比原链设计思考: 扩展性UTXO模型

用户模型是比原链在最初就需要确定的重要数据结构, 团队的选择还是聚焦在两种典型的模型系统中,Account模型和UTXO模型,和其他大多数区块链设计一样, 选择了模型就决定了协议层的重要实现,两种模型各有利弊,不同区块链针对想聚焦的场景自身会有判断. UTXO 的起源(来自高明的中本聪) 中本聪对比特币的设计,让整个世界进入了数字货币时代.比特币起源于中本聪,UTXO出自比特币.自然,UTXO来自高明的中本聪.UTXO的优点: 在版本控制方面的考虑,svn 是中心化的数据库保持一份账本,这和区块

图解比原链Tensority算法:如何让POW做到人工智能友好

共识算法说起 区块链系统首先是分布式系统,而一致性是分布式系统的基础问题,要保证系统满足不同程度的一致性,则就要用到共识算法. 现在主流的算法有POW.POS.DPOS等等,比特币采用的POW共识算法运行9年之久,已被证明稳定可靠,然而因为巨大的硬件和能源消耗而饱受诟病,特别是专用矿机,在被淘汰之后就变成了废铁. POS和DPOS为了避免资源的浪费,直接采取抛弃计算的方式,通过持有证明和选举来进行共识,牺牲了一定准入性和去中心化.而比原链从另一个角度来切入和解决POW资源浪费的问题. 比原链共识

终于把区块链与物流的关联搞清楚了 区块链的分类 物流业中区块链技术应用的案例

原文 区块链的分类 区块链可以分为三类:公有链.私有链.联盟链. 1.公有链无官方组织及管理机构,无中心服务器,参与的节点按照系统规则自由接入网络.不受控制,节点间基于共识机制开展工作. 2.私有链建立在某个企业内部,系统的运作规则根据企业要求进行设定,修改甚至是读取权限仅限于少数节点,同时仍保留着区块链的真实性和部分去中心化的特性. 3.联盟链由若干机构联合发起,介于公有链和私有链之间,兼具部分去中心化的特性. 去中心化是区块链的最重要特征 区块链技术提供了一种无需信任单个节点.还能创建共识网

【区块链Go语言实现】第一部分:区块链基本原型

0x00 介绍 区块链(Blockchain)是21世纪最具革命性的技术之一,目前它仍处于逐渐成熟阶段,且其发展潜力尚未被完全意识到.从本质上讲,区块链只是一种记录的分布式数据库.但它之所以独特,是因为它并不是一个私有的数据库,而是一个公共数据库,也就是说,每个使用它的人都有一份完整或部分的数据副本.并且,只有在数据库的其他持有者同意的情况下,才可以向区块链中添加新的记录.此外,正是区块链使得加密货币和智能合约成为可能. 在本系列文章中,我们将基于区块链构建一种简单的加密货币. 0x01 区块

区块链的相关名词有哪些?星光区块链

区块链的相关名词有哪些?随着区块链这个名词越来越火,现在有很多人都去了解区块链技术是怎样的?但有一些人因为不知道它相关名词的意思,陷入一个似懂非懂的状态.为了使大家更加深入的了解区块链技术,下面就让星光区块链来告诉大家一些常用的区块链的相关名词有哪些?它们是什么意思?-:半衰期来源于比特币的激励机制,即为矿工每验证一个区块即可得到的奖励.从最开始的50BTC,每四年减半,目前是12.5BTC,已经历过2次半衰(50BTC-->25BTC-->12.5BTC). 二:.分叉指向同一个父块的2个区