Skip to content

Commit

Permalink
[kv store] add timeout to bigtable ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Nov 7, 2024
1 parent 7758e33 commit b0a3046
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
3 changes: 3 additions & 0 deletions crates/sui-data-ingestion-core/src/progress_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ impl<P: ProgressStore> ProgressStore for ProgressStoreWrapper<P> {
task_name: String,
checkpoint_number: CheckpointSequenceNumber,
) -> Result<()> {
if checkpoint_number % 100 != 0 {
return Ok(());
}
self.progress_store
.save(task_name.clone(), checkpoint_number)
.await?;
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-data-ingestion-core/src/reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::collections::HashMap;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::info;

pub(crate) async fn reduce<W: Worker>(
task_name: String,
Expand Down Expand Up @@ -39,6 +40,7 @@ pub(crate) async fn reduce<W: Worker>(
}
current_checkpoint_number += 1;
}
info!("reducer batch size {}", batch.len());
match reducer {
Some(ref reducer) => {
if reducer.should_close_batch(&batch, None) {
Expand Down
9 changes: 8 additions & 1 deletion crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use prometheus::Registry;
use serde::{Deserialize, Serialize};
use std::env;
use std::path::PathBuf;
use std::time::Duration;
use sui_data_ingestion::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker,
DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
Expand Down Expand Up @@ -45,6 +46,7 @@ struct ProgressStoreConfig {
#[derive(Serialize, Deserialize, Clone, Debug)]
struct BigTableTaskConfig {
instance_id: String,
timeout_secs: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -154,7 +156,12 @@ async fn main() -> Result<()> {
executor.register(worker_pool).await?;
}
Task::BigTableKV(kv_config) => {
let client = BigTableClient::new_remote(kv_config.instance_id, false, None).await?;
let client = BigTableClient::new_remote(
kv_config.instance_id,
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?;
let worker_pool = WorkerPool::new(
KvWorker { client },
task_config.name,
Expand Down

0 comments on commit b0a3046

Please sign in to comment.