package raft

//
// support for Raft tester.
//
// we will use the original config.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//

import (
	"bytes"
	crand "crypto/rand"
	"encoding/base64"
	"fmt"
	"log"
	"math/big"
	"math/rand"
	"runtime"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"6.5840/labgob"
	"6.5840/labrpc"
)

func randstring(n int) string {
	b := make([]byte, 2*n)
	crand.Read(b)
	s := base64.URLEncoding.EncodeToString(b)
	return s[0:n]
}

func makeSeed() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := crand.Int(crand.Reader, max)
	x := bigx.Int64()
	return x
}

type config struct {
	mu          sync.Mutex
	t           *testing.T
	finished    int32
	net         *labrpc.Network
	n           int
	rafts       []*Raft
	applyErr    []string              // from apply channel readers
	connected   []bool                // whether each server is on the net
	saved       []*Persister
	endnames    [][]string            // the port file names each sends to
	logs        []map[int]interface{} // copy of each server's committed entries
	lastApplied []int
	start       time.Time // time at which make_config() was called

	// begin()/end() statistics
	t0        time.Time // time at which test_test.go called cfg.begin()
	rpcs0     int       // rpcTotal() at start of test
	cmds0     int       // number of agreements
	bytes0    int64
	maxIndex  int
	maxIndex0 int
}

var ncpu_once sync.Once

func make_config(t *testing.T, n int, unreliable bool, snapshot bool) *config {
	ncpu_once.Do(func() {
		if runtime.NumCPU() < 2 {
			fmt.Printf("warning: only one CPU, which may conceal locking bugs\n")
		}
		rand.Seed(makeSeed())
	})
	runtime.GOMAXPROCS(4)
	cfg := &config{}
	cfg.t = t
	cfg.net = labrpc.MakeNetwork()
	cfg.n = n
	cfg.applyErr = make([]string, cfg.n)
	cfg.rafts = make([]*Raft, cfg.n)
	cfg.connected = make([]bool, cfg.n)
	cfg.saved = make([]*Persister, cfg.n)
	cfg.endnames = make([][]string, cfg.n)
	cfg.logs = make([]map[int]interface{}, cfg.n)
	cfg.lastApplied = make([]int, cfg.n)
	cfg.start = time.Now()

	cfg.setunreliable(unreliable)

	cfg.net.LongDelays(true)

	applier := cfg.applier
	if snapshot {
		applier = cfg.applierSnap
	}

	// create a full set of Rafts.
	for i := 0; i < cfg.n; i++ {
		cfg.logs[i] = map[int]interface{}{}
		cfg.start1(i, applier)
	}

	// connect everyone
	for i := 0; i < cfg.n; i++ {
		cfg.connect(i)
	}

	return cfg
}
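
// A hypothetical usage sketch (not part of the original tester): a minimal
// test in test_test.go constructs a config, runs its checks between begin()
// and end(), and always defers cleanup(). The test name is illustrative.
//
//	func TestInitialElectionSketch(t *testing.T) {
//		servers := 3
//		cfg := make_config(t, servers, false, false)
//		defer cfg.cleanup()
//
//		cfg.begin("Test (sketch): initial election")
//		cfg.checkOneLeader() // exactly one leader among connected servers
//		cfg.checkTerms()     // all connected servers agree on the term
//		cfg.end()
//	}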

// shut down a Raft server but save its persistent state.
func (cfg *config) crash1(i int) {
	cfg.disconnect(i)
	cfg.net.DeleteServer(i) // disable client connections to the server.

	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	// a fresh persister, in case old instance
	// continues to update the Persister.
	// but copy old persister's content so that we always
	// pass Make() the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()
	}

	rf := cfg.rafts[i]
	if rf != nil {
		cfg.mu.Unlock()
		rf.Kill()
		cfg.mu.Lock()
		cfg.rafts[i] = nil
	}

	if cfg.saved[i] != nil {
		raftlog := cfg.saved[i].ReadRaftState()
		snapshot := cfg.saved[i].ReadSnapshot()
		cfg.saved[i] = &Persister{}
		cfg.saved[i].Save(raftlog, snapshot)
	}
}

// checkLogs records m in server i's copy of the log, and checks that no
// other server has committed a different value for the same index.
func (cfg *config) checkLogs(i int, m ApplyMsg) (string, bool) {
	err_msg := ""
	v := m.Command
	for j := 0; j < len(cfg.logs); j++ {
		if old, oldok := cfg.logs[j][m.CommandIndex]; oldok && old != v {
			log.Printf("server %v log %v; server %v log %v\n", i, cfg.logs[i], j, cfg.logs[j])
			// some server has already committed a different value for this entry!
			err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v",
				m.CommandIndex, i, m.Command, j, old)
		}
	}
	_, prevok := cfg.logs[i][m.CommandIndex-1]
	cfg.logs[i][m.CommandIndex] = v
	if m.CommandIndex > cfg.maxIndex {
		cfg.maxIndex = m.CommandIndex
	}
	return err_msg, prevok
}

// applier reads messages from applyCh and checks that they match the log
// contents.
func (cfg *config) applier(i int, applyCh chan ApplyMsg) {
	for m := range applyCh {
		if m.CommandValid == false {
			// ignore other types of ApplyMsg
		} else {
			cfg.mu.Lock()
			err_msg, prevok := cfg.checkLogs(i, m)
			cfg.mu.Unlock()
			if m.CommandIndex > 1 && prevok == false {
				err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
			}
			if err_msg != "" {
				// record the error where nCommitted() can see it
				// before log.Fatalf() aborts the test.
				cfg.applyErr[i] = err_msg
				log.Fatalf("apply error: %v", err_msg)
			}
		}
	}
}
// returns "" or error string
func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string {
if snapshot == nil {
log.Fatalf("nil snapshot")
return "nil snapshot"
}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var lastIncludedIndex int
var xlog []interface{}
if d.Decode(&lastIncludedIndex) != nil ||
d.Decode(&xlog) != nil {
log.Fatalf("snapshot decode error")
return "snapshot Decode() error"
}
if index != -1 && index != lastIncludedIndex {
err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", i)
return err
}
cfg.logs[i] = map[int]interface{}{}
for j := 0; j < len(xlog); j++ {
cfg.logs[i][j] = xlog[j]
}
cfg.lastApplied[i] = lastIncludedIndex
return ""
}
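
// The tester's snapshot wire format, for reference: applierSnap below
// labgob-encodes the index of the last included command, then the full slice
// of commands up to and including that index; ingestSnap decodes the same two
// fields in the same order. A sketch of the encoding side:
//
//	w := new(bytes.Buffer)
//	e := labgob.NewEncoder(w)
//	e.Encode(lastIncludedIndex) // int: index of last command in the snapshot
//	e.Encode(xlog)              // []interface{}: commands 0..lastIncludedIndex
//	snapshot := w.Bytes()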

const SnapShotInterval = 10

// periodically snapshot raft state
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
	cfg.mu.Lock()
	rf := cfg.rafts[i]
	cfg.mu.Unlock()
	if rf == nil {
		return // the server was crashed before this applier started
	}

	for m := range applyCh {
		err_msg := ""
		if m.SnapshotValid {
			cfg.mu.Lock()
			err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex)
			cfg.mu.Unlock()
		} else if m.CommandValid {
			if m.CommandIndex != cfg.lastApplied[i]+1 {
				err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex)
			}

			if err_msg == "" {
				cfg.mu.Lock()
				var prevok bool
				err_msg, prevok = cfg.checkLogs(i, m)
				cfg.mu.Unlock()
				if m.CommandIndex > 1 && prevok == false {
					err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
				}
			}

			cfg.mu.Lock()
			cfg.lastApplied[i] = m.CommandIndex
			cfg.mu.Unlock()

			if (m.CommandIndex+1)%SnapShotInterval == 0 {
				w := new(bytes.Buffer)
				e := labgob.NewEncoder(w)
				e.Encode(m.CommandIndex)
				var xlog []interface{}
				for j := 0; j <= m.CommandIndex; j++ {
					xlog = append(xlog, cfg.logs[i][j])
				}
				e.Encode(xlog)
				rf.Snapshot(m.CommandIndex, w.Bytes())
			}
		} else {
			// Ignore other types of ApplyMsg.
		}
		if err_msg != "" {
			// record the error where nCommitted() can see it
			// before log.Fatalf() aborts the test.
			cfg.applyErr[i] = err_msg
			log.Fatalf("apply error: %v", err_msg)
		}
	}
}

// start or re-start a Raft.
// if one already exists, "kill" it first.
// allocate new outgoing port file names, and a new
// state persister, to isolate this server's previous
// instance, since we cannot really kill it.
func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
	cfg.crash1(i)

	// a fresh set of outgoing ClientEnd names,
	// so that the old crashed instance's ClientEnds can't send.
	cfg.endnames[i] = make([]string, cfg.n)
	for j := 0; j < cfg.n; j++ {
		cfg.endnames[i][j] = randstring(20)
	}

	// a fresh set of ClientEnds.
	ends := make([]*labrpc.ClientEnd, cfg.n)
	for j := 0; j < cfg.n; j++ {
		ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
		cfg.net.Connect(cfg.endnames[i][j], j)
	}

	cfg.mu.Lock()

	cfg.lastApplied[i] = 0

	// a fresh persister, so the old instance doesn't overwrite
	// the new instance's persisted state.
	// but copy the old persister's content so that we always
	// pass Make() the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()

		snapshot := cfg.saved[i].ReadSnapshot()
		if len(snapshot) > 0 {
			// mimic KV server and process snapshot now.
			// ideally Raft should send it up on applyCh...
			err := cfg.ingestSnap(i, snapshot, -1)
			if err != "" {
				cfg.t.Fatal(err)
			}
		}
	} else {
		cfg.saved[i] = MakePersister()
	}

	cfg.mu.Unlock()

	applyCh := make(chan ApplyMsg)

	rf := Make(ends, i, cfg.saved[i], applyCh)

	cfg.mu.Lock()
	cfg.rafts[i] = rf
	cfg.mu.Unlock()

	go applier(i, applyCh)

	svc := labrpc.MakeService(rf)
	srv := labrpc.MakeServer()
	srv.AddService(svc)
	cfg.net.AddServer(i, srv)
}
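
// A hypothetical crash-and-reboot sketch (illustrative, assuming a cfg built
// by make_config): crash1() preserves the persisted state, and start1() hands
// that state back to Make(), so the restarted server should recover its log.
//
//	cfg.crash1(2)              // kill server 2, keeping its Persister
//	cfg.start1(2, cfg.applier) // reboot it from the saved state
//	cfg.connect(2)             // reattach it to the network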

func (cfg *config) checkTimeout() {
	// enforce a two minute real-time limit on each test
	if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second {
		cfg.t.Fatal("test took longer than 120 seconds")
	}
}

func (cfg *config) checkFinished() bool {
	z := atomic.LoadInt32(&cfg.finished)
	return z != 0
}

func (cfg *config) cleanup() {
	atomic.StoreInt32(&cfg.finished, 1)
	for i := 0; i < len(cfg.rafts); i++ {
		if cfg.rafts[i] != nil {
			cfg.rafts[i].Kill()
		}
	}
	cfg.net.Cleanup()
	cfg.checkTimeout()
}

// attach server i to the net.
func (cfg *config) connect(i int) {
	// fmt.Printf("connect(%d)\n", i)

	cfg.connected[i] = true

	// outgoing ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.connected[j] {
			endname := cfg.endnames[i][j]
			cfg.net.Enable(endname, true)
		}
	}

	// incoming ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.connected[j] {
			endname := cfg.endnames[j][i]
			cfg.net.Enable(endname, true)
		}
	}
}

// detach server i from the net.
func (cfg *config) disconnect(i int) {
	// fmt.Printf("disconnect(%d)\n", i)

	cfg.connected[i] = false

	// outgoing ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.endnames[i] != nil {
			endname := cfg.endnames[i][j]
			cfg.net.Enable(endname, false)
		}
	}

	// incoming ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.endnames[j] != nil {
			endname := cfg.endnames[j][i]
			cfg.net.Enable(endname, false)
		}
	}
}
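
// A hypothetical partition sketch (illustrative): disconnecting the current
// leader should force the remaining majority to elect a new one, and the old
// leader can later rejoin.
//
//	leader := cfg.checkOneLeader()
//	cfg.disconnect(leader) // isolate the leader
//	cfg.checkOneLeader()   // the other servers elect a new leader
//	cfg.connect(leader)    // heal the partition; the old leader rejoins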

func (cfg *config) rpcCount(server int) int {
	return cfg.net.GetCount(server)
}

func (cfg *config) rpcTotal() int {
	return cfg.net.GetTotalCount()
}

func (cfg *config) setunreliable(unrel bool) {
	cfg.net.Reliable(!unrel)
}

func (cfg *config) bytesTotal() int64 {
	return cfg.net.GetTotalBytes()
}

func (cfg *config) setlongreordering(longrel bool) {
	cfg.net.LongReordering(longrel)
}

// check that one of the connected servers thinks
// it is the leader, and that no other connected
// server thinks otherwise.
//
// try a few times in case re-elections are needed.
func (cfg *config) checkOneLeader() int {
	for iters := 0; iters < 10; iters++ {
		ms := 450 + (rand.Int63() % 100)
		time.Sleep(time.Duration(ms) * time.Millisecond)

		leaders := make(map[int][]int)
		for i := 0; i < cfg.n; i++ {
			if cfg.connected[i] {
				if term, leader := cfg.rafts[i].GetState(); leader {
					leaders[term] = append(leaders[term], i)
				}
			}
		}

		lastTermWithLeader := -1
		for term, leaders := range leaders {
			if len(leaders) > 1 {
				cfg.t.Fatalf("term %d has %d (>1) leaders", term, len(leaders))
			}
			if term > lastTermWithLeader {
				lastTermWithLeader = term
			}
		}

		if len(leaders) != 0 {
			return leaders[lastTermWithLeader][0]
		}
	}
	cfg.t.Fatalf("expected one leader, got none")
	return -1
}

// check that everyone agrees on the term.
func (cfg *config) checkTerms() int {
	term := -1
	for i := 0; i < cfg.n; i++ {
		if cfg.connected[i] {
			xterm, _ := cfg.rafts[i].GetState()
			if term == -1 {
				term = xterm
			} else if term != xterm {
				cfg.t.Fatalf("servers disagree on term")
			}
		}
	}
	return term
}

// check that none of the connected servers
// thinks it is the leader.
func (cfg *config) checkNoLeader() {
	for i := 0; i < cfg.n; i++ {
		if cfg.connected[i] {
			_, is_leader := cfg.rafts[i].GetState()
			if is_leader {
				cfg.t.Fatalf("expected no leader among connected servers, but %v claims to be leader", i)
			}
		}
	}
}

// how many servers think a log entry is committed?
func (cfg *config) nCommitted(index int) (int, interface{}) {
	count := 0
	var cmd interface{} = nil
	for i := 0; i < len(cfg.rafts); i++ {
		if cfg.applyErr[i] != "" {
			cfg.t.Fatal(cfg.applyErr[i])
		}

		cfg.mu.Lock()
		cmd1, ok := cfg.logs[i][index]
		cfg.mu.Unlock()

		if ok {
			if count > 0 && cmd != cmd1 {
				cfg.t.Fatalf("committed values do not match: index %v, %v, %v",
					index, cmd, cmd1)
			}
			count += 1
			cmd = cmd1
		}
	}
	return count, cmd
}

// wait for at least n servers to commit.
// but don't wait forever.
func (cfg *config) wait(index int, n int, startTerm int) interface{} {
	to := 10 * time.Millisecond
	for iters := 0; iters < 30; iters++ {
		nd, _ := cfg.nCommitted(index)
		if nd >= n {
			break
		}
		time.Sleep(to)
		if to < time.Second {
			to *= 2
		}
		if startTerm > -1 {
			for _, r := range cfg.rafts {
				if t, _ := r.GetState(); t > startTerm {
					// someone has moved on
					// can no longer guarantee that we'll "win"
					return -1
				}
			}
		}
	}
	nd, cmd := cfg.nCommitted(index)
	if nd < n {
		cfg.t.Fatalf("only %d decided for index %d; wanted %d",
			nd, index, n)
	}
	return cmd
}
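
// A hypothetical Start()/wait() sketch (illustrative): submit a command
// directly to a known leader, then wait for every server to commit it. The
// command value 99 is arbitrary.
//
//	index, term, ok := cfg.rafts[leader].Start(99)
//	if ok {
//		// wait for all cfg.n servers to commit index; gives up if a
//		// later term appears, returning -1 instead of the command.
//		cmd := cfg.wait(index, cfg.n, term)
//		if cmd != 99 { /* the entry was replaced or lost */ }
//	}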

// do a complete agreement.
// it might choose the wrong leader initially,
// and have to re-submit after giving up.
// entirely gives up after about 10 seconds.
// indirectly checks that the servers agree on the
// same value, since nCommitted() checks this,
// as do the threads that read from applyCh.
// returns index.
// if retry==true, may submit the command multiple
// times, in case a leader fails just after Start().
// if retry==false, calls Start() only once, in order
// to simplify the early Lab 3B tests.
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
	t0 := time.Now()
	starts := 0
	for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false {
		// try all the servers, maybe one is the leader.
		index := -1
		for si := 0; si < cfg.n; si++ {
			starts = (starts + 1) % cfg.n
			var rf *Raft
			cfg.mu.Lock()
			if cfg.connected[starts] {
				rf = cfg.rafts[starts]
			}
			cfg.mu.Unlock()
			if rf != nil {
				index1, _, ok := rf.Start(cmd)
				if ok {
					index = index1
					break
				}
			}
		}

		if index != -1 {
			// somebody claimed to be the leader and to have
			// submitted our command; wait a while for agreement.
			t1 := time.Now()
			for time.Since(t1).Seconds() < 2 {
				nd, cmd1 := cfg.nCommitted(index)
				if nd > 0 && nd >= expectedServers {
					// committed
					if cmd1 == cmd {
						// and it was the command we submitted.
						return index
					}
				}
				time.Sleep(20 * time.Millisecond)
			}
			if retry == false {
				cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
			}
		} else {
			time.Sleep(50 * time.Millisecond)
		}
	}
	if cfg.checkFinished() == false {
		cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
	}
	return -1
}
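
// A hypothetical agreement sketch (illustrative): one() is the usual way for
// tests to drive commands through the cluster. The values 101..103 are
// arbitrary.
//
//	for i := 101; i <= 103; i++ {
//		// require all cfg.n servers to commit; retry on leader failure.
//		cfg.one(i, cfg.n, true)
//	}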

// start a Test.
// print the Test message.
// e.g. cfg.begin("Test (3B): RPC counts aren't too high")
func (cfg *config) begin(description string) {
	fmt.Printf("%s ...\n", description)
	cfg.t0 = time.Now()
	cfg.rpcs0 = cfg.rpcTotal()
	cfg.bytes0 = cfg.bytesTotal()
	cfg.cmds0 = 0
	cfg.maxIndex0 = cfg.maxIndex
}

// end a Test -- the fact that we got here means there
// was no failure.
// print the Passed message,
// and some performance numbers.
func (cfg *config) end() {
	cfg.checkTimeout()
	if cfg.t.Failed() == false {
		cfg.mu.Lock()
		t := time.Since(cfg.t0).Seconds()       // real time
		npeers := cfg.n                         // number of Raft peers
		nrpc := cfg.rpcTotal() - cfg.rpcs0      // number of RPC sends
		nbytes := cfg.bytesTotal() - cfg.bytes0 // number of bytes
		ncmds := cfg.maxIndex - cfg.maxIndex0   // number of Raft agreements reported
		cfg.mu.Unlock()

		fmt.Printf("  ... Passed --")
		fmt.Printf("  %4.1f  %d %4d %7d %4d\n", t, npeers, nrpc, nbytes, ncmds)
	}
}

// Maximum log size across all servers
func (cfg *config) LogSize() int {
	logsize := 0
	for i := 0; i < cfg.n; i++ {
		n := cfg.saved[i].RaftStateSize()
		if n > logsize {
			logsize = n
		}
	}
	return logsize
}
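
// A hypothetical snapshot-size check (illustrative, assuming a cfg built with
// snapshot == true in make_config): after enough agreements, snapshotting
// should keep the persisted Raft state small. The 8000-byte bound is an
// arbitrary example, not the graded limit.
//
//	for i := 0; i < 50; i++ {
//		cfg.one(i, cfg.n, true)
//	}
//	if cfg.LogSize() > 8000 {
//		cfg.t.Fatalf("log size %v too large after snapshots", cfg.LogSize())
//	}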