MapReduce/mr/coordinator.go

171 lines
4.1 KiB
Go

package mr
import (
"log"
"net"
"net/http"
"net/rpc"
"os"
"sync"
"time"
)
// Coordinator tracks the map and reduce tasks of one MapReduce job and hands
// them out to workers over RPC. Every method that touches the fields below
// does so while holding mutex.
//
// A task status (a filesStatus value or a reduceStatus entry) is one of:
//   - "TODO": not currently assigned; queued in files / reduceTasks;
//   - an RFC 3339 timestamp: assigned to a worker at that time;
//   - "DONE": a worker reported the task complete.
type Coordinator struct {
	files          []string          // queue of map tasks (input file names) waiting to be handed out
	filesStatus    map[string]string // status of each input file's map task (see above)
	allMapsDone    bool              // true once every entry of filesStatus is "DONE"
	nReduce        int               // number of reduce tasks
	reduceTasks    []int             // queue of reduce task indices waiting to be handed out
	reduceStatus   []string          // status of each reduce task, indexed by task number
	allReducesDone bool              // true once every entry of reduceStatus is "DONE"
	mutex          sync.Mutex        // guards all of the fields above
}
// RPC handlers for the worker to call.

// Example is a sample RPC handler: it answers with args.X plus one in reply.Y.
// The argument and reply types are declared in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	incremented := args.X + 1
	reply.Y = incremented
	return nil
}
// Assign is the RPC handler a worker calls to ask for work. Map tasks take
// precedence over reduce tasks, and reply is filled by the first case that
// applies:
//
//   - a map task is queued: TaskType 0, File is the input file to map and
//     NReduce is the number of reduce tasks;
//   - all maps are done and a reduce task is queued: TaskType 1, and NReduce
//     carries the index of the reduce task to run (not the task count);
//   - otherwise: TaskType 2, meaning there is nothing to hand out right now.
//
// A task that is handed out gets the current time (RFC 3339) as its status,
// which checkWorkersStatus uses to re-queue tasks whose worker goes silent.
func (c *Coordinator) Assign(args *AssignArgs, reply *AssignReply) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// A task re-queued after a timeout may still be finished by its original
	// worker. Drop such already-"DONE" entries instead of handing the task
	// out again, which would also overwrite its "DONE" status with a
	// timestamp and hold up completion until the redundant run reports back.
	for len(c.files) > 0 && c.filesStatus[c.files[0]] == "DONE" {
		c.files = c.files[1:]
	}
	for len(c.reduceTasks) > 0 && c.reduceStatus[c.reduceTasks[0]] == "DONE" {
		c.reduceTasks = c.reduceTasks[1:]
	}

	switch {
	case len(c.files) > 0:
		file := c.files[0]
		c.files = c.files[1:]
		reply.TaskType = 0
		reply.NReduce = c.nReduce
		reply.File = file
		c.filesStatus[file] = time.Now().Format(time.RFC3339)
	case c.allMapsDone && len(c.reduceTasks) > 0:
		task := c.reduceTasks[0]
		c.reduceTasks = c.reduceTasks[1:]
		reply.TaskType = 1
		reply.NReduce = task
		c.reduceStatus[task] = time.Now().Format(time.RFC3339)
	default:
		reply.TaskType = 2
	}
	return nil
}
// FinishMap is the RPC handler a worker calls after completing the map task
// for args.File. It records that file as "DONE" and then recomputes
// allMapsDone, which is true only when every tracked file is "DONE".
func (c *Coordinator) FinishMap(args *FinishMapArgs, reply *FinishMapReply) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.filesStatus[args.File] = "DONE"

	everyMapDone := true
	for _, s := range c.filesStatus {
		if s != "DONE" {
			everyMapDone = false
			break
		}
	}
	c.allMapsDone = everyMapDone
	return nil
}
// FinishReduce is the RPC handler a worker calls after completing reduce task
// args.Index. It marks that task "DONE" and sets allReducesDone once every
// reduce task is "DONE". An index outside the known reduce tasks is rejected
// with an error, and no state is changed.
func (c *Coordinator) FinishReduce(args *FinishReduceArgs, reply *FinishReduceReply) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// args arrives over the network. Indexing with an out-of-range value would
	// panic in the RPC handler goroutine, which net/rpc does not recover, and
	// so would take the whole coordinator down.
	idx := int(args.Index)
	if idx < 0 || idx >= len(c.reduceStatus) {
		return rpc.ServerError("finish reduce: reduce task index out of range")
	}
	c.reduceStatus[idx] = "DONE"

	for _, status := range c.reduceStatus {
		if status != "DONE" {
			return nil
		}
	}
	c.allReducesDone = true
	return nil
}
// server publishes c on net/rpc's default server, mounts that server on the
// default HTTP mux, and serves it in a background goroutine on the
// UNIX-domain socket whose path coordinatorSock() returns. Any file already
// at that path is removed first; failing to listen is fatal.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()

	socketPath := coordinatorSock()
	os.Remove(socketPath)
	// To serve over TCP instead: net.Listen("tcp", ":1234")
	listener, err := net.Listen("unix", socketPath)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}
// Done reports whether every reduce task has finished, i.e. whether the
// entire job is complete. main/mrcoordinator.go calls it periodically.
func (c *Coordinator) Done() bool {
	c.mutex.Lock()
	finished := c.allReducesDone
	c.mutex.Unlock()
	return finished
}
// checkWorkersStatus is the coordinator's failure detector, meant to run in
// its own goroutine. Roughly once a second it scans tasks that have been
// handed out. Any task whose worker has not reported completion within
// taskTimeout of assignment has its status reset to "TODO" and is put back
// on its queue, so Assign can give it to another worker. Map tasks are
// watched until allMapsDone, then reduce tasks. The function returns once
// allReducesDone is set, so the goroutine does not outlive the job.
func (c *Coordinator) checkWorkersStatus() {
	const taskTimeout = 10 * time.Second

	// expired reports whether status is an assignment timestamp older than
	// taskTimeout; "TODO" and "DONE" never expire. Assign only writes RFC 3339
	// timestamps, so a parse failure is unexpected. If one happens, the task
	// is treated as expired so it is retried rather than stuck forever.
	expired := func(status string) bool {
		if status == "DONE" || status == "TODO" {
			return false
		}
		assigned, err := time.Parse(time.RFC3339, status)
		if err != nil {
			return true
		}
		return time.Since(assigned) > taskTimeout
	}

	// requeueExpired does one scan under the lock and reports whether the job
	// has finished, in which case there is nothing left to watch.
	requeueExpired := func() bool {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		switch {
		case c.allReducesDone:
			return true
		case !c.allMapsDone:
			for file, status := range c.filesStatus {
				if expired(status) {
					c.filesStatus[file] = "TODO"
					c.files = append(c.files, file)
				}
			}
		default:
			for i, status := range c.reduceStatus {
				if expired(status) {
					c.reduceStatus[i] = "TODO"
					c.reduceTasks = append(c.reduceTasks, i)
				}
			}
		}
		return false
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for !requeueExpired() {
		<-ticker.C
	}
}
// MakeCoordinator creates a Coordinator with one map task per entry of files
// and nReduce reduce tasks, all initially "TODO". It starts the timeout
// monitor and the RPC server, then returns the coordinator.
// main/mrcoordinator.go calls this function.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{}

	// Keep a private copy of the map-task queue. Assign pops from its front and
	// checkWorkersStatus appends to it, and neither should ever write into the
	// caller's backing array.
	c.files = append([]string(nil), files...)
	c.filesStatus = make(map[string]string, len(files))
	for _, file := range files {
		c.filesStatus[file] = "TODO"
	}
	// With no input files there is no map task whose FinishMap could ever
	// flip this flag, so start out "all done" to let the reduce phase begin.
	// NOTE(review): confirm the reduce workers cope with zero map outputs.
	c.allMapsDone = len(c.filesStatus) == 0

	c.nReduce = nReduce
	c.reduceTasks = make([]int, nReduce)
	c.reduceStatus = make([]string, nReduce)
	for i := range c.reduceTasks {
		c.reduceTasks[i] = i
		c.reduceStatus[i] = "TODO"
	}

	go c.checkWorkersStatus()
	c.server()
	return c
}