Skip to content

Commit c7342d7

Browse files
committed
Refine text dataset streaming and restore reset semantics
1 parent 1ce702b commit c7342d7

File tree

2 files changed

+98
-33
lines changed

2 files changed

+98
-33
lines changed

kaun/lib/kaun/dataset/dataset.ml

Lines changed: 72 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -195,61 +195,100 @@ let from_text_file ?(encoding = `UTF8) ?(chunk_size = 65536) path =
195195
| `LATIN1 -> "iso-8859-1"
196196
in
197197
let uenc_opt = Uutf.encoding_of_string enc_name in
198-
let dec = Uutf.decoder ?encoding:uenc_opt `Manual in
199-
let handle = create_mmap path in
198+
let make_decoder () = Uutf.decoder ?encoding:uenc_opt `Manual in
199+
let handle_ref = ref None in
200+
let file_size = ref 0 in
200201
let offset = ref 0 in
201202
let closed = ref false in
202203
let buf = Buffer.create 512 in
203204
let lines_queue = Queue.create () in
205+
let decoder = ref (make_decoder ()) in
206+
207+
let open_handle () =
208+
let handle = create_mmap path in
209+
file_size := handle.size;
210+
handle_ref := Some handle;
211+
handle
212+
in
213+
let ensure_handle () =
214+
match !handle_ref with Some h -> h | None -> open_handle ()
215+
in
216+
let close_handle () =
217+
match !handle_ref with
218+
| None -> ()
219+
| Some h ->
220+
(* close_mmap raises EBADF if the handle was already closed. Ignore
221+
that error: the handle is dropped here and reset opens a fresh one. *)
222+
(try close_mmap h with
223+
| Unix.Unix_error (Unix.EBADF, _, _) -> ()
224+
| exn -> raise exn);
225+
handle_ref := None
226+
in
227+
ignore (open_handle ());
204228

205229
let push_line_from_buf () =
206-
let s = Buffer.contents buf in
230+
let line = Buffer.contents buf in
207231
Buffer.clear buf;
208-
Queue.add s lines_queue
232+
Queue.add line lines_queue
209233
in
210234

211-
let rec drain_decoder () =
212-
match Uutf.decode dec with
213-
| `Uchar u ->
214-
if Uchar.to_int u = 0x000A then push_line_from_buf ();
215-
if Uchar.to_int u <> 0x000A then Uutf.Buffer.add_utf_8 buf u;
216-
drain_decoder ()
217-
| `Malformed _ ->
218-
Uutf.Buffer.add_utf_8 buf Uutf.u_rep;
219-
drain_decoder ()
220-
| `Await ->
221-
if !offset >= handle.size then
222-
Uutf.Manual.src dec (Bytes.create 0) 0 0
223-
else
224-
let chunk =
225-
read_mmap_chunk handle ~offset:!offset ~length:chunk_size
226-
in
227-
offset := !offset + String.length chunk;
228-
let bytes = Bytes.of_string chunk in
229-
Uutf.Manual.src dec bytes 0 (Bytes.length bytes);
230-
drain_decoder ()
231-
| `End ->
232-
if Buffer.length buf > 0 then push_line_from_buf ();
233-
()
235+
let rec fill_queue () =
236+
if Queue.is_empty lines_queue && not !closed then
237+
match Uutf.decode !decoder with
238+
| `Uchar u ->
239+
if Uchar.to_int u = 0x000A then push_line_from_buf ()
240+
else Uutf.Buffer.add_utf_8 buf u;
241+
if Queue.is_empty lines_queue then fill_queue ()
242+
| `Malformed _ ->
243+
Uutf.Buffer.add_utf_8 buf Uutf.u_rep;
244+
fill_queue ()
245+
| `Await ->
246+
if !offset >= !file_size then (
247+
Uutf.Manual.src !decoder (Bytes.create 0) 0 0;
248+
fill_queue ())
249+
else
250+
let handle = ensure_handle () in
251+
let chunk =
252+
read_mmap_chunk handle ~offset:!offset ~length:chunk_size
253+
in
254+
offset := !offset + String.length chunk;
255+
if chunk = "" then (
256+
Uutf.Manual.src !decoder (Bytes.create 0) 0 0;
257+
fill_queue ())
258+
else
259+
let bytes = Bytes.of_string chunk in
260+
Uutf.Manual.src !decoder bytes 0 (Bytes.length bytes);
261+
fill_queue ()
262+
| `End ->
263+
if Buffer.length buf > 0 then push_line_from_buf ();
264+
close_handle ();
265+
closed := true
234266
in
235267

236268
let rec next_line () =
237269
if not (Queue.is_empty lines_queue) then Some (Queue.take lines_queue)
238270
else if !closed then None
239271
else (
240-
drain_decoder ();
272+
fill_queue ();
241273
if not (Queue.is_empty lines_queue) then Some (Queue.take lines_queue)
242-
else if !offset >= handle.size then (
243-
close_mmap handle;
244-
closed := true;
245-
None)
274+
else if !closed then None
246275
else next_line ())
247276
in
248277

278+
let reset () =
279+
Buffer.clear buf;
280+
Queue.clear lines_queue;
281+
offset := 0;
282+
closed := false;
283+
decoder := make_decoder ();
284+
close_handle ();
285+
ignore (open_handle ())
286+
in
287+
249288
{
250289
next = next_line;
251290
cardinality = (fun () -> Unknown);
252-
reset = None;
291+
reset = Some reset;
253292
spec = (fun () -> Scalar "string");
254293
}
255294

kaun/test/test_dataset.ml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,29 @@ let test_from_text_file_large_lines () =
9797
(fun l -> Alcotest.(check int) "line length" 1000 (String.length l))
9898
collected)
9999

100+
let test_from_text_file_reset () =
101+
let content = "line1\nline2\n" in
102+
with_temp_file content (fun path ->
103+
let dataset = from_text_file path in
104+
let expected = [ "line1"; "line2" ] in
105+
let first_pass = collect_dataset dataset in
106+
Alcotest.(check (list string)) "first pass" expected first_pass;
107+
reset dataset;
108+
let second_pass = collect_dataset dataset in
109+
Alcotest.(check (list string)) "after reset" expected second_pass)
110+
111+
let test_from_text_file_reset_mid_stream () =
112+
let content = "alpha\nbeta\ngamma\n" in
113+
with_temp_file content (fun path ->
114+
let dataset = from_text_file path in
115+
let first_chunk = collect_n 1 dataset in
116+
Alcotest.(check (list string))
117+
"consumed first element" [ "alpha" ] first_chunk;
118+
reset dataset;
119+
let refreshed = collect_n 2 dataset in
120+
Alcotest.(check (list string))
121+
"after reset first two elements" [ "alpha"; "beta" ] refreshed)
122+
100123
let test_from_text_files () =
101124
let content1 = "file1_line1\nfile1_line2\n" in
102125
let content2 = "file2_line1\nfile2_line2\n" in
@@ -639,6 +662,9 @@ let () =
639662
test_case "from_text_file_latin1" `Quick test_from_text_file_latin1;
640663
test_case "from_text_file_large_lines" `Quick
641664
test_from_text_file_large_lines;
665+
test_case "from_text_file_reset" `Quick test_from_text_file_reset;
666+
test_case "from_text_file_reset_mid_stream" `Quick
667+
test_from_text_file_reset_mid_stream;
642668
test_case "from_text_files" `Quick test_from_text_files;
643669
test_case "from_jsonl" `Quick test_from_jsonl;
644670
test_case "from_jsonl_custom_field" `Quick

0 commit comments

Comments
 (0)