@@ -185,72 +185,112 @@ let from_tensors (x, y) =
185185 }
186186
187187(* Text Data Sources *)
(** [from_text_file ?encoding ?chunk_size path] is a dataset whose elements
    are the lines of the file at [path], in order.

    The file is read through the module's mmap helpers in chunks of
    [chunk_size] bytes and decoded incrementally with [Uutf], so a multi-byte
    sequence that straddles a chunk boundary is decoded correctly.

    - Lines are split on U+000A only and the terminator is not included. No
      newline normalisation is requested from the decoder, so a carriage
      return preceding the line feed stays in the line.
    - Every line is returned UTF-8 encoded, whatever the input [encoding].
    - Malformed input is replaced by U+FFFD instead of raising.
    - A final line without a trailing newline is still produced; an empty
      trailing line is not.

    The file is opened when the dataset is created, closed once the end of
    input has been reached, and reopened by [reset].

    @param encoding input encoding, one of [`UTF8] (default), [`ASCII] or
      [`LATIN1].
    @param chunk_size number of bytes requested per read (default 65536).
    @raise Invalid_argument if [chunk_size] is not strictly positive. *)
let from_text_file ?(encoding = `UTF8) ?(chunk_size = 65536) path =
  (* A non-positive chunk size would make every read come back empty, which
     the feeding logic below treats as end of file: the dataset would be
     silently empty. Reject it up front instead. *)
  if chunk_size <= 0 then
    invalid_arg "from_text_file: chunk_size must be positive";
  (* Map straight onto Uutf's encoding tags; no string round-trip, no option
     that could silently fall back to encoding auto-detection. *)
  let uutf_encoding =
    match encoding with
    | `UTF8 -> `UTF_8
    | `ASCII -> `US_ASCII
    | `LATIN1 -> `ISO_8859_1
  in
  let make_decoder () = Uutf.decoder ~encoding:uutf_encoding `Manual in
  let decoder = ref (make_decoder ()) in
  let handle_ref = ref None in
  let file_size = ref 0 in
  let offset = ref 0 in
  let closed = ref false in
  (* Accumulates the UTF-8 bytes of the line currently being decoded. *)
  let buf = Buffer.create 512 in

  let open_handle () =
    let handle = create_mmap path in
    file_size := handle.size;
    handle_ref := Some handle;
    handle
  in
  let ensure_handle () =
    match !handle_ref with Some h -> h | None -> open_handle ()
  in
  let close_handle () =
    match !handle_ref with
    | None -> ()
    | Some h -> (
        (* Forget the handle before closing it, so that a failing close can
           never leave a stale handle behind to be closed a second time. *)
        handle_ref := None;
        (* Best effort: an already-closed descriptor is not an error here
           because [reset] reopens the file anyway. *)
        try close_mmap h with Unix.Unix_error (Unix.EBADF, _, _) -> ())
  in
  (* Open eagerly so that a missing or unreadable file is reported when the
     dataset is created rather than on the first [next]. *)
  ignore (open_handle ());

  (* Returns the buffered line and empties the buffer. *)
  let take_line () =
    let line = Buffer.contents buf in
    Buffer.clear buf;
    line
  in

  (* Hands the decoder its next input: the following chunk of the file, or a
     zero-length source, which is how Uutf is told the input has ended. *)
  let feed () =
    let signal_eof () = Uutf.Manual.src !decoder Bytes.empty 0 0 in
    if !offset >= !file_size then signal_eof ()
    else
      let handle = ensure_handle () in
      let chunk = read_mmap_chunk handle ~offset:!offset ~length:chunk_size in
      match String.length chunk with
      | 0 -> signal_eof ()
      | len ->
          offset := !offset + len;
          Uutf.Manual.src !decoder (Bytes.of_string chunk) 0 len
  in

  (* Decodes until one complete line is available or the input is exhausted.
     Every recursive call is in tail position. *)
  let rec decode_line () =
    match Uutf.decode !decoder with
    | `Uchar u ->
        if Uchar.to_int u = 0x000A then Some (take_line ())
        else (
          Uutf.Buffer.add_utf_8 buf u;
          decode_line ())
    | `Malformed _ ->
        Uutf.Buffer.add_utf_8 buf Uutf.u_rep;
        decode_line ()
    | `Await ->
        feed ();
        decode_line ()
    | `End ->
        close_handle ();
        closed := true;
        (* Emit trailing text that was not terminated by a newline. *)
        if Buffer.length buf > 0 then Some (take_line ()) else None
  in

  let next_line () = if !closed then None else decode_line () in

  (* Rewinds to the start of the file with a fresh decoder and file handle. *)
  let reset () =
    Buffer.clear buf;
    offset := 0;
    closed := false;
    decoder := make_decoder ();
    close_handle ();
    ignore (open_handle ())
  in

  {
    next = next_line;
    cardinality = (fun () -> Unknown);
    reset = Some reset;
    spec = (fun () -> Scalar "string");
  }
254294
255295let from_text_files ?(encoding = `UTF8 ) ?(chunk_size = 65536 ) paths =
256296 let current_file = ref 0 in
@@ -262,7 +302,8 @@ let from_text_files ?(encoding = `UTF8) ?(chunk_size = 65536) paths =
262302 if ! current_file > = List. length paths then None
263303 else
264304 let path = List. nth paths ! current_file in
265- current_dataset := Some (from_text_file ~encoding ~chunk_size path);
305+ let ds = from_text_file ~encoding ~chunk_size path in
306+ current_dataset := Some ds;
266307 incr current_file;
267308 next ()
268309 | Some ds -> (
0 commit comments