171 lines
4.1 KiB
Go
171 lines
4.1 KiB
Go
package mr
|
|
|
|
import (
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"net/rpc"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Coordinator holds the bookkeeping for one MapReduce job: the queue of map
// and reduce tasks that still have to be handed out, the status of every
// task, and a flag per phase that records whether the phase has finished.
//
// A task status (a value in filesStatus or an element of reduceStatus) is
// one of:
//   - "TODO": the task is waiting in its queue;
//   - an RFC 3339 timestamp: the time at which the task was handed to a worker;
//   - "DONE": a worker reported the task as finished.
type Coordinator struct {
	// mutex is held by the RPC handlers and the timeout checker while they
	// read or write the fields below.
	mutex sync.Mutex

	// Map phase.
	files       []string          // queue of input files waiting to be assigned
	filesStatus map[string]string // task status per input file, keyed by file name
	allMapsDone bool              // true once every map task is "DONE"

	// Reduce phase.
	nReduce        int      // number of reduce tasks
	reduceTasks    []int    // queue of reduce task indexes waiting to be assigned
	reduceStatus   []string // task status per reduce task, indexed by task number
	allReducesDone bool     // true once every reduce task is "DONE"
}
|
|
|
|
// Your code here -- RPC handlers for the worker to call.
|
|
|
|
// an example RPC handler.
|
|
//
|
|
// the RPC argument and reply types are defined in rpc.go.
|
|
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
|
|
reply.Y = args.X + 1
|
|
return nil
|
|
}
|
|
|
|
func (c *Coordinator) Assign(args *AssignArgs, reply *AssignReply) error {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
// assign map task to worker
|
|
if len(c.files) > 0 {
|
|
reply.TaskType = 0
|
|
reply.NReduce = c.nReduce
|
|
reply.File = c.files[0]
|
|
c.filesStatus[c.files[0]] = time.Now().Format(time.RFC3339)
|
|
c.files = c.files[1:]
|
|
} else if c.allMapsDone && len(c.reduceTasks) > 0 {
|
|
reply.TaskType = 1
|
|
reply.NReduce = c.reduceTasks[0]
|
|
c.reduceStatus[c.reduceTasks[0]] = time.Now().Format(time.RFC3339)
|
|
c.reduceTasks = c.reduceTasks[1:]
|
|
} else {
|
|
reply.TaskType = 2
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Coordinator) FinishMap(args *FinishMapArgs, reply *FinishMapReply) error {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
// mark map task as done
|
|
c.filesStatus[args.File] = "DONE"
|
|
// log.Println("FinishMap: ", args.File)
|
|
// check if all map tasks are done
|
|
for _, status := range c.filesStatus {
|
|
if status != "DONE" {
|
|
c.allMapsDone = false
|
|
return nil
|
|
}
|
|
}
|
|
c.allMapsDone = true
|
|
return nil
|
|
}
|
|
|
|
func (c *Coordinator) FinishReduce(args *FinishReduceArgs, reply *FinishReduceReply) error {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
// mark reduce task as done
|
|
c.reduceStatus[args.Index] = "DONE"
|
|
// log.Println("FinishReduce: ", args.Index)
|
|
// check if all reduce tasks are done
|
|
for _, status := range c.reduceStatus {
|
|
if status != "DONE" {
|
|
return nil
|
|
}
|
|
}
|
|
c.allReducesDone = true
|
|
return nil
|
|
}
|
|
|
|
// start a thread that listens for RPCs from worker.go
|
|
func (c *Coordinator) server() {
|
|
rpc.Register(c)
|
|
rpc.HandleHTTP()
|
|
//l, e := net.Listen("tcp", ":1234")
|
|
sockname := coordinatorSock()
|
|
os.Remove(sockname)
|
|
l, e := net.Listen("unix", sockname)
|
|
if e != nil {
|
|
log.Fatal("listen error:", e)
|
|
}
|
|
go http.Serve(l, nil)
|
|
}
|
|
|
|
// main/mrcoordinator.go calls Done() periodically to find out
|
|
// if the entire job has finished.
|
|
func (c *Coordinator) Done() bool {
|
|
ret := false
|
|
|
|
// Your code here.
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
ret = c.allReducesDone
|
|
|
|
return ret
|
|
}
|
|
|
|
func (c *Coordinator) checkWorkersStatus() {
|
|
for {
|
|
time.Sleep(1 * time.Second)
|
|
c.mutex.Lock()
|
|
if !c.allMapsDone {
|
|
for i, status := range c.filesStatus {
|
|
if status != "DONE" && status != "TODO" {
|
|
t, _ := time.Parse(time.RFC3339, status)
|
|
if time.Since(t) > 10*time.Second {
|
|
c.filesStatus[i] = "TODO"
|
|
c.files = append(c.files, i)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if c.allMapsDone && !c.allReducesDone {
|
|
for i, status := range c.reduceStatus {
|
|
if status != "DONE" && status != "TODO" {
|
|
t, _ := time.Parse(time.RFC3339, status)
|
|
if time.Since(t) > 10*time.Second {
|
|
c.reduceStatus[i] = "TODO"
|
|
c.reduceTasks = append(c.reduceTasks, i)
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
c.mutex.Unlock()
|
|
}
|
|
}
|
|
|
|
// create a Coordinator.
|
|
// main/mrcoordinator.go calls this function.
|
|
// nReduce is the number of reduce tasks to use.
|
|
func MakeCoordinator(files []string, nReduce int) *Coordinator {
|
|
c := Coordinator{}
|
|
|
|
// Your code here.
|
|
c.files = files
|
|
c.filesStatus = make(map[string]string)
|
|
for _, file := range files {
|
|
c.filesStatus[file] = "TODO"
|
|
}
|
|
c.nReduce = nReduce
|
|
c.allMapsDone = false
|
|
c.reduceTasks = make([]int, nReduce)
|
|
c.reduceStatus = make([]string, nReduce)
|
|
for i := 0; i < nReduce; i++ {
|
|
c.reduceTasks[i] = i
|
|
c.reduceStatus[i] = "TODO"
|
|
}
|
|
go c.checkWorkersStatus()
|
|
c.server()
|
|
return &c
|
|
}
|