package shardctrler

import "6.5840/labrpc"
import "6.5840/raft"
import "testing"
import "os"

// import "log"
import crand "crypto/rand"
import "math/rand"
import "encoding/base64"
import "sync"
import "runtime"
import "time"

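// randstring returns a random string of length n; the tests use it to
// generate unique ClientEnd names on the simulated network.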
func randstring(n int) string {
	b := make([]byte, 2*n)
	crand.Read(b)
	s := base64.URLEncoding.EncodeToString(b)
	return s[0:n]
}

// Randomize server handles
func random_handles(kvh []*labrpc.ClientEnd) []*labrpc.ClientEnd {
	sa := make([]*labrpc.ClientEnd, len(kvh))
	copy(sa, kvh)
	for i := range sa {
		j := rand.Intn(i + 1)
		sa[i], sa[j] = sa[j], sa[i]
	}
	return sa
}

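// config ties together the ShardCtrler servers, their Raft persisters,
// and the test clerks on a single simulated labrpc network.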
type config struct {
	mu           sync.Mutex
	t            *testing.T
	net          *labrpc.Network
	n            int
	servers      []*ShardCtrler
	saved        []*raft.Persister
	endnames     [][]string // names of each server's sending ClientEnds
	clerks       map[*Clerk][]string
	nextClientId int
	start        time.Time // time at which make_config() was called
}

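// checkTimeout fails the test if it has been running longer than the
// real-time limit.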
func (cfg *config) checkTimeout() {
	// enforce a two minute real-time limit on each test
	if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second {
		cfg.t.Fatal("test took longer than 120 seconds")
	}
}

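// cleanup kills every server, shuts down the simulated network, and
// enforces the test time limit; tests call it when they finish.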
func (cfg *config) cleanup() {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	for i := 0; i < len(cfg.servers); i++ {
		if cfg.servers[i] != nil {
			cfg.servers[i].Kill()
		}
	}
	cfg.net.Cleanup()
	cfg.checkTimeout()
}

// Maximum log size across all servers
func (cfg *config) LogSize() int {
	logsize := 0
	for i := 0; i < cfg.n; i++ {
		n := cfg.saved[i].RaftStateSize()
		if n > logsize {
			logsize = n
		}
	}
	return logsize
}

// attach server i to servers listed in to
// caller must hold cfg.mu
func (cfg *config) connectUnlocked(i int, to []int) {
	// log.Printf("connect peer %d to %v\n", i, to)

	// outgoing socket files
	for j := 0; j < len(to); j++ {
		endname := cfg.endnames[i][to[j]]
		cfg.net.Enable(endname, true)
	}

	// incoming socket files
	for j := 0; j < len(to); j++ {
		endname := cfg.endnames[to[j]][i]
		cfg.net.Enable(endname, true)
	}
}

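// connect is the locking wrapper around connectUnlocked.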
func (cfg *config) connect(i int, to []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	cfg.connectUnlocked(i, to)
}

// detach server i from the servers listed in from
// caller must hold cfg.mu
func (cfg *config) disconnectUnlocked(i int, from []int) {
	// log.Printf("disconnect peer %d from %v\n", i, from)

	// outgoing socket files
	for j := 0; j < len(from); j++ {
		if cfg.endnames[i] != nil {
			endname := cfg.endnames[i][from[j]]
			cfg.net.Enable(endname, false)
		}
	}

	// incoming socket files
	for j := 0; j < len(from); j++ {
		if cfg.endnames[from[j]] != nil {
			endname := cfg.endnames[from[j]][i]
			cfg.net.Enable(endname, false)
		}
	}
}

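// disconnect is the locking wrapper around disconnectUnlocked.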
func (cfg *config) disconnect(i int, from []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	cfg.disconnectUnlocked(i, from)
}

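// All returns the indices of all servers: 0..n-1.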
func (cfg *config) All() []int {
	all := make([]int, cfg.n)
	for i := 0; i < cfg.n; i++ {
		all[i] = i
	}
	return all
}

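// ConnectAll (re)enables connectivity between every pair of servers.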
func (cfg *config) ConnectAll() {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	for i := 0; i < cfg.n; i++ {
		cfg.connectUnlocked(i, cfg.All())
	}
}

// Sets up 2 partitions with connectivity between servers in each partition.
func (cfg *config) partition(p1 []int, p2 []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	// log.Printf("partition servers into: %v %v\n", p1, p2)
	for i := 0; i < len(p1); i++ {
		cfg.disconnectUnlocked(p1[i], p2)
		cfg.connectUnlocked(p1[i], p1)
	}
	for i := 0; i < len(p2); i++ {
		cfg.disconnectUnlocked(p2[i], p1)
		cfg.connectUnlocked(p2[i], p2)
	}
}

// Create a clerk with clerk-specific server names.
// Give it connections to all of the servers, but for
// now enable only connections to servers in to[].
func (cfg *config) makeClient(to []int) *Clerk {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	// a fresh set of ClientEnds.
	ends := make([]*labrpc.ClientEnd, cfg.n)
	endnames := make([]string, cfg.n)
	for j := 0; j < cfg.n; j++ {
		endnames[j] = randstring(20)
		ends[j] = cfg.net.MakeEnd(endnames[j])
		cfg.net.Connect(endnames[j], j)
	}

	ck := MakeClerk(random_handles(ends))
	cfg.clerks[ck] = endnames
	cfg.nextClientId++
	cfg.ConnectClientUnlocked(ck, to)
	return ck
}

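// deleteClient drops a clerk created by makeClient from the config's
// bookkeeping.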
func (cfg *config) deleteClient(ck *Clerk) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	v := cfg.clerks[ck]
	for i := 0; i < len(v); i++ {
		os.Remove(v[i])
	}
	delete(cfg.clerks, ck)
}

// caller should hold cfg.mu
func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
	// log.Printf("ConnectClient %v to %v\n", ck, to)
	endnames := cfg.clerks[ck]
	for j := 0; j < len(to); j++ {
		s := endnames[to[j]]
		cfg.net.Enable(s, true)
	}
}

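// ConnectClient is the locking wrapper around ConnectClientUnlocked.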
func (cfg *config) ConnectClient(ck *Clerk, to []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	cfg.ConnectClientUnlocked(ck, to)
}

// caller should hold cfg.mu
func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
	// log.Printf("DisconnectClient %v from %v\n", ck, from)
	endnames := cfg.clerks[ck]
	for j := 0; j < len(from); j++ {
		s := endnames[from[j]]
		cfg.net.Enable(s, false)
	}
}

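// DisconnectClient is the locking wrapper around DisconnectClientUnlocked.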
func (cfg *config) DisconnectClient(ck *Clerk, from []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	cfg.DisconnectClientUnlocked(ck, from)
}

// Shut down server i by isolating it from the network and killing it.
func (cfg *config) ShutdownServer(i int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	cfg.disconnectUnlocked(i, cfg.All())

	// disable client connections to the server.
	// it's important to do this before creating
	// the new Persister in saved[i], to avoid
	// the possibility of the server returning a
	// positive reply to a client request but persisting
	// the result in the superseded Persister.
	cfg.net.DeleteServer(i)

	// a fresh persister, in case the old instance
	// continues to update the Persister.
	// but copy the old persister's content so that we always
	// pass Make() the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()
	}

	kv := cfg.servers[i]
	if kv != nil {
		cfg.mu.Unlock()
		kv.Kill()
		cfg.mu.Lock()
		cfg.servers[i] = nil
	}
}

// To restart a server, first call ShutdownServer.
func (cfg *config) StartServer(i int) {
	cfg.mu.Lock()

	// a fresh set of outgoing ClientEnd names.
	cfg.endnames[i] = make([]string, cfg.n)
	for j := 0; j < cfg.n; j++ {
		cfg.endnames[i][j] = randstring(20)
	}

	// a fresh set of ClientEnds.
	ends := make([]*labrpc.ClientEnd, cfg.n)
	for j := 0; j < cfg.n; j++ {
		ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
		cfg.net.Connect(cfg.endnames[i][j], j)
	}

	// a fresh persister, so the old instance doesn't overwrite
	// the new instance's persisted state.
	// give the fresh persister a copy of the old persister's
	// state, so that the spec is that we pass StartServer()
	// the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()
	} else {
		cfg.saved[i] = raft.MakePersister()
	}

	cfg.mu.Unlock()

	cfg.servers[i] = StartServer(ends, i, cfg.saved[i])

	kvsvc := labrpc.MakeService(cfg.servers[i])
	rfsvc := labrpc.MakeService(cfg.servers[i].rf)
	srv := labrpc.MakeServer()
	srv.AddService(kvsvc)
	srv.AddService(rfsvc)
	cfg.net.AddServer(i, srv)
}

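// Leader reports whether some server currently claims to be the Raft
// leader, and if so returns that server's index.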
func (cfg *config) Leader() (bool, int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	for i := 0; i < cfg.n; i++ {
		if cfg.servers[i] != nil {
			_, is_leader := cfg.servers[i].rf.GetState()
			if is_leader {
				return true, i
			}
		}
	}
	return false, 0
}

// Partition servers into 2 groups and put the current leader in the minority.
func (cfg *config) make_partition() ([]int, []int) {
	_, l := cfg.Leader()
	p1 := make([]int, cfg.n/2+1)
	p2 := make([]int, cfg.n/2)
	j := 0
	for i := 0; i < cfg.n; i++ {
		if i != l {
			if j < len(p1) {
				p1[j] = i
			} else {
				p2[j-len(p1)] = i
			}
			j++
		}
	}
	p2[len(p2)-1] = l
	return p1, p2
}

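// make_config creates a simulated network with n ShardCtrler servers,
// starts and connects all of them, and returns the config the tests drive.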
func make_config(t *testing.T, n int, unreliable bool) *config {
	runtime.GOMAXPROCS(4)
	cfg := &config{}
	cfg.t = t
	cfg.net = labrpc.MakeNetwork()
	cfg.n = n
	cfg.servers = make([]*ShardCtrler, cfg.n)
	cfg.saved = make([]*raft.Persister, cfg.n)
	cfg.endnames = make([][]string, cfg.n)
	cfg.clerks = make(map[*Clerk][]string)
	cfg.nextClientId = cfg.n + 1000 // client ids start 1000 above the highest server id
	cfg.start = time.Now()

	// create a full set of ShardCtrler servers.
	for i := 0; i < cfg.n; i++ {
		cfg.StartServer(i)
	}

	cfg.ConnectAll()

	cfg.net.Reliable(!unreliable)

	return cfg
}
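
// Illustrative only: a minimal sketch of how a test typically drives this
// harness, not part of the graded suite. It assumes the Clerk and Config
// types from this lab's skeleton (client.go / common.go), in particular
// Join(map[int][]string), Query(int) Config, and Config.Groups; adjust the
// names if your implementation differs.
func exampleHarnessUsage(t *testing.T) {
	const nservers = 3
	cfg := make_config(t, nservers, false)
	defer cfg.cleanup()

	// a clerk that can reach every server
	ck := cfg.makeClient(cfg.All())

	// add one replica group, then read back the latest configuration
	ck.Join(map[int][]string{1: {"x", "y", "z"}})
	c := ck.Query(-1)
	if len(c.Groups) != 1 {
		t.Fatalf("expected 1 group, got %d", len(c.Groups))
	}

	// put the current leader on the minority side of a partition; a clerk
	// that can only reach the majority should still see its requests commit
	majority, minority := cfg.make_partition()
	cfg.partition(majority, minority)
	mck := cfg.makeClient(majority)
	mck.Join(map[int][]string{2: {"p", "q", "r"}})

	// heal the partition before the test ends
	cfg.ConnectAll()
}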