Hyperledger Fabric源码分析之Gossip


Gossip算法正如它的名字,小道消息只需要有人传播一次,那么过一段时间,所有人都会知道,就像新冠病毒一样,所以Gossip算法也有其它的别名“传染病扩散算法”、“谣言传播算法”等等。在Fabric中Gossip算法用于区块的传播,即共识后将得到的区块广播给组织内和其它组织。本节会介绍Gossip算法的设计原理、数据结构和部分核心源码的实现,例如Push算法、Pull算法等。

1. 设计原理

Gossip协议可以抽象成广播,即发送节点向其它节点传播同样的消息。在节点数量较少的情况下,分布式系统无需实现Gossip协议,仅仅依靠发送节点多次发送消息即可。而当节点数量增加时,简单的广播会导致发送节点资源消耗的线性增长,直至占满带宽/CPU,从而产生较高的传播延迟。Gossip协议本质上就是解决这样的一个问题而被设计存在的。

Gossip协议本质上是平均每个节点的资源消耗,能够保证每个节点资源的消耗只会随着总节点数的增多而缓慢增长。考虑一种Push方案1,每个节点在收到其它节点发送的消息时,立刻随机向其它固定数量的节点转发这个消息,例如向4个随机节点发送这个消息。即便没有仔细的计算,我们依然可以直观的感觉到这个简单的方案是可以满足平均资源消耗的目的的。但是,随着需要广播的数据越来越多,每次广播一个消息,整体来看,宽带的占用提高了4倍。这会导致节点间会充斥着已经被充分广播的旧消息。

Push & Pull & Anti-tropy 机制

上文的简单Push方案没有一个终止机制,随着需要广播的数据越来越多,每次广播一个消息,宽带的占用提高了4倍。这会导致节点间会充斥着已经被充分广播的旧消息。

我们可以考虑使用一个Time-To-Live (TTL)方法来提高Push方案4的效率。简要的来说就是可以通过在每个消息第一次被发送时,为这个消息增加一个TTL域。TTL域最开始被设置一个固定的值,例如7。消息每传播一次,TTL域就会被减少一次。如果一个节点收到TTL域为1的消息时,就直接丢弃这个消息,而不是转发它。

上图中初始的TTL为2,每个节点在收到一次消息后,就转发给随机两个节点。直到所有消息的TTL值耗尽。此时可能存在还没有收到消息的节点。但是当我们增大初始TTL的值时,存在还没有收到消息节点的概率会成指数下降。例如4在100个节点的网络中,每次转发给4个节点,初始化TTL为9时,当所有消息都转发完成后,存在未收到消息节点的概率是\(10^{-6}\)。所以这种增加了TTL的Push方案是可行的。

然而,考虑到消息总共需要转发\(4^9=262144\)次!所以这种终止方式会消耗大量的CPU和带宽。在Fabric项目中,没有使用这种TTL机制,如果一个节点第二次收到相同的消息时,就不会转发这个消息,如下图。这样的设置可以保证一个消息最多被转发\(4*100\)次,大大减少的资源的占用。如果一个节点在Push过程结束后仍然没有收到消息,它会定期的进行Pull操作,即向随机的其它节点拉取最近的消息。

Pull机制让节点可以主动发现新的块。节点会在本地设置一个定时循环发起的请求,每次都随机选择多个本地列表中的其它节点。首先,对每个其它节点发送一个 Hello消息,等待节点回应本地消息的摘要集合 Digest,比如本地区块的高度。在收到每个其它节点返回的摘要后,节点会将本地摘要与这种消息进行对比,得到那些本地没有的消息,然后向其它节点发送一个 Request消息,等待回应具体的消息值 Response,如下图(来自注释)。

Fabric通过Push和定期的Pull操作保证一个消息可以以较低的资源占用和效率,可靠的广播给所有节点。然而,Pull操作中 Digest需要包含自账本创建以来的所有区块摘要,尽管摘要可能仅仅是区块的序号,但随着账本高度的增加,这一步骤会消耗一定的宽带。并且,在大多数情况下,并不需要同步比较旧的区块,所以仅仅需要在内存中存储一定数量的区块缓存即可。Fabric通过设置 peer.gossip.maxBlockCountToStore来设置缓存的最大区块数量,默认是 10

对于那些离线时间较长或者新加入的节点,Fabric使用一种恢复机制,叫做反熵(Anti-entropy)。反熵机制允许节点任意的选择缺少区块编号,向其它节点查询。通过配置,节点可以设置是否开启Anti-entropy机制,例如在稳定的网络中,可以不开启Anti-entropy。如果开启Anti-entropy机制,每隔固定的时间,节点就会观察通道上其它的节点,它会选择一个拥有最高账本的节点,请求连续的缺失区块。在Fabric中,默认查询间隔是 10秒。

主节点选举机制

主节点选举算法的目标是在一组节点中选举出一个节点作为主节点,完成特殊的任务5。在Fabric Gossip算法中,主节点需要负责从排序节点接收区块,然后广播给组织中其它节点。一个简单的选举方式就是推选ID或者网络地址中最小/最大的节点为主节点。但是在群组中的节点可能会随时加入或者退出网络,同时节点间的消息也存在丢失的可能,这就需要一种同步机制来保证主节点尽可能的保证唯一。

下面给出一个简单的主节点选举算法,每个节点在成为主节点后立刻向其它节点发送一个声明"I am leader",同时设置一个定时器来提醒自己定时的发送这个声明。如果一个节点收到来自其它节点的声明,那么就重新设置Follower的定时器和停止Leader定时器。如果节点没有收到来自其它节点的声明并且Follower定时器过期了,那么该节点就推举自己为主节点。Follower定时器和Leader定时器过期时节点的行为是一致的,所以在下面的算法中通过一个条件分支来表达。

LeaderElection(LeaderTime, FollowerTime):
	LeaderAnnouce(LeaderTime)
	While do:
		if receive annouce(msg) && msg.ID >= leader:
			set follower timer with "FollowerTime"
			clear leader timer
			leader = msg.ID
		if leader/follower timer expired:
			LeaderAnnouce(LeaderTime)
LeaderAnnouce(LeaderTime):
	leader = me.ID
	broadcast annouce "I am leader"
	set leader timer with "LeaderTime"

上述算法有一个小问题,就是当在所有节点同时开始算法时或者网络分区消失时,如何避免多次的主节点转换。具体来说,假设节点1、节点2、节点3同时开始算法,它们首先都假定自己是主节点,同时发送自己是主节点的声明。节点3首先受到节点2发送的声明,观察到节点2的ID比自身ID小,所以设置主节点为节点2。然后,节点3又收到节点1的声明,同样的又设置主节点为节点1。

主节点选举算法的目的是从一个群组中选举出一个节点作为主节点,尽管过程中可能出现多个节点,但我们希望这个过程是收敛的,即任意时刻群组中主节点的数量越少越好。在多次主节点转换问题的解决上,有以下两种简单的改进方法:

  • 增加一个Propose阶段,广播自己想要成为主节点的意愿,同时收集其它节点的Propose,最后选择ID最小的节点作为主节点。
  • 随机等待一段时间,即每个想要成为主节点的节点先随机等待一段时间再发送成为主节点的声明,这样可以在概率上保证同一时刻想要成为主节点的节点不会太多。

Fabric采取了第一种方法来解决问题。但实际上,主节点选举算法是分布式系统中基础问题,有很多广泛的研究和解决方案,这里只是做一些微小的分析。

2. 数据结构

Gossip的传输的数据格式被定义在对应的message.proto格式文件 中。这个文件主要是定义了Gossip Service的RPC格式和Gossip Message的数据报文格式。

Gossip Service

// Gossip
service Gossip {

  // GossipStream is the gRPC stream used for sending and receiving messages
  rpc GossipStream (stream Envelope) returns (stream Envelope);

  // Ping is used to probe a remote peer's aliveness
  rpc Ping (Empty) returns (Empty);
}

GossipStream能接收一个流式参数,并返回一个流式值。它们的类型都是Envelope,它是对Gossip Message数据的一种封装,包括了原数据和发送者对其的签名。Ping函数则仅仅用于测试远程节点的可用性。

Gossip Message

下图是Gossip Message主要的数据结构,其中channel指消息所在的通道,tag是指消息传播的策略,例如仅仅组织内传播或者通道内传播。Gossip服务通过这两个域来控制Gossip Message广播的范围,达到链间的数据访问控制,保护用户的隐私。contentGossip的核心数据结构,oneof是protobuf的一个关键字,类似于C语言中的联合体,在oneof关键字定义的数据结构中,共同使用content域。

syntax = "proto3";

message GossipMessage {
    bytes channel = 2;
    Tag tag = 3;
    oneof content {
      ...
    }
}
enum Tag {
    UNDEFINED    = 0;
    EMPTY        = 1;
    ORG_ONLY     = 2;
    CHAN_ONLY    = 3;
    CHAN_AND_ORG = 4;
    CHAN_OR_ORG  = 5;
}

content定义了21种数据结构,理解它们将会加深对Fabric中Gossip服务整体的理解。根据第1节中的分析,其实我们已经可以猜测出很多数据格式的用途。例如:

  • DataMessage数据格式,用于Push协议。
  • GossipHello等4个数据格式,用于对Pull协议。
  • StateInfo等5个数据格式,用于对Anti-tropy抓取旧的区块数据。
  • RemotePvtDataRequest等3个数据格式,用于获取隐私数据。

这里我们并不会详细解释每个数据,但这些消息包括了Gossip模块的所有功能。

// Membership
AliveMessage alive_msg = 5;
MembershipRequest mem_req = 6;
MembershipResponse mem_res = 7;

// Contains a ledger block
DataMessage data_msg = 8;

// Used for push&pull
GossipHello hello = 9;
DataDigest  data_dig = 10;
DataRequest data_req = 11;
DataUpdate  data_update = 12;

// Empty message, used for pinging
Empty empty = 13;

// ConnEstablish, used for establishing a connection
ConnEstablish conn = 14;

// Used for relaying information
// about state
StateInfo state_info = 15;

// Used for sending sets of StateInfo messages
StateInfoSnapshot state_snapshot = 16;

// Used for asking for StateInfoSnapshots
StateInfoPullRequest state_info_pull_req = 17;

//  Used to ask from a remote peer a set of blocks
RemoteStateRequest state_request = 18;

// Used to send a set of blocks to a remote peer
RemoteStateResponse state_response = 19;

// Used to indicate intent of peer to become leader
LeadershipMessage leadership_msg = 20;

// Used to learn of a peer's certificate
PeerIdentity peer_identity = 21;

Acknowledgement ack = 22;

// Used to request private data
RemotePvtDataRequest privateReq = 23;

// Used to respond to private data requests
RemotePvtDataResponse privateRes = 24;

// Encapsulates private data used to distribute
// private rwset after the endorsement
PrivateDataMessage private_data = 25;

3. 实现

Gossip组件在Fabric中的作用包括了组织内区块数据交换和私有数据的分发。为了完成组织内区块数据交换的功能,首先需要在组织内利用主节点选举机制选出主节点,然后主节点负责与排序节点通信,获取区块。最后,通过数据传播算法(包括了Push、Pull和Anti-tropy)将数据广播给组织内其它所有节点。为了完成私有数据的分发,背书节点在执行完成链码后,将私有数据广播给自己所在的组织。

节点通信

Gossip协议通信实现并不能保证节点间的通信能达到,是一种尽力而为的通信方式。Gossip通信支撑了几乎Gossip协议的所有基础功能,包括服务发现、私有数据传输和区块广播等。

本地节点和远程节点间通信使用了gRPC来定义数据发送的格式:

// Gossip
service Gossip {
    // GossipStream is the gRPC stream used for sending and receiving messages
    rpc GossipStream (stream Envelope) returns (stream Envelope);
    // Ping is used to probe a remote peer's aliveness
    rpc Ping (Empty) returns (Empty);
}

Ping是用来测试远程节点是否存活的函数,而GossipStream则是实际用来传输数据的函数。
当本地节点首次向远程节点发送消息时,本地节点会建立并维护一个gRPC连接,维护gRPC链接的数据结构在gossip/comm/conn.go:connectionStore 中。同样的,如果接收到了来自远程节点的连接,也会在connectionStore中维护这个连接。这个数据结构保证了针对每一个远程节点同时只会存在一份stream

type connectionStore struct {
	...
    pki2Conn         map[string]*connection // connection 是对 stream 读写的封装。
        ...
}

在建立起与远程节点的连接后,对每一个connection,会调用serviceConnection 函数来运行两个协程,分别负责从stream中读写数据。

func (conn *connection) serviceConnection() error {
    ...
    go conn.readFromStream(errChan, msgChan)
    go conn.writeToStream()
    ...
}

Comm 接口主要实现几种与远程节点通信的方式,它们的实现并不复杂,这里着重解释一下其中的订阅机制,即从远程节点接收到消息后如何出理。订阅机制的主要实现在demux.go 文件中。对外的接口则是Accept 函数接口。如下所示,它接受一个谓词common.MessageAcceptor来匹配是否是需要的消息,并返回一个Channel来供上层应用接收信息。

type Comm interface {
    Accept(common.MessageAcceptor) <-chan protoext.ReceivedMessage
}

在具体实现上,会将收到的谓词加入到ChannelDeMultiplexer结构体中,然后等待消息。
对所有的接收到的消息,相应的Connection会先检查其是否为ack类型的消息,然后再将消息发布到ChannelDeMultiplexer结构体中进行检查。具体实现在comm_impl.go 中。

h := func(m *protoext.SignedGossipMessage) {
    c.logger.Debug("Got message:", m)
    c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
    conn:                conn,
    SignedGossipMessage: m,
    connInfo:            connInfo,
    })
}
conn.handler = interceptAcks(h, connInfo.ID, c.pubSub)

在消息到达ChannelDeMultiplexer结构体后,消息会轮询所有收集到的MessageAcceptor谓词,然后将满足谓词条件的消息发送到对应的Channel中。

for _, ch := range channels {
	...
    if ch.pred(msg) { // 判断是否满足谓词
        select {
            case <-m.stopCh: // 判断是否停止
                m.deMuxInProgress.Done()
                return // stopping
            case ch.ch <- msg: // 将谓词发送给对应的Channel
        } 
    }
    ...
}

区块数据传播

区块数据传播的目的是将一个区块在一个组织内传播起来。首先,一个组织内选择主节点,主节点负责连接排序节点,然后将从排序节点获得的区块,通过Gossip协议,广播给组织内其它所有节点。当账本通道Channel建立后,不同组织的节点需要互相通信,例如广播私有数据、身份信息等。这时需要依赖于anchor node节点的帮助。在账本通道建立的时刻,必须至少配置一个anchor node在通道初始化的配置交易里6 。这样通过定时连接anchor node中继节点可以获取其它组织内节点的信息,随着时间的延长,所有节点都会得到一个全局统一的节点视图。

主节点选举

选择一个主节点的算法已经在第1 节中进行了介绍,它的实现在gossip/election包内。创建主节点选举功能的代码在NewLeaderElectionService 中。Fabric中每一个的Channel的创建都需要初始化一个新的主节点选举。这里就不多做介绍,主要是指出一下节点在当选主节点后需要从排序节点收取数据。

注意go语言中的Channel是一个用来协程之间共享数据的关键字,而Fabric中的Channel是指多个组织中形成的账本通道。

func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback leadershipCallback, config ElectionConfig) LeaderElectionService {
    ...
}

adapter是指Gossip服务本身,可以负责在组织内节点间进行数据传播,而leadershipCallback则是任何一个节点在当选主节点或者停止当选主机节点后都会调用的函数。该函数在onStatusChangeFactory 中给出。

func (g *GossipService) onStatusChangeFactory(channelID string, committer blocksprovider.LedgerInfo) func(bool) {
	return func(isLeader bool) {
		if isLeader {
			yield := func() {
				g.lock.RLock()
				le := g.leaderElection[channelID]
				g.lock.RUnlock()
				le.Yield()
			}
			logger.Info("Elected as a leader, starting delivery service for channel", channelID)
			if err := g.deliveryService[channelID].StartDeliverForChannel(channelID, committer, yield); err != nil {
				logger.Errorf("Delivery service is not able to start blocks delivery for chain, due to %+v", err)
			}
		} else {
			logger.Info("Renounced leadership, stopping delivery service for channel", channelID)
			if err := g.deliveryService[channelID].StopDeliverForChannel(channelID); err != nil {
				logger.Errorf("Delivery service is not able to stop blocks delivery for chain, due to %+v", err)
			}
		}
	}
}

可以比较清晰的观察到当节点当选为主节点后,startDeliverForChannel开始为当前通道收取区块数据,stopDeliverForChannel则是结束从通道收取区块数据。

Push

Push操作比较简单,涉及到的数据结构仅仅包括了DataMessage,其定义如下:

syntax = "proto3";
// DataMessage is the message that contains a block
message DataMessage {
    Payload payload = 1;
}
// Payload contains a block
message Payload {
  uint64 seq_num              = 1;
  bytes data                  = 2;
  repeated bytes private_data = 3;
}

DataMessage只有一个域Payload,Payload中包含了一个区块号,序列化后的区块数据和私有数据三个域。处理DataMessage的函数在HandleMessage 中。跳过一大段验证的逻辑,当接收到的msgDataMessage时,它会做以下操作:

added = gc.blockMsgStore.Add(msg.GetGossipMessage())
if added {
	gc.logger.Debugf("Adding %v to the block puller", msg.GetGossipMessage())
	gc.blocksPuller.Add(msg.GetGossipMessage())
}
...
if added {
	// Forward the message
	gc.Forward(msg)
	// DeMultiplex to local subscribers
	gc.DeMultiplex(m)
}

blockMsgStore是一个带时间过期的缓存结构,它会在其中的消息大小超过最大的消息容量或者消息TTL到期时,移除那些旧消息。当移除旧消息时,blockMsgStore会使用一个回调函数,将这个旧消息从blocksPuller中删除。这可以达到Pull算法始终只计算最新的缓存消息。Forward函数正如它的名字,转发它的消息参数,但它不会立即对消息进行转发,而是等待一段时间或者缓冲区满时,统一随机选择节点进行转发。DeMultiplex函数在上一小节中已经介绍,这里不再赘述。

Pull

Pull算法的实现被解耦成两个部分gossip/gossip/algo/pull.gogossip/gossip/pull/pullstore.gopull.go中主要是算法流程的时间,而pullstore负责处理数据格式等具体事务。值得注意的是,一个区块的Digest摘要就是指它的区块号。

Anti-tropy

为了介绍Anti-tropy的实现,首先要解释一下Gossip模块是如何与Ledger存储交互的。主要的代码在gossip/state包内。当一个节点收到一个区块,它会先观察到这个区块是否是比账本高度正好高一个的区块,如果是,就立刻进行存储,如果不是,就先缓存在PayloadsBuffer 这个数据结构中,等待后面用到,这个功能在Push 函数中被实现。

func (b *PayloadsBufferImpl) Push(payload *proto.Payload) {
	...
        seqNum := payload.SeqNum
	if seqNum < b.next || b.buf[seqNum] != nil {
		b.logger.Debugf("Payload with sequence number = %d has been already processed", payload.SeqNum)
		return
	}
	b.buf[seqNum] = payload
	// Send notification that next sequence has arrived
	if seqNum == b.next && len(b.readyChan) == 0 {
		b.readyChan <- struct{}{} // 发送已经准备好提交的信号
	}
}

在准备好区块数据后,以协程运行的 deliverPayloads 函数会将区块提交到账本上。

func (s *GossipStateProviderImpl) deliverPayloads() {
	for {
		select {
		// Wait for notification that next seq has arrived
		case <-s.payloads.Ready():
                        ...
                        err := s.commitBlock(rawBlock, p)
		case <-s.stopCh:
			return
		}
}

antiEntropy 函数也是以协程的方式运行,如下面的代码。它会根据StateCheckInterval配置定期的检查其它所有节点的区块高度,然后自身的区块不是最新的,就向其它区块请求最高高度区块和自身区块中的所有区块。

func (s *GossipStateProviderImpl) antiEntropy() {
	defer s.logger.Debug("State Provider stopped, stopping anti entropy procedure.")

	for {
		select {
		case <-s.stopCh:
			return
		case <-time.After(s.config.StateCheckInterval):
			ourHeight, err := s.ledger.LedgerHeight()
			if err != nil {
				// Unable to read from ledger continue to the next round
				s.logger.Errorf("Cannot obtain ledger height, due to %+v", errors.WithStack(err))
				continue
			}
			if ourHeight == 0 {
				s.logger.Error("Ledger reported block height of 0 but this should be impossible")
				continue
			}
			maxHeight := s.maxAvailableLedgerHeight()
			if ourHeight >= maxHeight {
				continue
			}

			s.requestBlocksInRange(uint64(ourHeight), uint64(maxHeight)-1)
		}
	}
}

私有数据传播

私有数据的传递只有两个操作,一个背书节点在检查peer节点发送的proposal时,会将产生的私有数据通过DistributePrivateData 函数,根据链码的策略广播给所有相关节点。另外一个是节点可以定时的通过reconcile 函数从其它节点拉取私有数据。

func (r *Reconciler) run() {
	for {
		select {
		case <-r.stopChan:
			return
		case <-time.After(r.ReconcileSleepInterval):
			r.logger.Debug("Start reconcile missing private info")
			if err := r.reconcile(); err != nil {
				r.logger.Error("Failed to reconcile missing private info, error: ", err.Error())
			}
		}
	}
}

节点通过fetchPrivateData 函数调用scatterRequestsgatherResponses两个函数分别发布获取私有数据的请求和接受私有数据的请求。

func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdatacommon.FetchedPvtDataContainer, error) {
    ...
    subscriptions := p.scatterRequests(peer2digests)
    responses := p.gatherResponses(subscriptions)
    ...
}

对于来自其它节点的请求,节点在handle函数中分别对RemotePvtDataRequestRemotePvtDataResponse两种类型的消息进行处理。

func (p *puller) listen() {
	for {
		select {
		case <-p.stopChan:
			return
		case msg := <-p.msgChan:
			if msg == nil {
				// comm module stopped, hence this channel
				// closed
				return
			}
			if msg.GetGossipMessage().GetPrivateRes() != nil {
				p.handleResponse(msg)
			}
			if msg.GetGossipMessage().GetPrivateReq() != nil {
				p.handleRequest(msg)
			}
		}
	}
}

4. 小结

Gossip模块是Fabric运行中节点通信的重要组成部分,主要有两个功能,一是从orderer节点接收区块并传播,二是通过背书节点对私有数据进行传播。Gossip通过主节点选取、Push&Pull机制、反熵机制、缓存机制等保证了可以高效的传播数据。本节不仅介绍了Gossip模块的设计原理,还分析了重要数据结构和代码实现,帮助我们更好的理解Gossip模块的与逆行原理。

Gossip协议是一个非常经典的分布式协议,它保证广播操作的可扩展性,不会因为节点数量增多而降低效率,已经应用在包括分布式数据库、区块链等多个领域。同时,Gossip协议在学术上也存在广泛的研究。本文只是对Fabric中Gossip模块做出简要的分析,任何错误或者不完善的内容都非常欢迎被指出,我将及时修改。

5 参考内容

[1] Randomized Rumor Spreading https://ieeexplore.ieee.org/abstract/document/892324/

[2] Hyperledger fabric dissemination network https://jira.hyperledger.org/secure/attachment/10049/gossipArch.pdf

[3] Gossip data dissemination protocol https://hyperledger-fabric.readthedocs.io/vi/latest/gossip.html

[4] Fair and Efficient Gossip in Hyperledger Fabric https://arxiv.org/pdf/2004.07060

[5] Why multicast protocols (don't) scale: an analysis of multipoint algorithms for scalable group communication, chapter 05 https://thesis.library.caltech.edu/3236/

[6] Architecture Reference Docs: Gossip data dissemination protocol https://hyperledger-fabric.readthedocs.io/en/latest/gossip.html


声明

本内容并非是面向区块链初学者的,而是假设阅读者已经了解区块链和Hyperledger fabric的基本原理(文档)。
本内容的Hyperledger Fabric的源码剖析如果没有特殊说明,版本为release 2.4,使用Goland开发工具对代码进行追踪。
转载需要注明出处。