@@ -279,64 +279,64 @@ pub fn process_unsupported_file(
279279 // Increment the total chunks counter
280280 TOTAL_CHUNKS . fetch_add ( chunks. len ( ) , Ordering :: SeqCst ) ;
281281
282- // chunks.par_iter().for_each(|chunk| {
283- // let url = format!("http://localhost:{}/{}", port, "createSuperEntity");
284- // let payload = json!({
285- // "file_id": file_id,
286- // "entity_type": "chunk",
287- // "text": chunk,
288- // "start_byte": 0,
289- // "end_byte": chunk.len() as i64,
290- // "order": order_counter.fetch_add(1, Ordering::SeqCst),
291- // });
292-
293- // // Send request to create entity
294- // let entity_response = post_request(&url, payload, &runtime);
295- // let entity_id = match entity_response {
296- // Ok(response) => response.get("entity")
297- // .and_then(|v| v.get("id"))
298- // .and_then(|v| v.as_str())
299- // .map(|s| s.to_string()),
300- // Err(e) => {
301- // eprintln!("Failed to create entity: {}", e);
302- // None
303- // }
304- // };
282+ chunks. par_iter ( ) . for_each ( |chunk| {
283+ let url = format ! ( "http://localhost:{}/{}" , port, "createSuperEntity" ) ;
284+ let payload = json ! ( {
285+ "file_id" : file_id,
286+ "entity_type" : "chunk" ,
287+ "text" : chunk,
288+ "start_byte" : 0 ,
289+ "end_byte" : chunk. len( ) as i64 ,
290+ "order" : order_counter. fetch_add( 1 , Ordering :: SeqCst ) ,
291+ } ) ;
292+
293+ // Send request to create entity
294+ let entity_response = post_request ( & url, payload, & runtime) ;
295+ let entity_id = match entity_response {
296+ Ok ( response) => response. get ( "entity" )
297+ . and_then ( |v| v. get ( "id" ) )
298+ . and_then ( |v| v. as_str ( ) )
299+ . map ( |s| s. to_string ( ) ) ,
300+ Err ( e) => {
301+ eprintln ! ( "Failed to create entity: {}" , e) ;
302+ None
303+ }
304+ } ;
305305
306- // // Increment thread counter for embedding
307- // ACTIVE_THREADS.fetch_add(1, Ordering::SeqCst);
306+ // Increment thread counter for embedding
307+ ACTIVE_THREADS . fetch_add ( 1 , Ordering :: SeqCst ) ;
308308
309- // // Increment pending embeddings counter
310- // PENDING_EMBEDDINGS.fetch_add(1, Ordering::SeqCst);
309+ // Increment pending embeddings counter
310+ PENDING_EMBEDDINGS . fetch_add ( 1 , Ordering :: SeqCst ) ;
311311
312- // // Use a guard to ensure counter is decremented when thread exits
313- // struct EmbedThreadGuard;
314- // impl Drop for EmbedThreadGuard {
315- // fn drop(&mut self) {
316- // ACTIVE_THREADS.fetch_sub(1, Ordering::SeqCst);
317- // }
318- // }
319- // let _embed_guard = EmbedThreadGuard;
312+ // Use a guard to ensure counter is decremented when thread exits
313+ struct EmbedThreadGuard ;
314+ impl Drop for EmbedThreadGuard {
315+ fn drop ( & mut self ) {
316+ ACTIVE_THREADS . fetch_sub ( 1 , Ordering :: SeqCst ) ;
317+ }
318+ }
319+ let _embed_guard = EmbedThreadGuard ;
320320
321- // // Generate embedding
322- // let job = EmbeddingJob {chunk: chunk.clone(), entity_id: entity_id.unwrap(), port};
323- // match tx.try_send(job) {
324- // Ok(_) => {},
325- // Err(err) => {
326- // // Channel is full; send asynchronously so this Rayon thread keeps working
327- // let job_back = err.into_inner();
328- // let tx_async = tx.clone();
329- // let rt = runtime.clone();
330- // rt.spawn(async move {
331- // if let Err(e) = tx_async.send(job_back).await {
332- // eprintln!("Failed to send embedding job asynchronously: {}", e);
333- // // If we failed to send the job, decrement the pending counter
334- // PENDING_EMBEDDINGS.fetch_sub(1, Ordering::SeqCst);
335- // }
336- // });
337- // }
338- // }
339- // });
321+ // Generate embedding
322+ let job = EmbeddingJob { chunk : chunk. clone ( ) , entity_id : entity_id. unwrap ( ) , port} ;
323+ match tx. try_send ( job) {
324+ Ok ( _) => { } ,
325+ Err ( err) => {
326+ // Channel is full; send asynchronously so this Rayon thread keeps working
327+ let job_back = err. into_inner ( ) ;
328+ let tx_async = tx. clone ( ) ;
329+ let rt = runtime. clone ( ) ;
330+ rt. spawn ( async move {
331+ if let Err ( e) = tx_async. send ( job_back) . await {
332+ eprintln ! ( "Failed to send embedding job asynchronously: {}" , e) ;
333+ // If we failed to send the job, decrement the pending counter
334+ PENDING_EMBEDDINGS . fetch_sub ( 1 , Ordering :: SeqCst ) ;
335+ }
336+ } ) ;
337+ }
338+ }
339+ } ) ;
340340 Ok ( ( ) )
341341}
342342
@@ -410,45 +410,45 @@ pub fn ingest_entities(
410410 TOTAL_CHUNKS . fetch_add ( chunks. len ( ) , Ordering :: SeqCst ) ;
411411
412412 // Process chunks in parallel
413- // chunks.par_iter().for_each(|chunk| {
414- // let chunk_clone = chunk.clone();
415- // let entity_id_clone = entity_id.to_string();
413+ chunks. par_iter ( ) . for_each ( |chunk| {
414+ let chunk_clone = chunk. clone ( ) ;
415+ let entity_id_clone = entity_id. to_string ( ) ;
416416
417- // // Increment thread counter for embedding
418- // ACTIVE_THREADS.fetch_add(1, Ordering::SeqCst);
417+ // Increment thread counter for embedding
418+ ACTIVE_THREADS . fetch_add ( 1 , Ordering :: SeqCst ) ;
419419
420- // // Increment pending embeddings counter
421- // PENDING_EMBEDDINGS.fetch_add(1, Ordering::SeqCst);
420+ // Increment pending embeddings counter
421+ PENDING_EMBEDDINGS . fetch_add ( 1 , Ordering :: SeqCst ) ;
422422
423- // // Use a guard to ensure counter is decremented when thread exits
424- // struct EmbedThreadGuard;
425- // impl Drop for EmbedThreadGuard {
426- // fn drop(&mut self) {
427- // ACTIVE_THREADS.fetch_sub(1, Ordering::SeqCst);
428- // }
429- // }
430- // let _embed_guard = EmbedThreadGuard;
423+ // Use a guard to ensure counter is decremented when thread exits
424+ struct EmbedThreadGuard ;
425+ impl Drop for EmbedThreadGuard {
426+ fn drop ( & mut self ) {
427+ ACTIVE_THREADS . fetch_sub ( 1 , Ordering :: SeqCst ) ;
428+ }
429+ }
430+ let _embed_guard = EmbedThreadGuard ;
431431
432- // // Generate embedding
433- // let job = EmbeddingJob {chunk: chunk_clone, entity_id: entity_id_clone, port};
434- // let tx_clone = tx.clone();
435- // match tx_clone.try_send(job) {
436- // Ok(_) => {},
437- // Err(err) => {
438- // // Channel is full; send asynchronously so this Rayon thread keeps working
439- // let job_back = err.into_inner();
440- // let tx_async = tx_clone.clone();
441- // let rt = runtime_clone.clone();
442- // rt.spawn(async move {
443- // if let Err(e) = tx_async.send(job_back).await {
444- // eprintln!("Failed to send embedding job asynchronously: {}", e);
445- // // If we failed to send the job, decrement the pending counter
446- // PENDING_EMBEDDINGS.fetch_sub(1, Ordering::SeqCst);
447- // }
448- // });
449- // }
450- // }
451- // });
432+ // Generate embedding
433+ let job = EmbeddingJob { chunk : chunk_clone, entity_id : entity_id_clone, port} ;
434+ let tx_clone = tx. clone ( ) ;
435+ match tx_clone. try_send ( job) {
436+ Ok ( _) => { } ,
437+ Err ( err) => {
438+ // Channel is full; send asynchronously so this Rayon thread keeps working
439+ let job_back = err. into_inner ( ) ;
440+ let tx_async = tx_clone. clone ( ) ;
441+ let rt = runtime_clone. clone ( ) ;
442+ rt. spawn ( async move {
443+ if let Err ( e) = tx_async. send ( job_back) . await {
444+ eprintln ! ( "Failed to send embedding job asynchronously: {}" , e) ;
445+ // If we failed to send the job, decrement the pending counter
446+ PENDING_EMBEDDINGS . fetch_sub ( 1 , Ordering :: SeqCst ) ;
447+ }
448+ } ) ;
449+ }
450+ }
451+ } ) ;
452452 }
453453 }
454454 }
@@ -555,38 +555,38 @@ fn process_entity(
555555 TOTAL_CHUNKS . fetch_add ( chunks. len ( ) , Ordering :: SeqCst ) ;
556556
557557 // Process chunks in parallel using rayon
558- // chunks.par_iter().for_each(|chunk| {
559- // let chunk_clone = chunk.clone();
560- // let entity_id_clone = entity_id.clone();
558+ chunks. par_iter ( ) . for_each ( |chunk| {
559+ let chunk_clone = chunk. clone ( ) ;
560+ let entity_id_clone = entity_id. clone ( ) ;
561561
562- // // Increment thread counter
563- // ACTIVE_THREADS.fetch_add(1, Ordering::SeqCst);
562+ // Increment thread counter
563+ ACTIVE_THREADS . fetch_add ( 1 , Ordering :: SeqCst ) ;
564564
565- // // Increment pending embeddings counter
566- // PENDING_EMBEDDINGS.fetch_add(1, Ordering::SeqCst);
567-
568- // // Send embedding job to async background worker via mpsc channel
569- // let job = EmbeddingJob {chunk: chunk_clone, entity_id: entity_id_clone, port};
570- // let tx_clone = tx.clone();
571- // match tx_clone.try_send(job) {
572- // Ok(_) => {},
573- // Err(err) => {
574- // let job_back = err.into_inner();
575- // let tx_async = tx_clone.clone();
576- // let rt = runtime.clone();
577- // rt.spawn(async move {
578- // if let Err(e) = tx_async.send(job_back).await {
579- // eprintln!("Failed to send embedding job asynchronously: {}", e);
580- // // If we failed to send the job, decrement the pending counter
581- // PENDING_EMBEDDINGS.fetch_sub(1, Ordering::SeqCst);
582- // }
583- // });
584- // }
585- // }
586-
587- // // Decrement counters
588- // ACTIVE_THREADS.fetch_sub(1, Ordering::SeqCst);
589- // });
565+ // Increment pending embeddings counter
566+ PENDING_EMBEDDINGS . fetch_add ( 1 , Ordering :: SeqCst ) ;
567+
568+ // Send embedding job to async background worker via mpsc channel
569+ let job = EmbeddingJob { chunk : chunk_clone, entity_id : entity_id_clone, port} ;
570+ let tx_clone = tx. clone ( ) ;
571+ match tx_clone. try_send ( job) {
572+ Ok ( _) => { } ,
573+ Err ( err) => {
574+ let job_back = err. into_inner ( ) ;
575+ let tx_async = tx_clone. clone ( ) ;
576+ let rt = runtime. clone ( ) ;
577+ rt. spawn ( async move {
578+ if let Err ( e) = tx_async. send ( job_back) . await {
579+ eprintln ! ( "Failed to send embedding job asynchronously: {}" , e) ;
580+ // If we failed to send the job, decrement the pending counter
581+ PENDING_EMBEDDINGS . fetch_sub ( 1 , Ordering :: SeqCst ) ;
582+ }
583+ } ) ;
584+ }
585+ }
586+
587+ // Decrement counters
588+ ACTIVE_THREADS . fetch_sub ( 1 , Ordering :: SeqCst ) ;
589+ } ) ;
590590 }
591591
592592 // Recursively process children of entity in parallel
0 commit comments