package raft // // support for Raft tester. // // we will use the original config.go to test your code for grading. // so, while you can modify this code to help you debug, please // test with the original before submitting. // import "6.5840/labgob" import "6.5840/labrpc" import "bytes" import "log" import "sync" import "sync/atomic" import "testing" import "runtime" import "math/rand" import crand "crypto/rand" import "math/big" import "encoding/base64" import "time" import "fmt" func randstring(n int) string { b := make([]byte, 2*n) crand.Read(b) s := base64.URLEncoding.EncodeToString(b) return s[0:n] } func makeSeed() int64 { max := big.NewInt(int64(1) << 62) bigx, _ := crand.Int(crand.Reader, max) x := bigx.Int64() return x } type config struct { mu sync.Mutex t *testing.T finished int32 net *labrpc.Network n int rafts []*Raft applyErr []string // from apply channel readers connected []bool // whether each server is on the net saved []*Persister endnames [][]string // the port file names each sends to logs []map[int]interface{} // copy of each server's committed entries lastApplied []int start time.Time // time at which make_config() was called // begin()/end() statistics t0 time.Time // time at which test_test.go called cfg.begin() rpcs0 int // rpcTotal() at start of test cmds0 int // number of agreements bytes0 int64 maxIndex int maxIndex0 int } var ncpu_once sync.Once func make_config(t *testing.T, n int, unreliable bool, snapshot bool) *config { ncpu_once.Do(func() { if runtime.NumCPU() < 2 { fmt.Printf("warning: only one CPU, which may conceal locking bugs\n") } rand.Seed(makeSeed()) }) runtime.GOMAXPROCS(4) cfg := &config{} cfg.t = t cfg.net = labrpc.MakeNetwork() cfg.n = n cfg.applyErr = make([]string, cfg.n) cfg.rafts = make([]*Raft, cfg.n) cfg.connected = make([]bool, cfg.n) cfg.saved = make([]*Persister, cfg.n) cfg.endnames = make([][]string, cfg.n) cfg.logs = make([]map[int]interface{}, cfg.n) cfg.lastApplied = make([]int, cfg.n) cfg.start = time.Now() cfg.setunreliable(unreliable) cfg.net.LongDelays(true) applier := cfg.applier if snapshot { applier = cfg.applierSnap } // create a full set of Rafts. for i := 0; i < cfg.n; i++ { cfg.logs[i] = map[int]interface{}{} cfg.start1(i, applier) } // connect everyone for i := 0; i < cfg.n; i++ { cfg.connect(i) } return cfg } // shut down a Raft server but save its persistent state. func (cfg *config) crash1(i int) { cfg.disconnect(i) cfg.net.DeleteServer(i) // disable client connections to the server. cfg.mu.Lock() defer cfg.mu.Unlock() // a fresh persister, in case old instance // continues to update the Persister. // but copy old persister's content so that we always // pass Make() the last persisted state. if cfg.saved[i] != nil { cfg.saved[i] = cfg.saved[i].Copy() } rf := cfg.rafts[i] if rf != nil { cfg.mu.Unlock() rf.Kill() cfg.mu.Lock() cfg.rafts[i] = nil } if cfg.saved[i] != nil { raftlog := cfg.saved[i].ReadRaftState() snapshot := cfg.saved[i].ReadSnapshot() cfg.saved[i] = &Persister{} cfg.saved[i].Save(raftlog, snapshot) } } func (cfg *config) checkLogs(i int, m ApplyMsg) (string, bool) { err_msg := "" v := m.Command for j := 0; j < len(cfg.logs); j++ { if old, oldok := cfg.logs[j][m.CommandIndex]; oldok && old != v { log.Printf("%v: log %v; server %v\n", i, cfg.logs[i], cfg.logs[j]) // some server has already committed a different value for this entry! err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v", m.CommandIndex, i, m.Command, j, old) } } _, prevok := cfg.logs[i][m.CommandIndex-1] cfg.logs[i][m.CommandIndex] = v if m.CommandIndex > cfg.maxIndex { cfg.maxIndex = m.CommandIndex } return err_msg, prevok } // applier reads message from apply ch and checks that they match the log // contents func (cfg *config) applier(i int, applyCh chan ApplyMsg) { for m := range applyCh { if m.CommandValid == false { // ignore other types of ApplyMsg } else { cfg.mu.Lock() err_msg, prevok := cfg.checkLogs(i, m) cfg.mu.Unlock() if m.CommandIndex > 1 && prevok == false { err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex) } if err_msg != "" { log.Fatalf("apply error: %v", err_msg) cfg.applyErr[i] = err_msg // keep reading after error so that Raft doesn't block // holding locks... } } } } // returns "" or error string func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string { if snapshot == nil { log.Fatalf("nil snapshot") return "nil snapshot" } r := bytes.NewBuffer(snapshot) d := labgob.NewDecoder(r) var lastIncludedIndex int var xlog []interface{} if d.Decode(&lastIncludedIndex) != nil || d.Decode(&xlog) != nil { log.Fatalf("snapshot decode error") return "snapshot Decode() error" } if index != -1 && index != lastIncludedIndex { err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", i) return err } cfg.logs[i] = map[int]interface{}{} for j := 0; j < len(xlog); j++ { cfg.logs[i][j] = xlog[j] } cfg.lastApplied[i] = lastIncludedIndex return "" } const SnapShotInterval = 10 // periodically snapshot raft state func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) { cfg.mu.Lock() rf := cfg.rafts[i] cfg.mu.Unlock() if rf == nil { return // ??? } for m := range applyCh { err_msg := "" if m.SnapshotValid { cfg.mu.Lock() err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex) cfg.mu.Unlock() } else if m.CommandValid { if m.CommandIndex != cfg.lastApplied[i]+1 { err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex) } if err_msg == "" { cfg.mu.Lock() var prevok bool err_msg, prevok = cfg.checkLogs(i, m) cfg.mu.Unlock() if m.CommandIndex > 1 && prevok == false { err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex) } } cfg.mu.Lock() cfg.lastApplied[i] = m.CommandIndex cfg.mu.Unlock() if (m.CommandIndex+1)%SnapShotInterval == 0 { w := new(bytes.Buffer) e := labgob.NewEncoder(w) e.Encode(m.CommandIndex) var xlog []interface{} for j := 0; j <= m.CommandIndex; j++ { xlog = append(xlog, cfg.logs[i][j]) } e.Encode(xlog) rf.Snapshot(m.CommandIndex, w.Bytes()) } } else { // Ignore other types of ApplyMsg. } if err_msg != "" { log.Fatalf("apply error: %v", err_msg) cfg.applyErr[i] = err_msg // keep reading after error so that Raft doesn't block // holding locks... } } } // start or re-start a Raft. // if one already exists, "kill" it first. // allocate new outgoing port file names, and a new // state persister, to isolate previous instance of // this server. since we cannot really kill it. func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) { cfg.crash1(i) // a fresh set of outgoing ClientEnd names. // so that old crashed instance's ClientEnds can't send. cfg.endnames[i] = make([]string, cfg.n) for j := 0; j < cfg.n; j++ { cfg.endnames[i][j] = randstring(20) } // a fresh set of ClientEnds. ends := make([]*labrpc.ClientEnd, cfg.n) for j := 0; j < cfg.n; j++ { ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j]) cfg.net.Connect(cfg.endnames[i][j], j) } cfg.mu.Lock() cfg.lastApplied[i] = 0 // a fresh persister, so old instance doesn't overwrite // new instance's persisted state. // but copy old persister's content so that we always // pass Make() the last persisted state. if cfg.saved[i] != nil { cfg.saved[i] = cfg.saved[i].Copy() snapshot := cfg.saved[i].ReadSnapshot() if snapshot != nil && len(snapshot) > 0 { // mimic KV server and process snapshot now. // ideally Raft should send it up on applyCh... err := cfg.ingestSnap(i, snapshot, -1) if err != "" { cfg.t.Fatal(err) } } } else { cfg.saved[i] = MakePersister() } cfg.mu.Unlock() applyCh := make(chan ApplyMsg) rf := Make(ends, i, cfg.saved[i], applyCh) cfg.mu.Lock() cfg.rafts[i] = rf cfg.mu.Unlock() go applier(i, applyCh) svc := labrpc.MakeService(rf) srv := labrpc.MakeServer() srv.AddService(svc) cfg.net.AddServer(i, srv) } func (cfg *config) checkTimeout() { // enforce a two minute real-time limit on each test if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second { cfg.t.Fatal("test took longer than 120 seconds") } } func (cfg *config) checkFinished() bool { z := atomic.LoadInt32(&cfg.finished) return z != 0 } func (cfg *config) cleanup() { atomic.StoreInt32(&cfg.finished, 1) for i := 0; i < len(cfg.rafts); i++ { if cfg.rafts[i] != nil { cfg.rafts[i].Kill() } } cfg.net.Cleanup() cfg.checkTimeout() } // attach server i to the net. func (cfg *config) connect(i int) { // fmt.Printf("connect(%d)\n", i) cfg.connected[i] = true // outgoing ClientEnds for j := 0; j < cfg.n; j++ { if cfg.connected[j] { endname := cfg.endnames[i][j] cfg.net.Enable(endname, true) } } // incoming ClientEnds for j := 0; j < cfg.n; j++ { if cfg.connected[j] { endname := cfg.endnames[j][i] cfg.net.Enable(endname, true) } } } // detach server i from the net. func (cfg *config) disconnect(i int) { // fmt.Printf("disconnect(%d)\n", i) cfg.connected[i] = false // outgoing ClientEnds for j := 0; j < cfg.n; j++ { if cfg.endnames[i] != nil { endname := cfg.endnames[i][j] cfg.net.Enable(endname, false) } } // incoming ClientEnds for j := 0; j < cfg.n; j++ { if cfg.endnames[j] != nil { endname := cfg.endnames[j][i] cfg.net.Enable(endname, false) } } } func (cfg *config) rpcCount(server int) int { return cfg.net.GetCount(server) } func (cfg *config) rpcTotal() int { return cfg.net.GetTotalCount() } func (cfg *config) setunreliable(unrel bool) { cfg.net.Reliable(!unrel) } func (cfg *config) bytesTotal() int64 { return cfg.net.GetTotalBytes() } func (cfg *config) setlongreordering(longrel bool) { cfg.net.LongReordering(longrel) } // check that one of the connected servers thinks // it is the leader, and that no other connected // server thinks otherwise. // // try a few times in case re-elections are needed. func (cfg *config) checkOneLeader() int { for iters := 0; iters < 10; iters++ { ms := 450 + (rand.Int63() % 100) time.Sleep(time.Duration(ms) * time.Millisecond) leaders := make(map[int][]int) for i := 0; i < cfg.n; i++ { if cfg.connected[i] { if term, leader := cfg.rafts[i].GetState(); leader { leaders[term] = append(leaders[term], i) } } } lastTermWithLeader := -1 for term, leaders := range leaders { if len(leaders) > 1 { cfg.t.Fatalf("term %d has %d (>1) leaders", term, len(leaders)) } if term > lastTermWithLeader { lastTermWithLeader = term } } if len(leaders) != 0 { return leaders[lastTermWithLeader][0] } } cfg.t.Fatalf("expected one leader, got none") return -1 } // check that everyone agrees on the term. func (cfg *config) checkTerms() int { term := -1 for i := 0; i < cfg.n; i++ { if cfg.connected[i] { xterm, _ := cfg.rafts[i].GetState() if term == -1 { term = xterm } else if term != xterm { cfg.t.Fatalf("servers disagree on term") } } } return term } // check that none of the connected servers // thinks it is the leader. func (cfg *config) checkNoLeader() { for i := 0; i < cfg.n; i++ { if cfg.connected[i] { _, is_leader := cfg.rafts[i].GetState() if is_leader { cfg.t.Fatalf("expected no leader among connected servers, but %v claims to be leader", i) } } } } // how many servers think a log entry is committed? func (cfg *config) nCommitted(index int) (int, interface{}) { count := 0 var cmd interface{} = nil for i := 0; i < len(cfg.rafts); i++ { if cfg.applyErr[i] != "" { cfg.t.Fatal(cfg.applyErr[i]) } cfg.mu.Lock() cmd1, ok := cfg.logs[i][index] cfg.mu.Unlock() if ok { if count > 0 && cmd != cmd1 { cfg.t.Fatalf("committed values do not match: index %v, %v, %v", index, cmd, cmd1) } count += 1 cmd = cmd1 } } return count, cmd } // wait for at least n servers to commit. // but don't wait forever. func (cfg *config) wait(index int, n int, startTerm int) interface{} { to := 10 * time.Millisecond for iters := 0; iters < 30; iters++ { nd, _ := cfg.nCommitted(index) if nd >= n { break } time.Sleep(to) if to < time.Second { to *= 2 } if startTerm > -1 { for _, r := range cfg.rafts { if t, _ := r.GetState(); t > startTerm { // someone has moved on // can no longer guarantee that we'll "win" return -1 } } } } nd, cmd := cfg.nCommitted(index) if nd < n { cfg.t.Fatalf("only %d decided for index %d; wanted %d", nd, index, n) } return cmd } // do a complete agreement. // it might choose the wrong leader initially, // and have to re-submit after giving up. // entirely gives up after about 10 seconds. // indirectly checks that the servers agree on the // same value, since nCommitted() checks this, // as do the threads that read from applyCh. // returns index. // if retry==true, may submit the command multiple // times, in case a leader fails just after Start(). // if retry==false, calls Start() only once, in order // to simplify the early Lab 3B tests. func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int { t0 := time.Now() starts := 0 for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false { // try all the servers, maybe one is the leader. index := -1 for si := 0; si < cfg.n; si++ { starts = (starts + 1) % cfg.n var rf *Raft cfg.mu.Lock() if cfg.connected[starts] { rf = cfg.rafts[starts] } cfg.mu.Unlock() if rf != nil { index1, _, ok := rf.Start(cmd) if ok { index = index1 break } } } if index != -1 { // somebody claimed to be the leader and to have // submitted our command; wait a while for agreement. t1 := time.Now() for time.Since(t1).Seconds() < 2 { nd, cmd1 := cfg.nCommitted(index) if nd > 0 && nd >= expectedServers { // committed if cmd1 == cmd { // and it was the command we submitted. return index } } time.Sleep(20 * time.Millisecond) } if retry == false { cfg.t.Fatalf("one(%v) failed to reach agreement", cmd) } } else { time.Sleep(50 * time.Millisecond) } } if cfg.checkFinished() == false { cfg.t.Fatalf("one(%v) failed to reach agreement", cmd) } return -1 } // start a Test. // print the Test message. // e.g. cfg.begin("Test (3B): RPC counts aren't too high") func (cfg *config) begin(description string) { fmt.Printf("%s ...\n", description) cfg.t0 = time.Now() cfg.rpcs0 = cfg.rpcTotal() cfg.bytes0 = cfg.bytesTotal() cfg.cmds0 = 0 cfg.maxIndex0 = cfg.maxIndex } // end a Test -- the fact that we got here means there // was no failure. // print the Passed message, // and some performance numbers. func (cfg *config) end() { cfg.checkTimeout() if cfg.t.Failed() == false { cfg.mu.Lock() t := time.Since(cfg.t0).Seconds() // real time npeers := cfg.n // number of Raft peers nrpc := cfg.rpcTotal() - cfg.rpcs0 // number of RPC sends nbytes := cfg.bytesTotal() - cfg.bytes0 // number of bytes ncmds := cfg.maxIndex - cfg.maxIndex0 // number of Raft agreements reported cfg.mu.Unlock() fmt.Printf(" ... Passed --") fmt.Printf(" %4.1f %d %4d %7d %4d\n", t, npeers, nrpc, nbytes, ncmds) } } // Maximum log size across all servers func (cfg *config) LogSize() int { logsize := 0 for i := 0; i < cfg.n; i++ { n := cfg.saved[i].RaftStateSize() if n > logsize { logsize = n } } return logsize }