package mr

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"io"
	"log"
	"net/rpc"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"
)

// KeyValue is a single key/value pair. Map functions return a slice of
// KeyValue, and the same type is what map tasks write to (and reduce tasks
// read back from) the JSON intermediate files.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface, ordering a []KeyValue by Key.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// ihash hashes key with 32-bit FNV-1a and clears the sign bit so the result
// is non-negative. Use ihash(key) % NReduce to choose the reduce task number
// for each KeyValue emitted by Map.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash writes never return an error
	return int(h.Sum32() & 0x7fffffff)
}

// Worker is called by main/mrworker.go. It repeatedly asks the coordinator
// for a task, runs it, and sleeps briefly between requests. It never returns:
// the process ends from call() once the coordinator can no longer be dialed,
// or from a log.Fatalf inside a task.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		CallAssign(mapf, reducef)
		time.Sleep(100 * time.Millisecond)
	}
}

// CallExample shows how to make an RPC call to the coordinator.
//
// The RPC argument and reply types are defined in rpc.go.
func CallExample() {
	// declare an argument structure and fill in the argument(s).
	args := ExampleArgs{}
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	// "Coordinator.Example" tells the receiving server that we'd like to
	// call the Example() method of struct Coordinator.
	ok := call("Coordinator.Example", &args, &reply)
	if ok {
		// reply.Y should be 100.
		fmt.Printf("reply.Y %v\n", reply.Y)
	} else {
		fmt.Printf("call failed!\n")
	}
}

// CallAssign asks the coordinator for one task and runs it.
// TaskType 0 runs a map task on reply.File, 1 runs the reduce task numbered
// reply.NReduce, and 2 does nothing. If the RPC fails, no task is run.
func CallAssign(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	args := AssignArgs{}
	reply := AssignReply{}
	if !call("Coordinator.Assign", &args, &reply) {
		fmt.Printf("call failed!\n")
		// reply still holds its zero value here; dispatching on it would
		// start a bogus map task on an empty file name.
		return
	}

	switch reply.TaskType {
	case 0:
		mapTask(mapf, reply.File, reply.NReduce)
	case 1:
		ReduceTask(reducef, reply.NReduce)
	case 2:
		// NOTE(review): the meaning of type 2 (presumably "no work right
		// now" or "job done") is defined by the coordinator, not visible
		// here. Worker simply asks again after its sleep.
	}
}

// CallFinishMap reports to the coordinator that the map task for filename
// has finished. Failures are printed but otherwise ignored.
func CallFinishMap(filename string) {
	args := FinishMapArgs{filename}
	reply := FinishMapReply{}
	ok := call("Coordinator.FinishMap", &args, &reply)
	if !ok {
		fmt.Printf("call failed!\n")
	}
}

// CallFinishReduce reports to the coordinator that reduce task index has
// finished. Failures are printed but otherwise ignored.
func CallFinishReduce(index int) {
	args := FinishReduceArgs{index}
	reply := FinishReduceReply{}
	ok := call("Coordinator.FinishReduce", &args, &reply)
	if !ok {
		fmt.Printf("call failed!\n")
	}
}

// commitTempFile closes tmp and atomically renames it to name. Readers
// therefore see either the previous file or the complete new one, never a
// partially written file from a crashed or re-executed task.
func commitTempFile(tmp *os.File, name string) {
	if err := tmp.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", tmp.Name(), err)
	}
	if err := os.Rename(tmp.Name(), name); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", tmp.Name(), name, err)
	}
}

// mapTask runs mapf over the contents of the file at path. It partitions the
// emitted pairs into nReduce buckets by ihash(key) % nReduce and writes bucket
// i as JSON records to "mr-<i>-<base>.json", where <base> is the input's base
// name without its extension. It then reports completion to the coordinator.
func mapTask(mapf func(string, string) []KeyValue, path string, nReduce int) {
	if nReduce <= 0 {
		log.Fatalf("invalid nReduce %v for map task %v", nReduce, path)
	}

	file, err := os.Open(path)
	if err != nil {
		log.Fatalf("cannot open %v", path)
	}
	content, err := io.ReadAll(file)
	file.Close()
	if err != nil {
		log.Fatalf("cannot read %v", path)
	}
	kva := mapf(path, string(content))

	base := filepath.Base(path)
	base = strings.TrimSuffix(base, filepath.Ext(base))

	// Write each bucket to a temp file in the working directory (same
	// filesystem, so the final rename is atomic). The "mr-tmp-" prefix can
	// never match a reduce task's "mr-<n>-*" glob.
	tmps := make([]*os.File, nReduce)
	encoders := make([]*json.Encoder, nReduce)
	for i := range tmps {
		tmp, err := os.CreateTemp(".", "mr-tmp-*")
		if err != nil {
			log.Fatalf("cannot create temp file for bucket %v of %v: %v", i, path, err)
		}
		tmps[i] = tmp
		encoders[i] = json.NewEncoder(tmp)
	}

	for _, kv := range kva {
		i := ihash(kv.Key) % nReduce
		if err := encoders[i].Encode(&kv); err != nil {
			log.Print(err)
			log.Fatalf("cannot encode %v", kv)
		}
	}

	for i, tmp := range tmps {
		commitTempFile(tmp, fmt.Sprintf("mr-%v-%v.json", i, base))
	}
	CallFinishMap(path)
}

// readIntermediateFile decodes every JSON-encoded KeyValue in filename.
// It exits the process if the file cannot be opened or contains malformed data.
func readIntermediateFile(filename string) []KeyValue {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	defer file.Close()

	var kvs []KeyValue
	decoder := json.NewDecoder(file)
	for {
		var kv KeyValue
		err := decoder.Decode(&kv)
		if err == io.EOF {
			return kvs
		}
		if err != nil {
			log.Fatalf("cannot decode %v: %v", filename, err)
		}
		kvs = append(kvs, kv)
	}
}

// ReduceTask runs one reduce task. Despite its name, nReduce is the index of
// the reduce bucket to process, not the total number of buckets. The task
// reads every intermediate file matching "mr-<nReduce>-*" and sorts the
// records by key. It calls reducef once per distinct key and writes one
// "key output" line per key to "mr-out-<nReduce>". It then reports
// completion to the coordinator.
func ReduceTask(reducef func(string, []string) string, nReduce int) {
	pattern := fmt.Sprintf("mr-%v-*", nReduce)
	filenames, err := filepath.Glob(pattern)
	if err != nil {
		log.Fatalf("cannot glob %v", pattern)
	}

	var intermediate []KeyValue
	for _, filename := range filenames {
		intermediate = append(intermediate, readIntermediateFile(filename)...)
	}
	// Shuffle: sorting brings all records with the same key together.
	sort.Sort(ByKey(intermediate))

	// Build the output in a temp file so a crash never leaves a partial
	// mr-out file behind.
	ofile, err := os.CreateTemp(".", "mr-tmp-*")
	if err != nil {
		log.Fatalf("cannot create temp output for reduce task %v: %v", nReduce, err)
	}
	for i := 0; i < len(intermediate); {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for _, kv := range intermediate[i:j] {
			values = append(values, kv.Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		if _, err := fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output); err != nil {
			log.Fatalf("cannot write %v: %v", ofile.Name(), err)
		}
		i = j
	}
	commitTempFile(ofile, fmt.Sprintf("mr-out-%v", nReduce))
	CallFinishReduce(nReduce)
}

// call sends an RPC request to the coordinator over its unix-domain socket
// and waits for the response. It returns true on success. On failure it
// prints the RPC error and returns false. If the coordinator cannot be dialed
// at all, this is treated as the coordinator having exited, and the worker
// process exits with status 0.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		// log.Fatal("dialing:", err)
		os.Exit(0)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}