Skip to content

Commit

Permalink
Adapt word_count example to pipeline model.
Browse files Browse the repository at this point in the history
  • Loading branch information
pmundkur committed Jul 27, 2012
1 parent bd373af commit a9de13f
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions tests/word_count.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,18 @@ let strip_word = function
let output oc ~key value =
output_string oc (Printf.sprintf "%s\xff%s\x00" key value)

module TestTask = struct
type map_init = int ref * (string, int) Hashtbl.t

let map_init _ = ref 0, (Hashtbl.create (1024 * 1024) : (string, int) Hashtbl.t)

let map (cnt, tbl) disco in_chan =
disco.Task.log (Printf.sprintf "Mapping %s (%d bytes) on %s ...\n"
disco.Task.input_url disco.Task.input_size disco.Task.hostname);
module MapTask = struct
module T = Task
type init = int ref * (string, int) Hashtbl.t

let task_init _ =
ref 0, (Hashtbl.create (1024 * 1024) : (string, int) Hashtbl.t)

let task_process (cnt, tbl) disco in_chan =
disco.T.log (Printf.sprintf
"Mapping %s (%d bytes) with label %d on %s ...\n"
disco.T.input_url disco.T.input_size
disco.T.group_label disco.T.hostname);
let rec loop () =
List.iter (fun w ->
match strip_word w with
Expand All @@ -64,18 +68,21 @@ module TestTask = struct
loop ()
in try loop () with End_of_file -> ()

let map_done (cnt, tbl) disco =
let task_done (cnt, tbl) disco =
Hashtbl.iter
(fun k v ->
output (disco.Task.out_channel ~label:0) k (Printf.sprintf "%d" v)
output (disco.T.out_channel ~label:0) k (Printf.sprintf "%d" v)
) tbl;
disco.Task.log (Printf.sprintf "Mapped %d entries.\n" !cnt)
disco.T.log (Printf.sprintf "Mapped %d entries.\n" !cnt)
end

type reduce_init = string list ref
module ReduceTask = struct
module T = Task
type init = string list ref

let reduce_init _ = ref ([] : string list)
let task_init _ = ref ([] : string list)

let reduce in_files disco _ =
let task_process in_files disco _ =
in_files := disco.Task.input_path :: !in_files

(* Invoke external sort *)
Expand Down Expand Up @@ -153,7 +160,7 @@ module TestTask = struct
si.records_parsed <- si.records_parsed + 1;
r

let reduce_done in_files disco =
let task_done in_files disco =
let sort_out = Filename.temp_file ~temp_dir:disco.Task.temp_dir "sorted-" "" in
disco.Task.log (Printf.sprintf "Starting sort (=>%s)\n" sort_out);
unix_sort !in_files sort_out disco;
Expand All @@ -179,4 +186,5 @@ module TestTask = struct
end

let _ =
Worker.start (module TestTask : Task.TASK)
Worker.start [("map", (module MapTask : Task.TASK));
("reduce", (module ReduceTask : Task.TASK))]

0 comments on commit a9de13f

Please sign in to comment.