面试官:聊聊 etcd 中的 Raft 吧( 四 )

propc和recvc中拿到的是从上层应用传进来的消息 , 这个消息会被交给 raft 层的Step函数处理 , 具体处理逻辑我上面有过介绍 。
下面来解释下readyc的作用 。 在 etcd 的这个实现中 , node并不负责数据的持久化、网络消息的通信、以及将已经提交的 log 应用到状态机中 , 所以node使用readyc这个channel对外通知有数据要处理了 , 并将这些需要外部处理的数据打包到一个Ready结构体中:
// #L52// Ready encapsulates the entries and messages that are ready to read,// be saved to stable storage, committed or sent to other peers.// All fields in Ready are read-only.type Ready struct {// The current volatile state of a Node.// SoftState will be nil if there is no update.// It is not required to consume or store SoftState.*SoftState// The current state of a Node to be saved to stable storage BEFORE// Messages are sent.// HardState will be equal to empty state if there is no update.pb.HardState// ReadStates can be used for node to serve linearizable read requests locally// when its applied index is greater than the index in ReadState.// Note that the readState will be returned when raft receives msgReadIndex.// The returned is only valid for the request that requested to read.ReadStates []ReadState// Entries specifies entries to be saved to stable storage BEFORE// Messages are sent.Entries []pb.Entry// Snapshot specifies the snapshot to be saved to stable storage.Snapshot pb.Snapshot// CommittedEntries specifies entries to be committed to a// store/state-machine. These have previously been committed to stable// store.CommittedEntries []pb.Entry// Messages specifies outbound messages to be sent AFTER Entries are// committed to stable storage.// If it contains a MsgSnap message, the application MUST report back to raft// when the snapshot has been received or has failed by calling ReportSnapshot.Messages []pb.Message// MustSync indicates whether the HardState and Entries must be synchronously// written to disk or if an asynchronous write is permissible.MustSync bool}应用程序得到这个Ready之后 , 需要:

  1. 将 HardState, Entries, Snapshot 持久化到 storage 。
  2. 将 Messages 广播给其他节点 。
  3. 将 CommittedEntries(已经 commit 还没有 apply)应用到状态机 。
  4. 如果发现 CommittedEntries 中有成员变更类型的 entry , 调用node.ApplyConfChange()方法让node知道 。
  5. 最后再调用node.Advance()告诉 raft , 这批状态更新处理完了 , 状态已经演进了 , 可以给我下一批 Ready 让我处理 。
Life of a Request前面我们把整个包的结构过了一遍 , 下面来结合具体的代码看看 raft 对一个请求的处理过程是怎样的 。 我一直觉得 , 如果能从代码的层面追踪到一个请求的处理过程 , 那无论是从宏观还是微观的角度 , 对理解整个系统都是非常有帮助的 。
Life of a Vote Request
  1. 首先 , 在node的大循环里 , 有一个会定时输出的tick channel , 它来触发raft.tick()函数 , 根据上面的介绍可知 , 如果当前节点是 follower , 那它的tick函数会指向tickElection 。 tickElection的处理逻辑是给自己发送一个MsgHup的内部消息 , Step函数看到这个消息后会调用campaign函数 , 进入竞选状态 。
// tickElection is run by followers and candidates after r.electionTimeout.func (r *raft) tickElection() {r.electionElapsed++if r.promotable()--tt-darkmode-color: #EF7060;">campaign则会调用becomeCandidate把自己切换到 candidate 模式 , 并递增Term值 。 然后再将自己的Term及日志信息发送给其他的节点 , 请求投票 。 func (r *raft) campaign(t CampaignType) {//...r.becomeCandidate()// Get peer id from progressfor id := range r.prs {//...r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})}}
  1. 另一方面 , 其他节点在接受到这个请求后 , 会首先比较接收到的Term是不是比自己的大 , 以及接受到的日志信息是不是比自己的要新 , 从而决定是否投票 。 这个逻辑我们还是可以从Step函数中找到:
func (r *raft) Step(m pb.Message) error {//...switch m.Type {case pb.MsgVote, pb.MsgPreVote:// We can vote if this is a repeat of a vote we've already cast...canVote := r.Vote == m.From ||// ...we haven't voted and we don't think there's a leader yet in this term...(r.Vote == None--tt-darkmode-color: #EF7060;">node.Propose开始 , Propose方法将这个写请求封装到一个MsgProp消息里面 , 发送给自己处理 。
  • 消息处理函数Step无法直接处理这个消息 , 它会调用那个小写的step函数 , 来根据当前的状态进行处理 。
  • 如果当前是 follower , 那它会把这个消息转发给 leader 。
  • func stepFollower(r *raft, m pb.Message) error {switch m.Type {case pb.MsgProp://...m.To = r.leadr.send(m)}}