
Commit b6dca54

drew-harris authored and densumesh committed
feat: add dataset event types for crawl success/fail
1 parent c52da89 · commit b6dca54

File tree (2 files changed: +111, -8)

server/src/bin/crawl-worker.rs
server/src/data/models.rs

server/src/bin/crawl-worker.rs (93 additions, 8 deletions)
```diff
@@ -7,8 +7,13 @@ use std::sync::{
     Arc,
 };
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
-use trieve_server::operators::{
-    dataset_operator::get_dataset_by_id_query, user_operator::hash_function,
+use trieve_server::{
+    data::models::{self, WorkerEvent},
+    operators::{
+        clickhouse_operator::{ClickHouseEvent, EventQueue},
+        dataset_operator::get_dataset_by_id_query,
+        user_operator::hash_function,
+    },
 };
 use trieve_server::{
     data::models::{CrawlRequest, DatasetConfiguration, RedisPool},
@@ -29,12 +34,18 @@ use trieve_server::{
 };
 use ureq::json;
 
+#[derive(Debug)]
+struct ScrapeReport {
+    request_id: uuid::Uuid,
+    pages_scraped: u32,
+}
+
 #[allow(clippy::print_stdout)]
 async fn crawl(
     scrape_request: CrawlRequest,
     pool: web::Data<Pool>,
     redis_pool: web::Data<RedisPool>,
-) -> Result<uuid::Uuid, ServiceError> {
+) -> Result<ScrapeReport, ServiceError> {
     let ingest_result;
     loop {
         let temp_result = get_crawl_from_firecrawl(scrape_request.scrape_id)
```
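The new `ScrapeReport` exists so that `crawl` can hand back the page count alongside the request id, which the success event further down needs. A minimal illustration of the new return contract (illustrative values, not part of the commit):

```rust
// Illustrative only: callers now receive both fields from one value instead
// of a bare uuid::Uuid, and the derived Debug impl is what the worker's
// success log prints.
let report = ScrapeReport {
    request_id: uuid::Uuid::new_v4(),
    pages_scraped: 17,
};
println!("Scrape job completed: {:?}", report);
```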
```diff
@@ -84,12 +95,16 @@ async fn crawl(
 
     log::info!("Processing {} documents from scrape", data.len());
 
+    let mut page_count = 0;
+
     for page in data {
         let page = match page {
             Some(page) => page,
             None => continue,
         };
 
+        page_count = page_count + 1;
+
         if page.metadata.status_code != Some(200) {
             log::error!("Error getting page metadata for chunk: {:?}", page.metadata);
             update_crawl_status(scrape_request.id, CrawlStatus::Failed, pool.clone())
```
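Note the ordering inside the loop: the counter is incremented before the status-code check, so pages that come back non-200 still count toward `pages_scraped`. If "pages actually ingested" were the intent, the increment would sit below the check; a hypothetical alternative, not what the commit does:

```rust
// Hypothetical alternative (not the commit's behavior): count only pages
// that pass the status check, so pages_scraped means "pages ingested".
if page.metadata.status_code == Some(200) {
    page_count += 1;
}
```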
```diff
@@ -237,14 +252,18 @@ async fn crawl(
     )
     .await?;
 
-    Ok(scrape_request.id)
+    Ok(ScrapeReport {
+        request_id: scrape_request.id,
+        pages_scraped: page_count,
+    })
 }
 
 #[allow(clippy::print_stdout)]
 async fn scrape_worker(
     should_terminate: Arc<AtomicBool>,
     redis_pool: web::Data<RedisPool>,
     pool: web::Data<Pool>,
+    event_queue: actix_web::web::Data<EventQueue>,
 ) {
     log::info!("Starting scrape worker service thread");
 
@@ -330,10 +349,30 @@ async fn scrape_worker(
         }
 
         match crawl(crawl_request.clone(), pool.clone(), redis_pool.clone()).await {
-            Ok(scrape_id) => {
-                log::info!("Scrape job completed: {:?}", scrape_id);
+            Ok(scrape_report) => {
+                log::info!("Scrape job completed: {:?}", scrape_report);
+
+                event_queue
+                    .send(ClickHouseEvent::WorkerEvent(
+                        WorkerEvent::from_details(
+                            crawl_request.dataset_id,
+                            models::EventType::CrawlCompleted {
+                                scrape_id: scrape_report.request_id,
+                                pages_crawled: scrape_report.pages_scraped,
+                                crawl_options: crawl_request.crawl_options,
+                            },
+                        )
+                        .into(),
+                    ))
+                    .await;
 
-                match update_crawl_status(scrape_id, CrawlStatus::Completed, pool.clone()).await {
+                match update_crawl_status(
+                    scrape_report.request_id,
+                    CrawlStatus::Completed,
+                    pool.clone(),
+                )
+                .await
+                {
                     Ok(_) => {}
                     Err(err) => {
                         log::error!("Failed to update crawl status: {:?}", err);
@@ -351,6 +390,20 @@ async fn scrape_worker(
             Err(err) => {
                 log::error!("Failed to scrape website: {:?}", err);
 
+                event_queue
+                    .send(ClickHouseEvent::WorkerEvent(
+                        WorkerEvent::from_details(
+                            crawl_request.dataset_id,
+                            models::EventType::CrawlFailed {
+                                scrape_id: crawl_request.id,
+                                crawl_options: crawl_request.crawl_options.clone(),
+                                error: format!("{:?}", err),
+                            },
+                        )
+                        .into(),
+                    ))
+                    .await;
+
                 let _ = readd_error_to_queue(crawl_request, err, redis_pool.clone()).await;
             }
         };
```
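One detail across the two arms: the success arm can move `crawl_request.crawl_options` into its event, while the failure arm must `.clone()` because `crawl_request` is handed whole to `readd_error_to_queue` right after. A reduced sketch of that ownership constraint with stand-in types (hypothetical, not the commit's code):

```rust
// Stand-in types to show why the failure arm clones; hypothetical reduction.
#[derive(Clone)]
struct Options;                      // plays the role of CrawlOptions
struct Request { options: Options }  // plays the role of CrawlRequest

fn emit(_opts: Options) {}   // building the event consumes the options
fn requeue(_req: Request) {} // readd_error_to_queue consumes the whole request

fn on_failure(req: Request) {
    emit(req.options.clone()); // without .clone(), `req` would be partially moved
    requeue(req);              // and this call would not compile
}
```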
```diff
@@ -442,7 +495,39 @@ fn main() {
         signal_hook::flag::register(SIGTERM, Arc::clone(&should_terminate))
             .expect("Failed to register shutdown hook");
 
-        scrape_worker(should_terminate, web_redis_pool, web_pool).await
+        let event_queue = if std::env::var("USE_ANALYTICS")
+            .unwrap_or("false".to_string())
+            .parse()
+            .unwrap_or(false)
+        {
+            log::info!("Analytics enabled");
+
+            let clickhouse_client = clickhouse::Client::default()
+                .with_url(
+                    std::env::var("CLICKHOUSE_URL")
+                        .unwrap_or("http://localhost:8123".to_string()),
+                )
+                .with_user(
+                    std::env::var("CLICKHOUSE_USER").unwrap_or("default".to_string()),
+                )
+                .with_password(
+                    std::env::var("CLICKHOUSE_PASSWORD").unwrap_or("".to_string()),
+                )
+                .with_database(
+                    std::env::var("CLICKHOUSE_DATABASE").unwrap_or("default".to_string()),
+                )
+                .with_option("async_insert", "1")
+                .with_option("wait_for_async_insert", "0");
+
+            let mut event_queue = EventQueue::new(clickhouse_client.clone());
+            event_queue.start_service();
+            event_queue
+        } else {
+            log::info!("Analytics disabled");
+            EventQueue::default()
+        };
+        let web_event_queue = actix_web::web::Data::new(event_queue);
+        scrape_worker(should_terminate, web_redis_pool, web_pool, web_event_queue).await
     }
     .bind_hub(Hub::new_from_top(Hub::current())),
 );
```
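The gate at the top of the hunk is a plain env-flag check: only the literal `true` enables analytics, and everything else (unset, empty, malformed) falls back to the no-op `EventQueue::default()`. Isolated as a sketch, with a helper name of our own choosing:

```rust
// Sketch of the USE_ANALYTICS gate above; `analytics_enabled` is a
// hypothetical helper, not part of the commit.
fn analytics_enabled() -> bool {
    std::env::var("USE_ANALYTICS")
        .unwrap_or("false".to_string()) // unset -> "false"
        .parse()                        // bool::from_str accepts "true"/"false"
        .unwrap_or(false)               // anything unparsable -> false
}
```

The two ClickHouse options are also worth noting: `async_insert = 1` with `wait_for_async_insert = 0` makes inserts fire-and-forget on the server side, so the crawl worker never blocks waiting for an analytics flush.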

server/src/data/models.rs (18 additions, 0 deletions)
```diff
@@ -1767,6 +1767,18 @@ pub enum EventType {
     GroupChunksUpdated { group_id: uuid::Uuid },
     #[display(fmt = "group_chunks_action_failed")]
     GroupChunksActionFailed { group_id: uuid::Uuid, error: String },
+    #[display(fmt = "crawl_completed")]
+    CrawlCompleted {
+        scrape_id: uuid::Uuid,
+        pages_crawled: u32,
+        crawl_options: CrawlOptions,
+    },
+    #[display(fmt = "crawl_failed")]
+    CrawlFailed {
+        scrape_id: uuid::Uuid,
+        crawl_options: CrawlOptions,
+        error: String,
+    },
 }
 
 impl EventType {
@@ -1782,6 +1794,8 @@ impl EventType {
             EventTypeRequest::BulkChunkUploadFailed,
             EventTypeRequest::GroupChunksUpdated,
             EventTypeRequest::GroupChunksActionFailed,
+            EventTypeRequest::CrawlCompleted,
+            EventTypeRequest::CrawlFailed,
         ]
     }
 }
@@ -5341,6 +5355,10 @@ pub enum EventTypeRequest {
     GroupChunksUpdated,
     #[display(fmt = "group_chunks_action_failed")]
     GroupChunksActionFailed,
+    #[display(fmt = "crawl_completed")]
+    CrawlCompleted,
+    #[display(fmt = "crawl_failed")]
+    CrawlFailed,
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
```