package mr import ( "log" "net" "net/http" "net/rpc" "os" "sync" "time" ) type Coordinator struct { // Your definitions here. files []string // files to be processed filesStatus map[string]string // status of files (0: not maped, 1: maped) allMapsDone bool // all map tasks are done nReduce int // number of reduce tasks reduceTasks []int // reduce task reduceStatus []string // status of reduce tasks allReducesDone bool // all reduce tasks are done mutex sync.Mutex // mutex } // Your code here -- RPC handlers for the worker to call. // an example RPC handler. // // the RPC argument and reply types are defined in rpc.go. func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error { reply.Y = args.X + 1 return nil } func (c *Coordinator) Assign(args *AssignArgs, reply *AssignReply) error { c.mutex.Lock() defer c.mutex.Unlock() // assign map task to worker if len(c.files) > 0 { reply.TaskType = 0 reply.NReduce = c.nReduce reply.File = c.files[0] c.filesStatus[c.files[0]] = time.Now().Format(time.RFC3339) c.files = c.files[1:] } else if c.allMapsDone && len(c.reduceTasks) > 0 { reply.TaskType = 1 reply.NReduce = c.reduceTasks[0] c.reduceStatus[c.reduceTasks[0]] = time.Now().Format(time.RFC3339) c.reduceTasks = c.reduceTasks[1:] } else { reply.TaskType = 2 } return nil } func (c *Coordinator) FinishMap(args *FinishMapArgs, reply *FinishMapReply) error { c.mutex.Lock() defer c.mutex.Unlock() // mark map task as done c.filesStatus[args.File] = "DONE" // log.Println("FinishMap: ", args.File) // check if all map tasks are done for _, status := range c.filesStatus { if status != "DONE" { c.allMapsDone = false return nil } } c.allMapsDone = true return nil } func (c *Coordinator) FinishReduce(args *FinishReduceArgs, reply *FinishReduceReply) error { c.mutex.Lock() defer c.mutex.Unlock() // mark reduce task as done c.reduceStatus[args.Index] = "DONE" // log.Println("FinishReduce: ", args.Index) // check if all reduce tasks are done for _, status := range c.reduceStatus { if status != "DONE" { return nil } } c.allReducesDone = true return nil } // start a thread that listens for RPCs from worker.go func (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() //l, e := net.Listen("tcp", ":1234") sockname := coordinatorSock() os.Remove(sockname) l, e := net.Listen("unix", sockname) if e != nil { log.Fatal("listen error:", e) } go http.Serve(l, nil) } // main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. func (c *Coordinator) Done() bool { ret := false // Your code here. c.mutex.Lock() defer c.mutex.Unlock() ret = c.allReducesDone return ret } func (c *Coordinator) checkWorkersStatus() { for { time.Sleep(1 * time.Second) c.mutex.Lock() if !c.allMapsDone { for i, status := range c.filesStatus { if status != "DONE" && status != "TODO" { t, _ := time.Parse(time.RFC3339, status) if time.Since(t) > 10*time.Second { c.filesStatus[i] = "TODO" c.files = append(c.files, i) } } } } if c.allMapsDone && !c.allReducesDone { for i, status := range c.reduceStatus { if status != "DONE" && status != "TODO" { t, _ := time.Parse(time.RFC3339, status) if time.Since(t) > 10*time.Second { c.reduceStatus[i] = "TODO" c.reduceTasks = append(c.reduceTasks, i) } } } } c.mutex.Unlock() } } // create a Coordinator. // main/mrcoordinator.go calls this function. // nReduce is the number of reduce tasks to use. func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} // Your code here. c.files = files c.filesStatus = make(map[string]string) for _, file := range files { c.filesStatus[file] = "TODO" } c.nReduce = nReduce c.allMapsDone = false c.reduceTasks = make([]int, nReduce) c.reduceStatus = make([]string, nReduce) for i := 0; i < nReduce; i++ { c.reduceTasks[i] = i c.reduceStatus[i] = "TODO" } go c.checkWorkersStatus() c.server() return &c }