-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatcher_server.ml
87 lines (76 loc) · 3.43 KB
/
dispatcher_server.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
module M_p = Message_processor
module U = Util_shared
(* A processor implementation bundled with the mutable cell that holds its
   current server state. Values of this signature are packed as first-class
   modules and stored in [Server.t]. *)
module type Processor_server_inst = sig
  (** The processor implementation behind this instance. *)
  module Processor : M_p.Processor

  (** Mutable cell holding this instance's current server state. *)
  val server : Processor.Server.t ref

  (** [update s] hands a server state to the instance. The implementation
      built by [create_processor_server_inst] in this file stores
      [Processor.Server.set_config s config] into [server]. *)
  val update : Processor.Server.t -> unit
end
(* Registry of processor instances plus the operations that fan messages out
   to them, answer client requests, and persist / restore their state. *)
module Server = struct
  (** Table of packed processor instances, keyed by the name reported by
      [Processor.Server.get_name] at registration time (see [create]). *)
  type t = (string, (module Processor_server_inst)) Hashtbl.t

  (** [dump_to_file server filename] snapshots every registered instance.

      Each instance's current server state is marshalled to its own string;
      the resulting [(name, marshalled_state)] list is then marshalled as a
      whole and written to [filename].

      @raise Failure (from [Marshal]) if a state contains values that cannot
      be marshalled with the default flags (e.g. closures). I/O errors from
      the write are not caught here. *)
  let dump_to_file server filename =
    let marsh_dict =
      server
      |> U.table_to_pairs
      |> List.map (fun (name, (module M : Processor_server_inst)) ->
             (name, Marshal.to_string !M.server []))
    in
    let marsh_str = Marshal.to_string marsh_dict [] in
    Core.Std.Out_channel.write_all filename ~data:marsh_str

  (** [read_dump_from_file server filename] restores instance states from a
      file previously produced by [dump_to_file].

      Entries whose name is not registered in [server] are skipped; for every
      other entry the unmarshalled state is passed to that instance's
      [update].

      SECURITY / SAFETY: [Marshal.from_string] performs no type checking.
      The file must be trusted and must have been written by a compatible
      build of this program; feeding it anything else is undefined behaviour.
      I/O errors from the read are not caught here. *)
  let read_dump_from_file server filename =
    let marsh_str = Core.Std.In_channel.read_all filename in
    (* Expected shape (unchecked): a list of (name, marshalled state). *)
    let marsh_dict = Marshal.from_string marsh_str 0 in
    List.iter
      (fun (name, server_marsh_str) ->
        (* Single lookup; the exception case only guards the lookup itself,
           so anything raised by [M.update] still propagates. *)
        match Hashtbl.find server name with
        | exception Not_found -> ()
        | inst ->
          let (module M : Processor_server_inst) = inst in
          M.update (Marshal.from_string server_marsh_str 0))
      marsh_dict

  (** [server_payload_to_string payload] renders [payload] as a JSON string
      via its [to_yojson] converter. *)
  let server_payload_to_string payload =
    Yojson.Safe.to_string (M_p.Server_payload.to_yojson payload)

  (** [send_server_payload ws_server payload] serialises [payload] to JSON
      and passes it to [App_websocket.send_all ws_server]. The result of
      [send_all] is returned unchanged. *)
  let send_server_payload ws_server payload =
    App_websocket.send_all ws_server (server_payload_to_string payload)

  (** [create l] builds a registry from the packed instances in [l], keyed
      by the name of each instance's current server state.

      Note: bindings are inserted with [Hashtbl.add], so two instances that
      report the same name are both kept; lookups by name see the one that
      was added last. *)
  let create l =
    let table : t = Hashtbl.create 10 in
    List.iter
      (fun (module M : Processor_server_inst) ->
        Hashtbl.add table
          (M.Processor.Server.get_name !M.server)
          (module M : Processor_server_inst))
      l;
    table

  (** [process_message ws_server t msg] feeds [msg] to every registered
      instance. Each instance's new server state is handed to its [update];
      when processing yields a payload it is sent with
      [send_server_payload] and the result of the send is discarded. *)
  let process_message ws_server t msg =
    (* Plain iteration: the table itself is never modified here, only the
       per-instance state cells. *)
    Hashtbl.iter
      (fun _name (module M : Processor_server_inst) ->
        let server, maybe_payload =
          M.Processor.process_message !M.server msg
        in
        M.update server;
        match maybe_payload with
        | None -> ()
        | Some payload -> ignore (send_server_payload ws_server payload))
      t

  (** [process_client_payload t client_payload] answers a client request.

      Both [Full name] and [Update (name, _)] are answered with the full
      server payload of the instance registered under [name]. Returns
      [M_p.Server_payload.Empty] when no such instance exists.

      Note: the [Not_found] handler wraps the whole lookup-and-render
      expression, so a [Not_found] escaping from
      [get_full_server_payload] is also turned into [Empty]. *)
  let process_client_payload t client_payload =
    match client_payload with
    | M_p.Client_payload.Full name
    | M_p.Client_payload.Update (name, _) ->
      begin
        try
          let (module M : Processor_server_inst) = Hashtbl.find t name in
          M.Processor.get_full_server_payload !M.server
        with Not_found -> M_p.Server_payload.Empty
      end
end
(* [create_processor_server_inst ?name (module P) config] packs processor [P]
   together with a fresh state cell as a [Processor_server_inst].

   The cell is initialised with [P.Server.create ?name config]. The packed
   [update] re-applies the same [config] (via [Server.set_config]) to the
   state it is given before storing it in the cell. *)
let create_processor_server_inst
    (type a)
    ?name
    (module P : M_p.Processor with type config = a)
    config =
  let module Inst = struct
    module Processor = P

    let server = ref (Processor.Server.create ?name config)

    let update new_state =
      server := Processor.Server.set_config new_state config
  end in
  (module Inst : Processor_server_inst)
(* The registry of processor instances used by this module: two total-count
   processors (named "total_hour" and "total_minute"), plus conversations,
   frequency and last-seen processors created without an explicit name.
   All durations are expressed in seconds. *)
let server =
  let minute_s = 60 in
  let hour_s = 60 * minute_s in
  let day_s = 24 * hour_s in
  Server.create [
    create_processor_server_inst ~name:"total_hour"
      (module Total_count_processor)
      { interval_s = hour_s };
    create_processor_server_inst ~name:"total_minute"
      (module Total_count_processor)
      { interval_s = minute_s };
    create_processor_server_inst
      (module Conversations_processor)
      { interval_s = 10 * minute_s; decay_s = day_s; history_limit = -1 };
    create_processor_server_inst
      (module Frequency_processor)
      { interval_s = 10 * minute_s; decay_s = day_s };
    create_processor_server_inst (module Last_seen_processor) ();
  ]