package kvraft

import (
	"fmt"
	"io/ioutil"
	"math/rand"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"6.5840/models"
	"6.5840/porcupine"
)

// The tester generously allows solutions to complete elections in one second
// (much more than the paper's range of timeouts).
const electionTimeout = 1 * time.Second

const linearizabilityCheckTimeout = 1 * time.Second

type OpLog struct {
	operations []porcupine.Operation
	sync.Mutex
}

func (log *OpLog) Append(op porcupine.Operation) {
	log.Lock()
	defer log.Unlock()
	log.operations = append(log.operations, op)
}

func (log *OpLog) Read() []porcupine.Operation {
	log.Lock()
	defer log.Unlock()
	ops := make([]porcupine.Operation, len(log.operations))
	copy(ops, log.operations)
	return ops
}

// To make sure timestamps use the monotonic clock, instead of computing
// absolute timestamps with `time.Now().UnixNano()` (which uses the wall
// clock), we measure time relative to `t0` using `time.Since(t0)`, which uses
// the monotonic clock.
var t0 = time.Now()

// Get/Put/Append wrappers that keep operation counts and, if log is non-nil,
// record each operation for the linearizability checker.
func Get(cfg *config, ck *Clerk, key string, log *OpLog, cli int) string {
	start := int64(time.Since(t0))
	v := ck.Get(key)
	end := int64(time.Since(t0))
	cfg.op()
	if log != nil {
		log.Append(porcupine.Operation{
			Input:    models.KvInput{Op: 0, Key: key},
			Output:   models.KvOutput{Value: v},
			Call:     start,
			Return:   end,
			ClientId: cli,
		})
	}
	return v
}

func Put(cfg *config, ck *Clerk, key string, value string, log *OpLog, cli int) {
	start := int64(time.Since(t0))
	ck.Put(key, value)
	end := int64(time.Since(t0))
	cfg.op()
	if log != nil {
		log.Append(porcupine.Operation{
			Input:    models.KvInput{Op: 1, Key: key, Value: value},
			Output:   models.KvOutput{},
			Call:     start,
			Return:   end,
			ClientId: cli,
		})
	}
}

func Append(cfg *config, ck *Clerk, key string, value string, log *OpLog, cli int) {
	start := int64(time.Since(t0))
	ck.Append(key, value)
	end := int64(time.Since(t0))
	cfg.op()
	if log != nil {
		log.Append(porcupine.Operation{
			Input:    models.KvInput{Op: 2, Key: key, Value: value},
			Output:   models.KvOutput{},
			Call:     start,
			Return:   end,
			ClientId: cli,
		})
	}
}

func check(cfg *config, t *testing.T, ck *Clerk, key string, value string) {
	v := Get(cfg, ck, key, nil, -1)
	if v != value {
		t.Fatalf("Get(%v): expected:\n%v\nreceived:\n%v", key, value, v)
	}
}

// a client runs the function fn and then signals it is done
func run_client(t *testing.T, cfg *config, me int, ca chan bool, fn func(me int, ck *Clerk, t *testing.T)) {
	ok := false
	defer func() { ca <- ok }()
	ck := cfg.makeClient(cfg.All())
	fn(me, ck, t)
	ok = true
	cfg.deleteClient(ck)
}

// spawn ncli clients and wait until they are all done
func spawn_clients_and_wait(t *testing.T, cfg *config, ncli int, fn func(me int, ck *Clerk, t *testing.T)) {
	ca := make([]chan bool, ncli)
	for cli := 0; cli < ncli; cli++ {
		ca[cli] = make(chan bool)
		go run_client(t, cfg, cli, ca[cli], fn)
	}
	// log.Printf("spawn_clients_and_wait: waiting for clients")
	for cli := 0; cli < ncli; cli++ {
		ok := <-ca[cli]
		// log.Printf("spawn_clients_and_wait: client %d is done\n", cli)
		if ok == false {
			t.Fatalf("failure")
		}
	}
}
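// exampleLoggedWorkload is an illustrative sketch, not part of the graded
// test suite: it shows how the logging Get/Put/Append wrappers above combine
// with spawn_clients_and_wait to record a per-client history in an OpLog,
// which can later be fed to porcupine (see the linearizability check in
// GenericTest below). The function name and the workload are our own choices.
func exampleLoggedWorkload(t *testing.T, cfg *config, opLog *OpLog, nclients int) {
	spawn_clients_and_wait(t, cfg, nclients, func(me int, ck *Clerk, t *testing.T) {
		key := strconv.Itoa(me)                          // each client uses its own key
		Put(cfg, ck, key, "", opLog, me)                 // recorded as a Put in the history
		Append(cfg, ck, key, "x "+key+" 0 y", opLog, me) // recorded as an Append
		Get(cfg, ck, key, opLog, me)                     // recorded as a Get with its result
	})
}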
// predict effect of Append(k, val) if old value is prev.
func NextValue(prev string, val string) string {
	return prev + val
}

// check that for a specific client all known appends are present in a value,
// and in order
func checkClntAppends(t *testing.T, clnt int, v string, count int) {
	lastoff := -1
	for j := 0; j < count; j++ {
		wanted := "x " + strconv.Itoa(clnt) + " " + strconv.Itoa(j) + " y"
		off := strings.Index(v, wanted)
		if off < 0 {
			t.Fatalf("%v missing element %v in Append result %v", clnt, wanted, v)
		}
		off1 := strings.LastIndex(v, wanted)
		if off1 != off {
			t.Fatalf("duplicate element %v in Append result", wanted)
		}
		if off <= lastoff {
			t.Fatalf("wrong order for element %v in Append result", wanted)
		}
		lastoff = off
	}
}

// check that all known appends are present in a value,
// and are in order for each concurrent client.
func checkConcurrentAppends(t *testing.T, v string, counts []int) {
	nclients := len(counts)
	for i := 0; i < nclients; i++ {
		lastoff := -1
		for j := 0; j < counts[i]; j++ {
			wanted := "x " + strconv.Itoa(i) + " " + strconv.Itoa(j) + " y"
			off := strings.Index(v, wanted)
			if off < 0 {
				t.Fatalf("%v missing element %v in Append result %v", i, wanted, v)
			}
			off1 := strings.LastIndex(v, wanted)
			if off1 != off {
				t.Fatalf("duplicate element %v in Append result", wanted)
			}
			if off <= lastoff {
				t.Fatalf("wrong order for element %v in Append result", wanted)
			}
			lastoff = off
		}
	}
}

// repartition the servers periodically
func partitioner(t *testing.T, cfg *config, ch chan bool, done *int32) {
	defer func() { ch <- true }()
	for atomic.LoadInt32(done) == 0 {
		a := make([]int, cfg.n)
		for i := 0; i < cfg.n; i++ {
			a[i] = (rand.Int() % 2)
		}
		pa := make([][]int, 2)
		for i := 0; i < 2; i++ {
			pa[i] = make([]int, 0)
			for j := 0; j < cfg.n; j++ {
				if a[j] == i {
					pa[i] = append(pa[i], j)
				}
			}
		}
		cfg.partition(pa[0], pa[1])
		time.Sleep(electionTimeout + time.Duration(rand.Int63()%200)*time.Millisecond)
	}
}

// Basic test is as follows: one or more clients submitting Append/Get
// operations to a set of servers for some period of time. After the period is
// over, the test checks that all appended values are present and in order for
// a particular key. If unreliable is set, RPCs may fail. If crash is set, the
// servers crash after the period is over and restart. If partitions is set,
// the test repartitions the network concurrently with the clients and
// servers. If maxraftstate is a positive number, the size of the state for
// Raft (i.e., log size) shouldn't exceed 8*maxraftstate. If maxraftstate is
// negative, snapshots shouldn't be used.
func GenericTest(t *testing.T, part string, nclients int, nservers int, unreliable bool, crash bool, partitions bool, maxraftstate int, randomkeys bool) {
	title := "Test: "
	if unreliable {
		// the network drops RPC requests and replies.
		title = title + "unreliable net, "
	}
	if crash {
		// peers re-start, and thus persistence must work.
title = title + "restarts, " } if partitions { // the network may partition title = title + "partitions, " } if maxraftstate != -1 { title = title + "snapshots, " } if randomkeys { title = title + "random keys, " } if nclients > 1 { title = title + "many clients" } else { title = title + "one client" } title = title + " (" + part + ")" // 4A or 4B cfg := make_config(t, nservers, unreliable, maxraftstate) defer cfg.cleanup() cfg.begin(title) opLog := &OpLog{} ck := cfg.makeClient(cfg.All()) done_partitioner := int32(0) done_clients := int32(0) ch_partitioner := make(chan bool) clnts := make([]chan int, nclients) for i := 0; i < nclients; i++ { clnts[i] = make(chan int) } for i := 0; i < 3; i++ { // log.Printf("Iteration %v\n", i) atomic.StoreInt32(&done_clients, 0) atomic.StoreInt32(&done_partitioner, 0) go spawn_clients_and_wait(t, cfg, nclients, func(cli int, myck *Clerk, t *testing.T) { j := 0 defer func() { clnts[cli] <- j }() last := "" // only used when not randomkeys if !randomkeys { Put(cfg, myck, strconv.Itoa(cli), last, opLog, cli) } for atomic.LoadInt32(&done_clients) == 0 { var key string if randomkeys { key = strconv.Itoa(rand.Intn(nclients)) } else { key = strconv.Itoa(cli) } nv := "x " + strconv.Itoa(cli) + " " + strconv.Itoa(j) + " y" if (rand.Int() % 1000) < 500 { // log.Printf("%d: client new append %v\n", cli, nv) Append(cfg, myck, key, nv, opLog, cli) if !randomkeys { last = NextValue(last, nv) } j++ } else if randomkeys && (rand.Int()%1000) < 100 { // we only do this when using random keys, because it would break the // check done after Get() operations Put(cfg, myck, key, nv, opLog, cli) j++ } else { // log.Printf("%d: client new get %v\n", cli, key) v := Get(cfg, myck, key, opLog, cli) // the following check only makes sense when we're not using random keys if !randomkeys && v != last { t.Fatalf("get wrong value, key %v, wanted:\n%v\n, got\n%v\n", key, last, v) } } } }) if partitions { // Allow the clients to perform some operations without interruption time.Sleep(1 * time.Second) go partitioner(t, cfg, ch_partitioner, &done_partitioner) } time.Sleep(5 * time.Second) atomic.StoreInt32(&done_clients, 1) // tell clients to quit atomic.StoreInt32(&done_partitioner, 1) // tell partitioner to quit if partitions { // log.Printf("wait for partitioner\n") <-ch_partitioner // reconnect network and submit a request. A client may // have submitted a request in a minority. That request // won't return until that server discovers a new term // has started. cfg.ConnectAll() // wait for a while so that we have a new term time.Sleep(electionTimeout) } if crash { // log.Printf("shutdown servers\n") for i := 0; i < nservers; i++ { cfg.ShutdownServer(i) } // Wait for a while for servers to shutdown, since // shutdown isn't a real crash and isn't instantaneous time.Sleep(electionTimeout) // log.Printf("restart servers\n") // crash and re-start all for i := 0; i < nservers; i++ { cfg.StartServer(i) } cfg.ConnectAll() } // log.Printf("wait for clients\n") for i := 0; i < nclients; i++ { // log.Printf("read from clients %d\n", i) j := <-clnts[i] // if j < 10 { // log.Printf("Warning: client %d managed to perform only %d put operations in 1 sec?\n", i, j) // } key := strconv.Itoa(i) // log.Printf("Check %v for client %d\n", j, i) v := Get(cfg, ck, key, opLog, 0) if !randomkeys { checkClntAppends(t, i, v, j) } } if maxraftstate > 0 { // Check maximum after the servers have processed all client // requests and had time to checkpoint. 
			sz := cfg.LogSize()
			if sz > 8*maxraftstate {
				t.Fatalf("logs were not trimmed (%v > 8*%v)", sz, maxraftstate)
			}
		}
		if maxraftstate < 0 {
			// Check that snapshots are not used
			ssz := cfg.SnapshotSize()
			if ssz > 0 {
				t.Fatalf("snapshot too large (%v), should not be used when maxraftstate = %d", ssz, maxraftstate)
			}
		}
	}

	res, info := porcupine.CheckOperationsVerbose(models.KvModel, opLog.Read(), linearizabilityCheckTimeout)
	if res == porcupine.Illegal {
		file, err := ioutil.TempFile("", "*.html")
		if err != nil {
			fmt.Printf("info: failed to create temp file for visualization\n")
		} else {
			err = porcupine.Visualize(models.KvModel, info, file)
			if err != nil {
				fmt.Printf("info: failed to write history visualization to %s\n", file.Name())
			} else {
				fmt.Printf("info: wrote history visualization to %s\n", file.Name())
			}
		}
		t.Fatal("history is not linearizable")
	} else if res == porcupine.Unknown {
		fmt.Println("info: linearizability check timed out, assuming history is ok")
	}

	cfg.end()
}

// Check that ops are committed fast enough, better than 1 per heartbeat interval
func GenericTestSpeed(t *testing.T, part string, maxraftstate int) {
	const nservers = 3
	const numOps = 1000
	cfg := make_config(t, nservers, false, maxraftstate)
	defer cfg.cleanup()

	ck := cfg.makeClient(cfg.All())

	cfg.begin(fmt.Sprintf("Test: ops complete fast enough (%s)", part))

	// wait until first op completes, so we know a leader is elected
	// and KV servers are ready to process client requests
	ck.Get("x")

	start := time.Now()
	for i := 0; i < numOps; i++ {
		ck.Append("x", "x 0 "+strconv.Itoa(i)+" y")
	}
	dur := time.Since(start)

	v := ck.Get("x")
	checkClntAppends(t, 0, v, numOps)

	// heartbeat interval should be ~100 ms; require at least 3 ops per interval
	const heartbeatInterval = 100 * time.Millisecond
	const opsPerInterval = 3
	const timePerOp = heartbeatInterval / opsPerInterval
	if dur > numOps*timePerOp {
		t.Fatalf("Operations completed too slowly %v/op > %v/op\n", dur/numOps, timePerOp)
	}

	cfg.end()
}

func TestBasic4A(t *testing.T) {
	// Test: one client (4A) ...
	GenericTest(t, "4A", 1, 5, false, false, false, -1, false)
}

func TestSpeed4A(t *testing.T) {
	GenericTestSpeed(t, "4A", -1)
}

func TestConcurrent4A(t *testing.T) {
	// Test: many clients (4A) ...
	GenericTest(t, "4A", 5, 5, false, false, false, -1, false)
}

func TestUnreliable4A(t *testing.T) {
	// Test: unreliable net, many clients (4A) ...
	GenericTest(t, "4A", 5, 5, true, false, false, -1, false)
}

func TestUnreliableOneKey4A(t *testing.T) {
	const nservers = 3
	cfg := make_config(t, nservers, true, -1)
	defer cfg.cleanup()

	ck := cfg.makeClient(cfg.All())

	cfg.begin("Test: concurrent append to same key, unreliable (4A)")

	Put(cfg, ck, "k", "", nil, -1)

	const nclient = 5
	const upto = 10
	spawn_clients_and_wait(t, cfg, nclient, func(me int, myck *Clerk, t *testing.T) {
		n := 0
		for n < upto {
			Append(cfg, myck, "k", "x "+strconv.Itoa(me)+" "+strconv.Itoa(n)+" y", nil, -1)
			n++
		}
	})

	var counts []int
	for i := 0; i < nclient; i++ {
		counts = append(counts, upto)
	}

	vx := Get(cfg, ck, "k", nil, -1)
	checkConcurrentAppends(t, vx, counts)

	cfg.end()
}
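// checkLinearizability is an illustrative sketch (not called by the tests in
// this file, and the helper name is our own): it mirrors the check that
// GenericTest performs at the end of a run, feeding the recorded history to
// porcupine and failing only if the history is provably not linearizable.
// A checker timeout is reported as "unknown" and allowed to pass.
func checkLinearizability(t *testing.T, opLog *OpLog) {
	res, _ := porcupine.CheckOperationsVerbose(models.KvModel, opLog.Read(), linearizabilityCheckTimeout)
	if res == porcupine.Illegal {
		t.Fatal("history is not linearizable")
	} else if res == porcupine.Unknown {
		fmt.Println("info: linearizability check timed out, assuming history is ok")
	}
}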
// Submit a request in the minority partition and check that the request
// doesn't go through until the partition heals. The leader in the original
// network ends up in the minority partition.
func TestOnePartition4A(t *testing.T) {
	const nservers = 5
	cfg := make_config(t, nservers, false, -1)
	defer cfg.cleanup()
	ck := cfg.makeClient(cfg.All())

	Put(cfg, ck, "1", "13", nil, -1)

	cfg.begin("Test: progress in majority (4A)")

	p1, p2 := cfg.make_partition()
	cfg.partition(p1, p2)

	ckp1 := cfg.makeClient(p1)  // connect ckp1 to p1
	ckp2a := cfg.makeClient(p2) // connect ckp2a to p2
	ckp2b := cfg.makeClient(p2) // connect ckp2b to p2

	Put(cfg, ckp1, "1", "14", nil, -1)
	check(cfg, t, ckp1, "1", "14")

	cfg.end()

	done0 := make(chan bool)
	done1 := make(chan bool)

	cfg.begin("Test: no progress in minority (4A)")
	go func() {
		Put(cfg, ckp2a, "1", "15", nil, -1)
		done0 <- true
	}()
	go func() {
		Get(cfg, ckp2b, "1", nil, -1) // different clerk in p2
		done1 <- true
	}()

	select {
	case <-done0:
		t.Fatalf("Put in minority completed")
	case <-done1:
		t.Fatalf("Get in minority completed")
	case <-time.After(time.Second):
	}

	check(cfg, t, ckp1, "1", "14")
	Put(cfg, ckp1, "1", "16", nil, -1)
	check(cfg, t, ckp1, "1", "16")

	cfg.end()

	cfg.begin("Test: completion after heal (4A)")

	cfg.ConnectAll()
	cfg.ConnectClient(ckp2a, cfg.All())
	cfg.ConnectClient(ckp2b, cfg.All())

	time.Sleep(electionTimeout)

	select {
	case <-done0:
	case <-time.After(30 * 100 * time.Millisecond):
		t.Fatalf("Put did not complete")
	}

	select {
	case <-done1:
	case <-time.After(30 * 100 * time.Millisecond):
		t.Fatalf("Get did not complete")
	default:
	}

	check(cfg, t, ck, "1", "15")

	cfg.end()
}

func TestManyPartitionsOneClient4A(t *testing.T) {
	// Test: partitions, one client (4A) ...
	GenericTest(t, "4A", 1, 5, false, false, true, -1, false)
}

func TestManyPartitionsManyClients4A(t *testing.T) {
	// Test: partitions, many clients (4A) ...
	GenericTest(t, "4A", 5, 5, false, false, true, -1, false)
}

func TestPersistOneClient4A(t *testing.T) {
	// Test: restarts, one client (4A) ...
	GenericTest(t, "4A", 1, 5, false, true, false, -1, false)
}

func TestPersistConcurrent4A(t *testing.T) {
	// Test: restarts, many clients (4A) ...
	GenericTest(t, "4A", 5, 5, false, true, false, -1, false)
}

func TestPersistConcurrentUnreliable4A(t *testing.T) {
	// Test: unreliable net, restarts, many clients (4A) ...
	GenericTest(t, "4A", 5, 5, true, true, false, -1, false)
}

func TestPersistPartition4A(t *testing.T) {
	// Test: restarts, partitions, many clients (4A) ...
	GenericTest(t, "4A", 5, 5, false, true, true, -1, false)
}

func TestPersistPartitionUnreliable4A(t *testing.T) {
	// Test: unreliable net, restarts, partitions, many clients (4A) ...
	GenericTest(t, "4A", 5, 5, true, true, true, -1, false)
}

func TestPersistPartitionUnreliableLinearizable4A(t *testing.T) {
	// Test: unreliable net, restarts, partitions, random keys, many clients (4A) ...
	GenericTest(t, "4A", 15, 7, true, true, true, -1, true)
}

// if one server falls behind, then rejoins, does it
// recover by using the InstallSnapshot RPC?
// also checks that majority discards committed log entries
// even if minority doesn't respond.
func TestSnapshotRPC4B(t *testing.T) {
	const nservers = 3
	maxraftstate := 1000
	cfg := make_config(t, nservers, false, maxraftstate)
	defer cfg.cleanup()

	ck := cfg.makeClient(cfg.All())

	cfg.begin("Test: InstallSnapshot RPC (4B)")

	Put(cfg, ck, "a", "A", nil, -1)
	check(cfg, t, ck, "a", "A")

	// a bunch of puts into the majority partition.
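	// Server 2 is cut off, so only servers 0 and 1 (a majority) see the puts
	// below; with maxraftstate = 1000, those puts should force the majority
	// to snapshot and discard log entries that server 2 is missing, so it can
	// only catch up later via InstallSnapshot.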
	cfg.partition([]int{0, 1}, []int{2})
	{
		ck1 := cfg.makeClient([]int{0, 1})
		for i := 0; i < 50; i++ {
			Put(cfg, ck1, strconv.Itoa(i), strconv.Itoa(i), nil, -1)
		}
		time.Sleep(electionTimeout)
		Put(cfg, ck1, "b", "B", nil, -1)
	}

	// check that the majority partition has thrown away
	// most of its log entries.
	sz := cfg.LogSize()
	if sz > 8*maxraftstate {
		t.Fatalf("logs were not trimmed (%v > 8*%v)", sz, maxraftstate)
	}

	// now make group that requires participation of
	// lagging server, so that it has to catch up.
	cfg.partition([]int{0, 2}, []int{1})
	{
		ck1 := cfg.makeClient([]int{0, 2})
		Put(cfg, ck1, "c", "C", nil, -1)
		Put(cfg, ck1, "d", "D", nil, -1)
		check(cfg, t, ck1, "a", "A")
		check(cfg, t, ck1, "b", "B")
		check(cfg, t, ck1, "1", "1")
		check(cfg, t, ck1, "49", "49")
	}

	// now everybody
	cfg.partition([]int{0, 1, 2}, []int{})

	Put(cfg, ck, "e", "E", nil, -1)
	check(cfg, t, ck, "c", "C")
	check(cfg, t, ck, "e", "E")
	check(cfg, t, ck, "1", "1")

	cfg.end()
}

// are the snapshots not too huge? 500 bytes is a generous bound for the
// operations we're doing here.
func TestSnapshotSize4B(t *testing.T) {
	const nservers = 3
	maxraftstate := 1000
	maxsnapshotstate := 500
	cfg := make_config(t, nservers, false, maxraftstate)
	defer cfg.cleanup()

	ck := cfg.makeClient(cfg.All())

	cfg.begin("Test: snapshot size is reasonable (4B)")

	for i := 0; i < 200; i++ {
		Put(cfg, ck, "x", "0", nil, -1)
		check(cfg, t, ck, "x", "0")
		Put(cfg, ck, "x", "1", nil, -1)
		check(cfg, t, ck, "x", "1")
	}

	// check that servers have thrown away most of their log entries
	sz := cfg.LogSize()
	if sz > 8*maxraftstate {
		t.Fatalf("logs were not trimmed (%v > 8*%v)", sz, maxraftstate)
	}

	// check that the snapshots are not unreasonably large
	ssz := cfg.SnapshotSize()
	if ssz > maxsnapshotstate {
		t.Fatalf("snapshot too large (%v > %v)", ssz, maxsnapshotstate)
	}

	cfg.end()
}

func TestSpeed4B(t *testing.T) {
	GenericTestSpeed(t, "4B", 1000)
}

func TestSnapshotRecover4B(t *testing.T) {
	// Test: restarts, snapshots, one client (4B) ...
	GenericTest(t, "4B", 1, 5, false, true, false, 1000, false)
}

func TestSnapshotRecoverManyClients4B(t *testing.T) {
	// Test: restarts, snapshots, many clients (4B) ...
	GenericTest(t, "4B", 20, 5, false, true, false, 1000, false)
}

func TestSnapshotUnreliable4B(t *testing.T) {
	// Test: unreliable net, snapshots, many clients (4B) ...
	GenericTest(t, "4B", 5, 5, true, false, false, 1000, false)
}

func TestSnapshotUnreliableRecover4B(t *testing.T) {
	// Test: unreliable net, restarts, snapshots, many clients (4B) ...
	GenericTest(t, "4B", 5, 5, true, true, false, 1000, false)
}

func TestSnapshotUnreliableRecoverConcurrentPartition4B(t *testing.T) {
	// Test: unreliable net, restarts, partitions, snapshots, many clients (4B) ...
	GenericTest(t, "4B", 5, 5, true, true, true, 1000, false)
}

func TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4B(t *testing.T) {
	// Test: unreliable net, restarts, partitions, snapshots, random keys, many clients (4B) ...
	GenericTest(t, "4B", 15, 7, true, true, true, 1000, true)
}
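// exampleCustomStress is an illustrative sketch, not part of the graded test
// suite: it shows how GenericTest's parameters map onto the scenario being
// exercised, in order: part, nclients, nservers, unreliable, crash,
// partitions, maxraftstate, randomkeys. The particular combination here is a
// hypothetical one of our own choosing: 10 clients on 5 servers with an
// unreliable network, crashes, and snapshots, but no partitions or random keys.
func exampleCustomStress(t *testing.T) {
	GenericTest(t, "4B", 10, 5, true, true, false, 1000, false)
}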