MapReduce/shardctrler/config.go

package shardctrler

import (
	crand "crypto/rand"
	"encoding/base64"
	// "log"
	"math/rand"
	"os"
	"runtime"
	"sync"
	"testing"
	"time"

	"6.5840/labrpc"
	"6.5840/raft"
)
func randstring(n int) string {
	b := make([]byte, 2*n)
	crand.Read(b)
	s := base64.URLEncoding.EncodeToString(b)
	return s[0:n]
}

// Randomize server handles
func random_handles(kvh []*labrpc.ClientEnd) []*labrpc.ClientEnd {
	sa := make([]*labrpc.ClientEnd, len(kvh))
	copy(sa, kvh)
	for i := range sa {
		j := rand.Intn(i + 1)
		sa[i], sa[j] = sa[j], sa[i]
	}
	return sa
}

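// config holds the shared state for one shardctrler test: the simulated
// network, the ShardCtrler servers and their persisters, and any clerks.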
type config struct {
	mu           sync.Mutex
	t            *testing.T
	net          *labrpc.Network
	n            int
	servers      []*ShardCtrler
	saved        []*raft.Persister
	endnames     [][]string // names of each server's sending ClientEnds
	clerks       map[*Clerk][]string
	nextClientId int
	start        time.Time // time at which make_config() was called
}

func (cfg *config) checkTimeout() {
	// enforce a two minute real-time limit on each test
	if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second {
		cfg.t.Fatal("test took longer than 120 seconds")
	}
}

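// cleanup kills every server and shuts down the network at the end of a test.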
func (cfg *config) cleanup() {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	for i := 0; i < len(cfg.servers); i++ {
		if cfg.servers[i] != nil {
			cfg.servers[i].Kill()
		}
	}
	cfg.net.Cleanup()
	cfg.checkTimeout()
}

// Maximum log size across all servers
func (cfg *config) LogSize() int {
	logsize := 0
	for i := 0; i < cfg.n; i++ {
		n := cfg.saved[i].RaftStateSize()
		if n > logsize {
			logsize = n
		}
	}
	return logsize
}

// attach server i to servers listed in to
// caller must hold cfg.mu
func (cfg *config) connectUnlocked(i int, to []int) {
	// log.Printf("connect peer %d to %v\n", i, to)

	// outgoing socket files
	for j := 0; j < len(to); j++ {
		endname := cfg.endnames[i][to[j]]
		cfg.net.Enable(endname, true)
	}

	// incoming socket files
	for j := 0; j < len(to); j++ {
		endname := cfg.endnames[to[j]][i]
		cfg.net.Enable(endname, true)
	}
}

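// connect is the locked wrapper around connectUnlocked.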
func (cfg *config) connect(i int, to []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	cfg.connectUnlocked(i, to)
}

// detach server i from the servers listed in from
// caller must hold cfg.mu
func (cfg *config) disconnectUnlocked(i int, from []int) {
	// log.Printf("disconnect peer %d from %v\n", i, from)

	// outgoing socket files
	for j := 0; j < len(from); j++ {
		if cfg.endnames[i] != nil {
			endname := cfg.endnames[i][from[j]]
			cfg.net.Enable(endname, false)
		}
	}

	// incoming socket files
	for j := 0; j < len(from); j++ {
		// guard the slice that is actually indexed below
		if cfg.endnames[from[j]] != nil {
			endname := cfg.endnames[from[j]][i]
			cfg.net.Enable(endname, false)
		}
	}
}

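// disconnect is the locked wrapper around disconnectUnlocked.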
func (cfg *config) disconnect(i int, from []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	cfg.disconnectUnlocked(i, from)
}

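// All returns the indices of every server, 0 through n-1.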
func (cfg *config) All() []int {
	all := make([]int, cfg.n)
	for i := 0; i < cfg.n; i++ {
		all[i] = i
	}
	return all
}

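// ConnectAll connects every server to every other server.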
func (cfg *config) ConnectAll() {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	for i := 0; i < cfg.n; i++ {
		cfg.connectUnlocked(i, cfg.All())
	}
}

// Sets up 2 partitions with connectivity between servers in each partition.
func (cfg *config) partition(p1 []int, p2 []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	// log.Printf("partition servers into: %v %v\n", p1, p2)
	for i := 0; i < len(p1); i++ {
		cfg.disconnectUnlocked(p1[i], p2)
		cfg.connectUnlocked(p1[i], p1)
	}
	for i := 0; i < len(p2); i++ {
		cfg.disconnectUnlocked(p2[i], p1)
		cfg.connectUnlocked(p2[i], p2)
	}
}

// Create a clerk with clerk specific server names.
// Give it connections to all of the servers, but for
// now enable only connections to servers in to[].
func (cfg *config) makeClient(to []int) *Clerk {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	// a fresh set of ClientEnds.
	ends := make([]*labrpc.ClientEnd, cfg.n)
	endnames := make([]string, cfg.n)
	for j := 0; j < cfg.n; j++ {
		endnames[j] = randstring(20)
		ends[j] = cfg.net.MakeEnd(endnames[j])
		cfg.net.Connect(endnames[j], j)
	}

	ck := MakeClerk(random_handles(ends))
	cfg.clerks[ck] = endnames
	cfg.nextClientId++
	cfg.ConnectClientUnlocked(ck, to)
	return ck
}

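// deleteClient removes any files backing the clerk's end names and
// forgets the clerk.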
func (cfg *config) deleteClient(ck *Clerk) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	v := cfg.clerks[ck]
	for i := 0; i < len(v); i++ {
		os.Remove(v[i])
	}
	delete(cfg.clerks, ck)
}

// caller should hold cfg.mu
func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
	// log.Printf("ConnectClient %v to %v\n", ck, to)
	endnames := cfg.clerks[ck]
	for j := 0; j < len(to); j++ {
		s := endnames[to[j]]
		cfg.net.Enable(s, true)
	}
}

func (cfg *config) ConnectClient(ck *Clerk, to []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	cfg.ConnectClientUnlocked(ck, to)
}

// caller should hold cfg.mu
func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
	// log.Printf("DisconnectClient %v from %v\n", ck, from)
	endnames := cfg.clerks[ck]
	for j := 0; j < len(from); j++ {
		s := endnames[from[j]]
		cfg.net.Enable(s, false)
	}
}

func (cfg *config) DisconnectClient(ck *Clerk, from []int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	cfg.DisconnectClientUnlocked(ck, from)
}

// Shutdown a server by isolating it
func (cfg *config) ShutdownServer(i int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	cfg.disconnectUnlocked(i, cfg.All())

	// disable client connections to the server.
	// it's important to do this before creating
	// the new Persister in saved[i], to avoid
	// the possibility of the server returning a
	// positive reply to an Append but persisting
	// the result in the superseded Persister.
	cfg.net.DeleteServer(i)

	// a fresh persister, in case old instance
	// continues to update the Persister.
	// but copy old persister's content so that we always
	// pass Make() the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()
	}

	kv := cfg.servers[i]
	if kv != nil {
		cfg.mu.Unlock()
		kv.Kill()
		cfg.mu.Lock()
		cfg.servers[i] = nil
	}
}

// StartServer starts (or restarts) server i. To restart a server,
// call ShutdownServer(i) first.
func (cfg *config) StartServer(i int) {
	cfg.mu.Lock()

	// a fresh set of outgoing ClientEnd names.
	cfg.endnames[i] = make([]string, cfg.n)
	for j := 0; j < cfg.n; j++ {
		cfg.endnames[i][j] = randstring(20)
	}

	// a fresh set of ClientEnds.
	ends := make([]*labrpc.ClientEnd, cfg.n)
	for j := 0; j < cfg.n; j++ {
		ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
		cfg.net.Connect(cfg.endnames[i][j], j)
	}

	// a fresh persister, so the old instance doesn't overwrite the
	// new instance's persisted state. give the fresh persister a copy
	// of the old persister's state, so that we always pass StartServer()
	// the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()
	} else {
		cfg.saved[i] = raft.MakePersister()
	}

	cfg.mu.Unlock()

	cfg.servers[i] = StartServer(ends, i, cfg.saved[i])

	kvsvc := labrpc.MakeService(cfg.servers[i])
	rfsvc := labrpc.MakeService(cfg.servers[i].rf)
	srv := labrpc.MakeServer()
	srv.AddService(kvsvc)
	srv.AddService(rfsvc)
	cfg.net.AddServer(i, srv)
}

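// Leader reports whether some running server currently thinks it is the
// Raft leader, and if so, returns that server's index.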
func (cfg *config) Leader() (bool, int) {
	cfg.mu.Lock()
	defer cfg.mu.Unlock()
	for i := 0; i < cfg.n; i++ {
		if cfg.servers[i] != nil {
			_, is_leader := cfg.servers[i].rf.GetState()
			if is_leader {
				return true, i
			}
		}
	}
	return false, 0
}

// Partition servers into 2 groups and put current leader in minority
func (cfg *config) make_partition() ([]int, []int) {
	_, l := cfg.Leader()
	p1 := make([]int, cfg.n/2+1)
	p2 := make([]int, cfg.n/2)
	j := 0
	for i := 0; i < cfg.n; i++ {
		if i != l {
			if j < len(p1) {
				p1[j] = i
			} else {
				p2[j-len(p1)] = i
			}
			j++
		}
	}
	p2[len(p2)-1] = l
	return p1, p2
}

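// make_config creates a network of n ShardCtrler servers for a test,
// connects them all, and sets the network's reliability.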
func make_config(t *testing.T, n int, unreliable bool) *config {
	runtime.GOMAXPROCS(4)
	cfg := &config{}
	cfg.t = t
	cfg.net = labrpc.MakeNetwork()
	cfg.n = n
	cfg.servers = make([]*ShardCtrler, cfg.n)
	cfg.saved = make([]*raft.Persister, cfg.n)
	cfg.endnames = make([][]string, cfg.n)
	cfg.clerks = make(map[*Clerk][]string)
	cfg.nextClientId = cfg.n + 1000 // client ids start 1000 above the highest server id
	cfg.start = time.Now()

	// create a full set of ShardCtrler servers.
	for i := 0; i < cfg.n; i++ {
		cfg.StartServer(i)
	}

	cfg.ConnectAll()

	cfg.net.Reliable(!unreliable)

	return cfg
}
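
// A minimal sketch of how a test might drive this harness. It is not part of
// the original file: the test name, group id, and server names are made up,
// and the Clerk methods (Join, Query) are assumed from the lab's client.go.
//
//	func TestBasicUsage(t *testing.T) {
//		cfg := make_config(t, 3, false) // three reliable shardctrler servers
//		defer cfg.cleanup()
//
//		ck := cfg.makeClient(cfg.All()) // a clerk connected to every server
//		ck.Join(map[int][]string{1: {"server-x", "server-y"}})
//
//		c := ck.Query(-1) // fetch the latest configuration
//		if c.Num < 1 {
//			t.Fatalf("expected at least one new config, got Num=%v", c.Num)
//		}
//	}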