Fabric 1.4 源码分析 区块同步
本文主要从源码层面介绍 fabric peer 同步区块过程, peer 同步区块主要有 2 个过程:
1)peer 组织的 leader 与 orderer 同步区块
2)peer 组织间 peer 同步区块.
1. peer leader 和 orderer 同步区块
首先, orderer 对外主要提供 broadcast 和 deliver 两个服务(详见 orderer 服务介绍)。peer 和 orderer 之间同步区块必然是通过 deliver 服务实现的, 但到底是 peer 从 orderer 拉取, 还是 orderer 推送给 peer 呢? 由于 peer 能够从配置块中获知 orderer 的信息, 而 deliver 又是一个 gRPC 服务, 因此可以推断是 peer 主动从 orderer 拉取区块。如果是拉取, 那么 peer 是如何请求区块的, 请求的方式又是什么?
1.1 Orderer Deliver 服务
首先, 查看 orderer deliver 服务是怎么运行的, 是如何同步区块的.
当 deliver 服务被调用时, 转到 Handle()方法处理
- func (h *Handler) Handle(ctx context.Context, srv *Server) error {
- ...
- for {
- logger.Debugf("Attempting to read seek info message from %s", addr)
- // 接受发来 envelope
- envelope, err := srv.Recv()
- ...
- // 分发区块
- status, err := h.deliverBlocks(ctx, srv, envelope)
- ...
- }
- }
其中, srv.Recv() 接收客户端发来的 envelope, 再由 deliverBlocks() 根据 envelope 中的信息分发区块。
- func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
- addr := util.ExtractRemoteAddress(ctx)
- payload, err := utils.UnmarshalPayload(envelope.Payload)
- if payload.Header == nil {
- logger.Warningf("Malformed envelope received from %s with bad header", addr)
- return cb.Status_BAD_REQUEST, nil
- }
- chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
- err = h.validateChannelHeader(ctx, chdr)
- chain := h.ChainManager.GetChain(chdr.ChannelId)
- defer func() {
- labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
- h.Metrics.RequestsCompleted.With(labels...).Add(1)
- }()
- accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
- if err != nil {
- logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
- return cb.Status_BAD_REQUEST, nil
- }
- if err := accessControl.Evaluate(); err != nil {
- logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
- return cb.Status_FORBIDDEN, nil
- }
- seekInfo := &ab.SeekInfo{}
- // 返回迭代器及起始区块号
- cursor, number := chain.Reader().Iterator(seekInfo.Start)
- defer cursor.Close()
- var stopNum uint64
- switch stop := seekInfo.Stop.Type.(type) {
- case *ab.SeekPosition_Oldest:
- stopNum = number
- case *ab.SeekPosition_Newest:
- stopNum = chain.Reader().Height() - 1
- case *ab.SeekPosition_Specified:
- stopNum = stop.Specified.Number
- if stopNum <number {
- logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
- return cb.Status_BAD_REQUEST, nil
- }
- }
- for {
- if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
- if number> chain.Reader().Height()-1 {
- return cb.Status_NOT_FOUND, nil
- }
- }
- var block *cb.Block
- var status cb.Status
- iterCh := make(chan struct{})
- go func() {
- // 获取区块
- block, status = cursor.Next()
- close(iterCh)
- }()
- select {
- case <-ctx.Done():
- logger.Debugf("Context canceled, aborting wait for next block")
- return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
- case <-erroredChan:
- // TODO, today, the only user of the errorChan is the orderer consensus implementations. If the peer ever reports
- // this error, we will need to update this error message, possibly finding a way to signal what error text to return.
- logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
- return cb.Status_SERVICE_UNAVAILABLE, nil
- case <-iterCh:
- // Iterator has set the block and status vars
- }
- if status != cb.Status_SUCCESS {
- logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
- return status, nil
- }
- // increment block number to support FAIL_IF_NOT_READY deliver behavior
- number++
- if err := accessControl.Evaluate(); err != nil {
- logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
- return cb.Status_FORBIDDEN, nil
- }
- logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
- // 发送区块
- if err := srv.SendBlockResponse(block); err != nil {
- logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
- return cb.Status_INTERNAL_SERVER_ERROR, err
- }
- h.Metrics.BlocksSent.With(labels...).Add(1)
- // 如果到了 client 请求对最后区块跳出循环
- if stopNum == block.Header.Number {
- break
- }
- }
- logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
- return cb.Status_SUCCESS, nil
- }
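deliverBlocks() 的主要流程如下:
反序列化 envelope.Payload
对 payload.Header 和 ChannelHeader 进行验证
根据通道 ID 获取对应的 chain
进行访问控制相关验证(policy、签名)
从 payload.Data 中解析出 SeekInfo(上面的代码片段省略了这一步的反序列化, 只保留了 `seekInfo := &ab.SeekInfo{}`)
根据 seekInfo.Start 新建迭代器 cursor, 同时得到起始区块号 number
根据 seekInfo.Stop 的类型(Oldest / Newest / Specified)确定结束区块号 stopNum
循环调用 cursor.Next() 获取下一个区块, 重新校验访问控制后, 通过 SendBlockResponse() 将区块发送给客户端
判断是否已到达客户端请求的最后一个区块(stopNum), 若是则跳出循环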
- // Chain encapsulates chain operations and data.
- type Chain interface {
- // Sequence returns the current config sequence number, can be used to detect config changes
- Sequence() uint64
- // PolicyManager returns the current policy manager as specified by the chain configuration
- PolicyManager() policies.Manager
- // Reader returns the chain Reader for the chain
- Reader() blockledger.Reader
- // Errored returns a channel which closes when the backing consenter has errored
- Errored() <-chan struct{}
- }
- type SeekInfo struct {
- Start *SeekPosition `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
- Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
- Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
1.2 peer 从 orderer 同步区块
这里主要解决 1 个问题: peer 如何触发 orderer deliver 服务? 即 peer 和 orderer 怎么同步区块的?
在介绍之前, 可以先参阅 peer 节点启动流程。peer 节点启动时会执行 peer.Initialize() 方法, 对 peer 所加入的所有 chain 进行实例化, 其中调用了 createChain() 创建链对象; createChain() 中又调用了 GossipService.InitializeChannel(), 后者再调用 g.deliveryService[chainID].StartDeliverForChannel() 开始获取区块。
- func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
- d.lock.Lock()
- defer d.lock.Unlock()
- if d.stopping {
- errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
- logger.Errorf(errMsg)
- return errors.New(errMsg)
- }
- if _, exist := d.blockProviders[chainID]; exist {
- errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
- logger.Errorf(errMsg)
- return errors.New(errMsg)
- } else {
- client := d.newClient(chainID, ledgerInfo)
- logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
- // 创建区块 deliver 实例
- d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
- // 执行
- go d.launchBlockProvider(chainID, finalizer)
- }
- return nil
- }
其中 newClient() 创建一个 broadcastClient, 并把 requester.RequestBlocks(ledgerInfoProvider) 作为连接建立后要执行的回调传给它。显然, peer 正是通过该方法请求区块的, 那么该方法的主要实现是什么?
- func (b *blocksRequester) RequestBlocks(ledgerInfoProvider blocksprovider.LedgerInfo) error {
- height, err := ledgerInfoProvider.LedgerHeight()
- if err != nil {
- logger.Errorf("Can't get ledger height for channel %s from committer [%s]", b.chainID, err)
- return err
- }
- if height> 0 {
- logger.Debugf("Starting deliver with block [%d] for channel %s", height, b.chainID)
- if err := b.seekLatestFromCommitter(height); err != nil {
- return err
- }
- } else {
- logger.Debugf("Starting deliver with oldest block for channel %s", b.chainID)
- if err := b.seekOldest(); err != nil {
- return err
- }
- }
- return nil
- }
RequestBlocks() 根据账本高度决定调用 seekLatestFromCommitter() 还是 seekOldest()。下面是 SeekInfo 的定义以及 seekOldest() 的实现:
- type SeekInfo struct {
- Start *SeekPosition `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
- Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
- Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- func (b *blocksRequester) seekOldest() error {
- seekInfo := &orderer.SeekInfo{
- Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
- Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
- Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
- }
- //TODO- epoch and msgVersion may need to be obtained for nowfollowing usage in orderer/configupdate/configupdate.go
- msgVersion := int32(0)
- epoch := uint64(0)
- tlsCertHash := b.getTLSCertHash()
- env, err := utils.CreateSignedEnvelopeWithTLSBinding(common.HeaderType_DELIVER_SEEK_INFO, b.chainID, localmsp.NewSigner(), seekInfo, msgVersion, epoch, tlsCertHash)
- if err != nil {
- return err
- }
- // 发送 envelope 给 orderer 获取区块
- return b.client.Send(env)
- }
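从 RequestBlocks() 调用的这 2 个方法可知, seekInfo 的结束位置(Stop)都被指定为 math.MaxUint64。以上面的 seekOldest() 为例, 其 Behavior 为 BLOCK_UNTIL_READY。因此 peer 会一直请求区块, 直到这个最大值, 可以看作请求从当前到未来的所有区块。对应到 orderer 端的 deliverBlocks(), stopNum 永远不会达到, 循环会一直等待并发送新区块, 这条 deliver 流也就不会正常结束。这也是下文 DeliverBlocks() 把收到 SUCCESS 状态视为错误("a seek that should never complete")的原因。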
由上文可知, broadcastClient 实例化之后, 会在连接 orderer 时通过 broadcastClient.onConnect 发送请求区块的 envelope。实例化之后还会调用 launchBlockProvider(), 进而调用 pb.DeliverBlocks() 方法开始接收区块。
- type broadcastClient struct {
- stopFlag int32
- stopChan chan struct{}
- createClient clientFactory
- shouldRetry retryPolicy
- onConnect broadcastSetup
- prod comm.ConnectionProducer
- mutex sync.Mutex
- blocksDeliverer blocksprovider.BlocksDeliverer
- conn *connection
- endpoint string
- }
- // DeliverBlocks used to pull out blocks from the ordering service to
- // distributed them across peers
- func (b *blocksProviderImpl) DeliverBlocks() {
- errorStatusCounter := 0
- statusCounter := 0
- defer b.client.Close()
- for !b.isDone() {
- // 接收 orderer 分发的区块
- msg, err := b.client.Recv()
- if err != nil {
- logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error())
- return
- }
- switch t := msg.Type.(type) {
- case *orderer.DeliverResponse_Status:
- if t.Status == common.Status_SUCCESS {
- logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID)
- return
- }
- if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {
- logger.Errorf("[%s] Got error %v", b.chainID, t)
- errorStatusCounter++
- if errorStatusCounter> b.wrongStatusThreshold {
- logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID)
- return
- }
- } else {
- errorStatusCounter = 0
- logger.Warningf("[%s] Got error %v", b.chainID, t)
- }
- maxDelay := float64(maxRetryDelay)
- currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
- time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
- if currDelay < maxDelay {
- statusCounter++
- }
- if t.Status == common.Status_BAD_REQUEST {
- b.client.Disconnect(false)
- } else {
- b.client.Disconnect(true)
- }
- continue
- case *orderer.DeliverResponse_Block:
- errorStatusCounter = 0
- statusCounter = 0
- blockNum := t.Block.Header.Number
- marshaledBlock, err := proto.Marshal(t.Block)
- if err != nil {
- logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, blockNum, err)
- continue
- }
- if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), blockNum, marshaledBlock); err != nil {
- logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, blockNum, err)
- continue
- }
- numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
- // Create payload with a block received
- payload := createPayload(blockNum, marshaledBlock)
- // Use payload to create gossip message
- gossipMsg := createGossipMsg(b.chainID, payload)
- logger.Debugf("[%s] Adding payload to local buffer, blockNum = [%d]", b.chainID, blockNum)
- // Add payload to local state payloads buffer
- if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
- logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
- }
- // Gossip messages with other nodes
- logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, blockNum, numberOfPeers)
- if !b.isDone() {
- // peer 节点间通过 gossip 同步区块
- b.gossip.Gossip(gossipMsg)
- }
- default:
- logger.Warningf("[%s] Received unknown: %v", b.chainID, t)
- return
- }
- }
- }
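下面介绍 DeliverBlocks() 方法。它循环调用 b.client.Recv() 接收 orderer 返回的响应, 并按响应类型分别处理:
- 若响应是非 SUCCESS 的状态, 则按指数退避等待后断开连接重试; BAD_REQUEST/FORBIDDEN 错误次数超过阈值时, 直接停止。
- 若响应是区块, 则先校验区块, 再将其加入本地 payload 缓冲, 并通过 gossip 广播给其他 peer。

Recv() 的实现如下: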
- // Recv receives a message from the ordering service
- func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
- o, err := bc.try(func() (interface{}, error) {
- if bc.shouldStop() {
- return nil, errors.New("closing")
- }
- return bc.tryReceive()
- })
- if err != nil {
- return nil, err
- }
- return o.(*orderer.DeliverResponse), nil
- }
至此我们大致知道, 区块是由 peer 从 orderer 拉取的。但还有两个疑问: peer 是如何触发 orderer 的 deliver 服务的? peer 又是如何调用 RequestBlocks() 方法的?
peer 调用 deliver 服务
Recv() 会调用 try(), try() 会调用 bc.doAction(action, resetAttemptCounter), 进而调用 bc.connect()。connect() 中会执行 bc.createClient(conn).Deliver(ctx), 由此 peer 调用了 orderer 的 deliver 服务。
peer 调用 RequestBlocks 方法
由上可知, RequestBlocks 作为 broadcastSetup 类型的 onConnect 回调传入了 broadcastClient, 而 broadcastSetup 会在每次成功连接 orderer 后被立即调用。
- // broadcastSetup is a function that is called by the broadcastClient immediately after each
- // successful connection to the ordering service
1.3 组织间 peer 同步区块
peer 之间是通过 gossip 服务同步区块的。从上述 DeliverBlocks() 的代码也可以看出, leader 从 orderer 获取区块后, 会直接调用 b.gossip.Gossip() 将区块广播出去(这部分逻辑属于 leader):
- // Gossip messages with other nodes
- logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, blockNum, numberOfPeers)
- if !b.isDone() {
- // peer 节点间通过 gossip 同步区块
- b.gossip.Gossip(gossipMsg)
- }
1.4 peer 是如何写区块
首先, 其他 peer 是通过 gossip 服务同步区块的, 因此保存区块的逻辑应该也在 gossip 服务中。回到 peer 启动时对 gossip 服务的设置:
- service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
- Validator: validator,
- Committer: c,
- Store: store,
- Cs: simpleCollectionStore,
- IdDeserializeFactory: csStoreSupport,
- })
InitializeChannel() 中会调用:
- g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator,
- g.metrics.StateMetrics, getStateConfiguration())
NewGossipStateProvider() 中会启动以下两个 goroutine:
- // Listen for incoming communication
- go s.listen()
- // Deliver in order messages into the incoming channel
- go s.deliverPayloads()
其中 deliverPayloads() 会等待 payload 缓冲中下一个序号的区块就绪, 然后依次取出并反序列化, 再通过 commitBlock() 写入账本:
- func (s *GossipStateProviderImpl) deliverPayloads() {
- defer s.done.Done()
- for {
- select {
- // Wait for notification that next seq has arrived
- case <-s.payloads.Ready():
- logger.Debugf("[%s] Ready to transfer payloads (blocks) to the ledger, next block number is = [%d]", s.chainID, s.payloads.Next())
- // Collect all subsequent payloads
- for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
- rawBlock := &common.Block{}
- if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
- logger.Errorf("Error getting block with seqNum = %d due to (% v)...dropping block", payload.SeqNum, errors.WithStack(err))
- continue
- }
- if rawBlock.Data == nil || rawBlock.Header == nil {
- logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
- payload.SeqNum, rawBlock.Header, rawBlock.Data)
- continue
- }
- logger.Debugf("[%s] Transferring block [%d] with %d transaction(s) to the ledger", s.chainID, payload.SeqNum, len(rawBlock.Data.Data))
- // Read all private data into slice
- var p util.PvtDataCollections
- if payload.PrivateData != nil {
- err := p.Unmarshal(payload.PrivateData)
- if err != nil {
- logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (% v)...dropping block", payload.SeqNum, errors.WithStack(err))
- continue
- }
- }
- // 此处会保存区块
- if err := s.commitBlock(rawBlock, p); err != nil {
- if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
- logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
- return
- }
- logger.Panicf("Cannot commit block to the ledger due to % v", errors.WithStack(err))
- }
- }
- case <-s.stopCh:
- s.stopCh <- struct{}{}
- logger.Debug("State provider has been stopped, finishing to push new blocks.")
- return
- }
- }
- }
总结
peer 启动时会启动 gossip 服务模块
leader 启动 gossip 服务时, 会通过 deliver 服务源源不断地从 orderer 请求区块
peer 之间通过 gossip 同步区块: leader 从 orderer 获取区块后, 会通过 gossip 将区块广播给其他 peer
peer 的 gossip 模块(state provider)会将 payload 缓冲中的区块按序写入账本
来源: https://www.cnblogs.com/jiliguo/p/11959671.html