Paxos算法也许是最著名的分布式一致性算法,而Raft则大概是最流行的分布式一致性算法。由于经验和水平所限,单纯看论文感觉并不能达到更进一步的理解。前面听闻Kubernetes, Docker Swarm, CockroachDB等等牛逼的项目都在用Raft。毕竟是经过大规模生产环境考验的技术,我觉得很有必要学习一下。而且etcd的Raft实现是开源的,毕竟“源码之前,了无秘密”。
无论是Paxos还是Raft,它们都是致力于维护一个RSM(Replicated State Machine),如上图所示。对于RSM来说,状态存储是非常关键的。在这篇博客里,我准备基于etcd的实现分析一下Raft的状态存储。Raft状态的存储主要靠Snapshot和WAL(write ahead log)实现。
- 和很多数据库一样,为了保证数据的安全性(crash或者宕机下的恢复),都会使用WAL,etcd也不例外。etcd中的每一个事务操作(即写操作),都会预先写到事务文件中,这种文件就是WAL。
-
此外,etcd作为一个高可用的KV存储系统,不可能只依靠log replay来实现数据恢复。因此,etcd还提供了snapshot(快照)功能。snapshot即是定期把整个数据库保存成一个单独的快照文件,这样一来,不但缩短了日志重放的时间,也减轻了WAL的存储量,过早的WAL可以删除掉。
etcd使用了protobuf来定义协议格式,snapshot和log也在其中。raft/raft.proto文件部分内容如下:
enum EntryType {
EntryNormal = 0;
EntryConfChange = 1;
}
message Entry {
optional uint64 Term = 2 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
optional uint64 Index = 3 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
optional EntryType Type = 1 [(gogoproto.nullable) = false];
optional bytes Data = 4;
}
message SnapshotMetadata {
optional ConfState conf_state = 1 [(gogoproto.nullable) = false];
optional uint64 index = 2 [(gogoproto.nullable) = false];
optional uint64 term = 3 [(gogoproto.nullable) = false];
}
message Snapshot {
optional bytes data = 1;
optional SnapshotMetadata metadata = 2 [(gogoproto.nullable) = false];
}
其中,entry即是logEntry,表示一条log。
1. Raft library提供的接口
etcd的Raft library其实也不是开箱即用,应用程序需要实现存储io和网络通信。存储io在Raft library被定义为一个Storage接口,这个Storage接口是Raft library用来读取log、snapshot等等数据的接口。Raft library本身提供了一个MemoryStorage的实现,这个实现是基于内存存储的,不能仅仅依靠它来保存持久化数据。
这个Storage的接口定义如下:
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
}
既然仅仅依靠memoryStorage是不够用的,那么我们还是来看看etcd本身是如何使用Raft libray的。etcd的Storage接口其实也复用了memoryStorage,但是仅仅把它当做一层内存的cache。每一次事务性操作中,etcd都会事先将存储内容flush到持久化存储设备上,然后写入memoryStorage。正如前文所述,Storage仅仅是用做汇报内容给Raft library的,只要能保证它和持久化内容的一致即可。而这一点在单机上很容易保证。此外,Raft library是通过raftlog来操作Storage的,详情见 etcd/raft/raft.go 。
2. etcd的具体实现
etcd server是通过WAL和snapshot实现持久化存储的。etcd使用了一个包裹层,一个叫storage的struct。为了避免混淆,贴一点代码(etcd/etcdserver/storage.go)。
type Storage interface {
// Save function saves ents and state to the underlying stable storage.
// Save MUST block until st and ents are on stable storage.
Save(st raftpb.HardState, ents []raftpb.Entry) error
// SaveSnap function saves snapshot to the underlying stable storage.
SaveSnap(snap raftpb.Snapshot) error
// Close closes the Storage and performs finalization.
Close() error
}
type storage struct {
*wal.WAL
*snap.Snapshotter
}
func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
return &storage{w, s}
}
注意,这个Storage和之前的那个Storage并没有关系,千万不要搞混淆了。
由于golang的语言特性,storage struct可以直接使用WAL和Snapshotter的方法,因为没有声明成员变量的名字。那么etcd是如何结合使用Raft library的memoryStorage和这里的storage struct的呢?答案就在etcd/etcdserver/raft.go。etcd对Raft library进行了进一步的封装,称之为raftNode,raftNode包含了一个raftNodeConfig的匿名成员。raftNodeConfig的定义如下所示:
type raftNodeConfig struct {
// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool
raft.Node
raftStorage *raft.MemoryStorage
storage Storage
heartbeat time.Duration // for logging
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since
// clients should timeout and reissue their messages.
// If transport is nil, server will panic.
transport rafthttp.Transporter
}
源码看起来就是一目了然,raftStorage就是提供给Raft library的,而storage则是etcd实现的持久化存贮。在使用中,etcd以连续调用的方式实现二者一致的逻辑。以etcd server重启为例,我们看看同步是如何实现的,且看restartNode()的实现。
func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
s.SetHardState(st)
s.Append(ents)
c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
}
n := raft.RestartNode(c)
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
advanceTicksForElection(n, c.ElectionTick)
return id, cl, n, s, w
}
这个函数的主要逻辑就是通过读取snapshot和WAL,然后通过s.SetHardState()和s.Append()使得memoryStrorage的状态得到恢复。在etcd的工作过程中也是类似的形式,不信请看raft.go的start()方法:
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}
r.raftStorage.Append(rd.Entries)
代码我删减了一部分,总体的逻辑可以看得更清楚。r.storage.Save()和r.raftStorage.Append()这种连续调用保证了storage和raftStorage的一致性。
好吧!状态存储就到这里了,但这仅仅是Raft的基本内容,后边继续探索Raft的日志复制、leader选举以及事务提交的实现,当然还有RPC。