Golang implementation of the Raft consensus protocol (2)
Abstract
In the era of big data, high-performance distributed systems are needed to process huge volumes of data. However, it is not easy to coordinate a large number of nodes. One of the most significant problems is distributed consensus, which requires that every node in the cluster eventually reach the same decision without conflicts.
Raft is a distributed consensus algorithm that has been proven workable in practice. This experiment continues the previous one: it implements log replication and then tests the whole system under many abnormal situations.
Before Raft, (multi-)Paxos had been treated as the industry standard for a long time. However, even with Paxos, it is still hard to build a reliable distributed system.
As the Chubby implementers put it:
There are significant gaps between the description of the Paxos algorithm and the needs of a real-world system…. the final system will be based on an unproven protocol.
Paxos is rather difficult to implement, mainly because it is hard to understand for anyone who is not a mathematician.
Then Raft came out, with understandability as an explicit design goal. Compared with Paxos, it has a smaller state space, achieved by reducing the number of states to consider. Raft also decomposes the problem into leader election, log replication, safety, and membership changes, instead of treating them as one tangled whole.
To further understand distributed consensus, this experiment implements the Raft algorithm in the Go language.
Design & Realization
The language we use in this experiment is Go 1.9.
Structure
Some state, such as the current term, the log, and the role of the node, has to be stored and shared across goroutines, so we design a structure called Raft. Among its fields, currentTerm, votedFor and logs must be persisted, while the rest are volatile. To save space, some variables used for loop control are omitted here.
type Raft struct {
    mu        sync.Mutex
    peers     []*labrpc.ClientEnd
    persister *Persister
    me        int // index into peers[]

    currentTerm int
    votedFor    int
    logs        []map[string]int

    commitIndex int
    lastApplied int

    nextIndex  []int
    matchIndex []int

    role            int // 0-follower, 1-candidate, 2-leader
    electionTimeout time.Duration
}
Initialization & main loop
The main loop is almost the same as described for leader election in the previous experiment, so here we only discuss the differences.
Election
In this experiment, we add a few lines to the vote function that initialize nextIndex and matchIndex once the node is elected as the leader.
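A minimal sketch of this initialization, assuming 1-based log indices and a hypothetical helper name becomeLeader (the exact code in the implementation may differ slightly):

    // becomeLeader resets per-follower replication state after winning an election.
    func (rf *Raft) becomeLeader() {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        rf.role = 2 // leader
        rf.nextIndex = make([]int, len(rf.peers))
        rf.matchIndex = make([]int, len(rf.peers))
        for i := range rf.peers {
            rf.nextIndex[i] = len(rf.logs) + 1 // optimistically assume each follower is up to date
            rf.matchIndex[i] = 0               // nothing confirmed as replicated yet
        }
    }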
The process of handling a vote request also differs.
The server compares the term of its own last log entry with the candidate's last log term: if the candidate's last term is higher, or the two terms are equal and the candidate's log is at least as long, the server grants the vote. Otherwise, it rejects the request.
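This "up-to-date" check can be sketched as a small helper (the name logUpToDate is ours; each log entry is a map with a "term" key, as in the structure above):

    // logUpToDate reports whether a candidate's log, described by its last
    // index and last term, is at least as up-to-date as this server's log.
    func (rf *Raft) logUpToDate(candLastIndex, candLastTerm int) bool {
        lastIndex := len(rf.logs)
        lastTerm := 0
        if lastIndex > 0 {
            lastTerm = rf.logs[lastIndex-1]["term"]
        }
        return candLastTerm > lastTerm ||
            (candLastTerm == lastTerm && candLastIndex >= lastIndex)
    }

The vote handler then grants the vote only when this returns true and votedFor is either -1 or the requesting candidate.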
The main process of log replication works as follows.
First of all, when a new command reaches the leader, the leader appends it to its log and replies with the index at which the command will appear if it is successfully replicated. Followers do not accept requests from clients; they simply redirect clients to the leader.
The leader sends new log entries to each server, attaching the index of the previous log entry and the term of that entry. The entries to send are determined by nextIndex.
Followers check whether they already hold that previous log entry and then append the new log entries at the corresponding location.
If a majority of servers accept a command, the leader advances its commitIndex and replies to the client.
The commitIndex is included in the next appendEntries request, so followers commit the commands known to have been replicated on a majority of servers.
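The first step, accepting a new command on the leader, can be sketched as follows (the function name Start and the int type assertion are assumptions based on the map-based log above):

    // Start appends a new command to the leader's log and returns the index the
    // command will occupy if committed, the current term, and whether this
    // server believes it is the leader. Non-leaders return false so the client
    // can retry on another server.
    func (rf *Raft) Start(command interface{}) (int, int, bool) {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        if rf.role != 2 {
            return -1, rf.currentTerm, false
        }
        entry := map[string]int{"command": command.(int), "term": rf.currentTerm}
        rf.logs = append(rf.logs, entry)
        rf.persist()
        return len(rf.logs), rf.currentTerm, true
    }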
The formats of the request and the reply are shown below.
type AppendEntriesArgs struct {
    Term         int
    LeaderId     int
    PrevLogIndex int
    PrevLogTerm  int
    Entries      []map[string]int
    LeaderCommit int
}
type AppendEntriesReply struct {
    Term    int
    Success bool
    Len     int
}
The leader periodically sends an appendEntries request to each follower to maintain its leadership. If a follower has log entries that are not yet replicated, the next log entries are attached to the request. Each request is issued in a new goroutine, so a slow or unreachable peer cannot block the main loop.
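The broadcast can be sketched as follows (the helper name broadcastAppendEntries is ours, and the real loop also deals with timers and exit conditions):

    // broadcastAppendEntries sends one round of heartbeats/log entries to all peers.
    // It is assumed to be called periodically while this server is the leader.
    func (rf *Raft) broadcastAppendEntries() {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        for i := range rf.peers {
            if i == rf.me {
                continue
            }
            prev := rf.nextIndex[i] - 1
            prevTerm := 0
            if prev > 0 {
                prevTerm = rf.logs[prev-1]["term"]
            }
            args := AppendEntriesArgs{
                Term:         rf.currentTerm,
                LeaderId:     rf.me,
                PrevLogIndex: prev,
                PrevLogTerm:  prevTerm,
                // Copy the tail of the log; an empty slice is a pure heartbeat.
                Entries:      append([]map[string]int{}, rf.logs[prev:]...),
                LeaderCommit: rf.commitIndex,
            }
            // Send in a separate goroutine so a slow or dead peer cannot block the loop.
            go func(server int, args AppendEntriesArgs) {
                var reply AppendEntriesReply
                rf.sendAppendEntries(server, args, &reply)
            }(i, args)
        }
    }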
An appendEntries request may contain more than one log entry. When a follower receives such a request, it first confirms that the sender is qualified to be the leader, i.e. its term is not lower than the follower's own. Then it checks whether it already holds the log entry immediately preceding the new ones. If both checks pass, the follower overwrites its log from that position onward with the entries carried in the request.
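A simplified version of the follower-side handler might look like this (resetting the election timer and applying committed entries to the state machine are omitted):

    // AppendEntries is the follower-side handler for heartbeats and log replication.
    func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        reply.Term = rf.currentTerm
        reply.Len = len(rf.logs)
        reply.Success = false
        // Reject requests from a stale leader.
        if args.Term < rf.currentTerm {
            return
        }
        // The sender is a legitimate leader for this term: stay (or become) a follower.
        rf.currentTerm = args.Term
        rf.role = 0
        // Consistency check: do we already hold the entry preceding the new ones?
        if args.PrevLogIndex > len(rf.logs) ||
            (args.PrevLogIndex > 0 && rf.logs[args.PrevLogIndex-1]["term"] != args.PrevLogTerm) {
            return
        }
        // Overwrite the log from PrevLogIndex onward with the leader's entries.
        rf.logs = append(rf.logs[:args.PrevLogIndex], args.Entries...)
        // Advance commitIndex up to what the leader has committed, bounded by our log length.
        if args.LeaderCommit > rf.commitIndex {
            rf.commitIndex = args.LeaderCommit
            if rf.commitIndex > len(rf.logs) {
                rf.commitIndex = len(rf.logs)
            }
        }
        rf.persist()
        reply.Success = true
    }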
The leader receives the replies from followers and advances matchIndex, which records the log entries known to be replicated on each follower. When a majority of servers have replicated a log entry, the leader increases commitIndex by one and replies to the client. To avoid problems caused by an unreliable network, log entries are committed one by one in increasing index order.
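The commit logic can be sketched as below (advanceCommitIndex is a hypothetical helper, assumed to be called with rf.mu held, for example after processing a reply):

    // advanceCommitIndex tries to commit entries one index at a time, in order.
    func (rf *Raft) advanceCommitIndex() {
        for n := rf.commitIndex + 1; n <= len(rf.logs); n++ {
            count := 1 // the leader itself always holds the entry
            for i := range rf.peers {
                if i != rf.me && rf.matchIndex[i] >= n {
                    count++
                }
            }
            if count*2 <= len(rf.peers) {
                break // no majority yet, and later entries cannot have one either
            }
            rf.commitIndex = n
        }
    }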
There is a subtle problem in log replication (the scenario of Figure 8 in the Raft paper). Suppose a leader receives a command but fails to replicate it because of a network failure, then becomes the leader again in a later term, at which point it can replicate that old command to the other followers. Unfortunately, it fails again shortly after committing the entry. If old entries were committed simply because a majority now holds them, a server that does not contain that command could still be elected leader and would overwrite the entry, so some followers would end up committing the same log index twice but with different commands. This conflicts with the Raft rule that committed commands must appear in the logs of all subsequent leaders.
To prevent this, we extend the algorithm so that commands from older terms are not committed immediately after a majority replicates them. Instead, they are committed only once a command from the current term has been committed. Together with the election restriction, this ensures that any elected leader contains all committed entries.
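In code, this amounts to an extra guard right before the rf.commitIndex = n line in the sketch above:

    // Only a majority-replicated entry from the current term is committed directly;
    // once it commits, all earlier (older-term) entries are committed along with it.
    if rf.logs[n-1]["term"] != rf.currentTerm {
        continue
    }
    rf.commitIndex = n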
The sender side of appendEntries is shown below.

// sendAppendEntries issues one RPC and adjusts the leader's state based on the reply.
func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool {
    ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
    if ok {
        if reply.Term > rf.currentTerm {
            // A higher term exists somewhere: step down to follower.
            rf.mu.Lock()
            rf.currentTerm = reply.Term
            rf.role = 0
            rf.votedFor = -1
            rf.persist()
            rf.mu.Unlock()
        }
        if rf.role == 2 && !reply.Success {
            // The follower rejected the entries: back off nextIndex and retry later.
            rf.nextIndex[server] = args.PrevLogIndex
            if reply.Len < args.PrevLogIndex {
                // The follower's log is even shorter: jump directly past its end.
                rf.nextIndex[server] = reply.Len + 1
            }
        }
    }
    return ok && reply.Success
}
In some cases a follower may be missing log entries, so it cannot simply overwrite its log at the given position. In that case it replies false to the appendEntries request, and the leader decreases nextIndex and resends older log entries until the missing entries are filled in.
However, a follower may be missing a large number of entries, and decreasing nextIndex one step at a time would take a long time. To speed up the back-off, we add a field Len to the reply structure holding the length of the follower's log, so the leader can reset nextIndex to just past the follower's log (Len + 1) and cut down the number of retries. A snapshot could be used as an alternative.
Validation
There are a total of 17 test cases: TestInitialElection, TestReElection, TestBasicAgree, TestFailAgree, TestFailNoAgree, TestConcurrentStarts, TestRejoin, TestBackup, TestCount, TestPersist1, TestPersist2, TestPersist3, TestFigure8, TestUnreliableAgree, TestFigure8Unreliable, TestReliableChurn and internalChurn, ranging from the normal state to unreliable networks with delays, partitions, packet loss, duplicated packets and reordered packets.
We ran the tests many times, and the results show that our system passes all of the test cases.
0 says: hello world! 1 says: hello world! 2 says: hello world! 3 says: hello world! 4 says: hello world! Test: basic agreement ... 1 says: I am not a leader 2 says: I am not a leader 3 says: I am not a leader 4 says: I am not a leader 0 says: I am not a leader 2 says: bye~ 0 says: stop heart beat 1 says: I am not a leader 2 says: I am not a leader 3 says: I am not a leader 4 says: I am not a leader 0 says: I am not a leader 0 says: bye~ 1 says: I am not a leader 2 says: I am not a leader 3 says: I am not a leader 4 says: I am not a leader 0 says: I am not a leader 1 says: bye~ 1 tells 2 : vote me, {28 1 0 0} 1 tells 0 : vote me, {28 1 0 0} 0 says: higher term detected, term= 28 0 tells 1 : vote granted 2 says: higher term detected, term= 28 2 tells 1 : vote granted 1 says: I am the leader in term 28 1 says: stop heart beat 3 tells 0 : vote me, {1 3 0 0} 3 tells 4 : vote me, {1 3 0 0} 3 tells 1 : vote me, {1 3 0 0} 3 tells 2 : vote me, {1 3 0 0} 0 says: higher term detected, term= 1 0 tells 3 : vote granted 4 says: higher term detected, term= 1 4 tells 3 : vote granted 1 says: higher term detected, term= 1 1 tells 3 : vote granted 2 says: higher term detected, term= 1 2 tells 3 : vote granted 3 says: I am the leader in term 1 3 tells 4 : ping, {1 3 0 0 [] 0} 3 tells 1 : ping, {1 3 0 0 [] 0} 3 tells 2 : ping, {1 3 0 0 [] 0} 3 tells 0 : ping, {1 3 0 0 [] 0} 4 tells 3 : pong, &{1 true} 2 tells 3 : pong, &{1 true} 1 tells 3 : pong, &{1 true} 0 tells 3 : pong, &{1 true} 1 says: I am not a leader 2 says: I am not a leader 3 says: new command 100 in term 1 3 tells 2 : ping, {1 3 0 0 [map[command:100 term:1]] 0} 3 tells 0 : ping, {1 3 0 0 [map[command:100 term:1]] 0} 3 tells 1 : ping, {1 3 0 0 [map[command:100 term:1]] 0} 3 tells 4 : ping, {1 3 0 0 [map[command:100 term:1]] 0} 1 tells 3 : pong, &{1 true} 2 tells 3 : pong, &{1 true} 4 tells 3 : pong, &{1 true} 0 tells 3 : pong, &{1 true} 3 says: 100 is committed, index= 1 3 tells 4 : ping, {1 3 1 1 [] 1} 3 tells 2 : ping, {1 3 1 1 [] 1} 3 tells 1 : ping, {1 3 1 1 [] 1} 3 tells 0 : ping, {1 3 1 1 [] 1} 2 says: commit 100 index= 1 4 says: commit 100 index= 1 2 tells 3 : pong, &{1 true} 4 tells 3 : pong, &{1 true} 1 says: commit 100 index= 1 1 tells 3 : pong, &{1 true} 0 says: commit 100 index= 1 0 tells 3 : pong, &{1 true} 1 says: I am not a leader 2 says: I am not a leader 3 says: new command 200 in term 1 3 tells 1 : ping, {1 3 1 1 [map[command:200 term:1]] 1} 3 tells 2 : ping, {1 3 1 1 [map[command:200 term:1]] 1} 3 tells 4 : ping, {1 3 1 1 [map[command:200 term:1]] 1} 3 tells 0 : ping, {1 3 1 1 [map[command:200 term:1]] 1} 1 tells 3 : pong, &{1 true} 4 tells 3 : pong, &{1 true} 2 tells 3 : pong, &{1 true} 0 tells 3 : pong, &{1 true} 3 says: 200 is committed, index= 2 3 tells 4 : ping, {1 3 2 1 [] 2} 3 tells 0 : ping, {1 3 2 1 [] 2} 3 tells 1 : ping, {1 3 2 1 [] 2} 3 tells 2 : ping, {1 3 2 1 [] 2} 0 says: commit 200 index= 2 0 tells 3 : pong, &{1 true} 2 says: commit 200 index= 2 4 says: commit 200 index= 2 2 tells 3 : pong, &{1 true} 1 says: commit 200 index= 2 1 tells 3 : pong, &{1 true} 4 tells 3 : pong, &{1 true} 1 says: I am not a leader 2 says: I am not a leader 3 says: new command 300 in term 1 3 tells 1 : ping, {1 3 2 1 [map[command:300 term:1]] 2} 3 tells 2 : ping, {1 3 2 1 [map[term:1 command:300]] 2} 3 tells 0 : ping, {1 3 2 1 [map[command:300 term:1]] 2} 3 tells 4 : ping, {1 3 2 1 [map[command:300 term:1]] 2} 1 tells 3 : pong, &{1 true} 2 tells 3 : pong, &{1 true} 3 says: 300 is committed, index= 3 4 tells 3 : pong, &{1 true} 0 tells 3 
: pong, &{1 true} 3 tells 4 : ping, {1 3 3 1 [] 3} 3 tells 1 : ping, {1 3 3 1 [] 3} 3 tells 2 : ping, {1 3 3 1 [] 3} 3 tells 0 : ping, {1 3 3 1 [] 3} 4 says: commit 300 index= 3 4 tells 3 : pong, &{1 true} 1 says: commit 300 index= 3 2 says: commit 300 index= 3 0 says: commit 300 index= 3 1 tells 3 : pong, &{1 true} 2 tells 3 : pong, &{1 true} 0 tells 3 : pong, &{1 true} ... Passed
Conclusion
This experiment implements the remaining parts of Raft and then tests the whole system thoroughly. The results show that the cluster quickly elects a leader and stays in the normal state until a failure occurs, after which it re-elects a new leader within a short time. Even under extremely bad network conditions, the system tolerates the unreliable network and keeps working correctly. In this way, the experiment provides further evidence of the reliability of the Raft algorithm.
Moreover, this experiment shows the great value of test-driven development: it exposes potential problems that are not easy to find through code review.