Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::CubeError;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::instrument;

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct JobProcessResult {
Expand Down Expand Up @@ -72,6 +73,7 @@ impl JobProcessor for JobProcessorImpl {
Ok(())
}

// Traced `JobProcessor` entry point: delegates the job to the wrapped
// processor's `process_separate_job`. The `job` argument is excluded from the
// trace span (skip) to avoid recording its potentially large Debug output.
#[instrument(level = "trace", skip(self, job))]
async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError> {
self.processor.process_separate_job(&job).await
}
Expand Down Expand Up @@ -110,6 +112,7 @@ impl JobIsolatedProcessor {
)
}

#[instrument(level = "trace", skip(self, job))]
pub async fn process_separate_job(&self, job: &Job) -> Result<JobProcessResult, CubeError> {
match job.job_type() {
JobType::PartitionCompaction => {
Expand Down
306 changes: 185 additions & 121 deletions rust/cubestore/cubestore/src/cluster/ingestion/job_runner.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions rust/cubestore/cubestore/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ lazy_static! {

#[async_trait]
impl Cluster for ClusterImpl {
#[instrument(level = "trace", skip(self))]
async fn notify_job_runner(&self, node_name: String) -> Result<(), CubeError> {
if self.server_name == node_name || is_self_reference(&node_name) {
// TODO `notify_one()` was replaced by `notify_waiters()` here. Revisit in case of delays in job processing.
Expand Down Expand Up @@ -826,6 +827,7 @@ pub struct JobResultListener {
}

impl JobResultListener {
#[instrument(level = "trace", skip(self))]
pub async fn wait_for_job_result(
self,
row_key: RowKey,
Expand All @@ -839,6 +841,7 @@ impl JobResultListener {
.unwrap())
}

#[instrument(level = "trace", skip(self, results))]
pub async fn wait_for_job_results(
mut self,
mut results: Vec<(RowKey, JobType)>,
Expand Down
60 changes: 60 additions & 0 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ pub trait ConfigObj: DIService {

fn import_job_timeout(&self) -> u64;

fn scheduled_job_orphaned_timeout(&self) -> u64;

fn meta_store_snapshot_interval(&self) -> u64;

fn meta_store_log_upload_interval(&self) -> u64;
Expand Down Expand Up @@ -550,6 +552,14 @@ pub trait ConfigObj: DIService {
fn remote_files_cleanup_batch_size(&self) -> u64;

fn create_table_max_retries(&self) -> u64;

fn http_location_size_max_retries(&self) -> u64;

fn http_location_size_initial_sleep_ms(&self) -> u64;

fn http_location_size_sleep_multiplier(&self) -> u64;

fn http_location_size_timeout_secs(&self) -> u64;
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -584,6 +594,7 @@ pub struct ConfigObjImpl {
pub not_used_timeout: u64,
pub in_memory_not_used_timeout: u64,
pub import_job_timeout: u64,
pub scheduled_job_orphaned_timeout: u64,
pub meta_store_log_upload_interval: u64,
pub meta_store_log_upload_size_limit: u64,
pub meta_store_snapshot_interval: u64,
Expand Down Expand Up @@ -652,6 +663,10 @@ pub struct ConfigObjImpl {
pub remote_files_cleanup_delay_secs: u64,
pub remote_files_cleanup_batch_size: u64,
pub create_table_max_retries: u64,
pub http_location_size_max_retries: u64,
pub http_location_size_initial_sleep_ms: u64,
pub http_location_size_sleep_multiplier: u64,
pub http_location_size_timeout_secs: u64,
}

crate::di_service!(ConfigObjImpl, [ConfigObj]);
Expand Down Expand Up @@ -762,6 +777,10 @@ impl ConfigObj for ConfigObjImpl {
self.import_job_timeout
}

// Timeout (seconds) after which a job still in `Scheduled` state is treated
// as orphaned. Configured via CUBESTORE_SCHEDULED_JOB_ORPHANED_TIMEOUT
// (default: 3x query_timeout).
fn scheduled_job_orphaned_timeout(&self) -> u64 {
self.scheduled_job_orphaned_timeout
}

fn meta_store_snapshot_interval(&self) -> u64 {
self.meta_store_snapshot_interval
}
Expand Down Expand Up @@ -1027,6 +1046,22 @@ impl ConfigObj for ConfigObjImpl {
self.create_table_max_retries
}

// Retry budget for the HTTP HEAD request used to determine the size of an
// HTTP import location. Env: CUBESTORE_HTTP_LOCATION_SIZE_MAX_RETRIES
// (default: 1, i.e. a single attempt).
fn http_location_size_max_retries(&self) -> u64 {
self.http_location_size_max_retries
}

// Initial backoff (milliseconds) between failed HTTP location-size attempts.
// Env: CUBESTORE_HTTP_LOCATION_SIZE_INITIAL_SLEEP_MS (default: 100).
fn http_location_size_initial_sleep_ms(&self) -> u64 {
self.http_location_size_initial_sleep_ms
}

// Multiplier applied to the backoff after each failed HTTP location-size
// attempt (exponential backoff). Env:
// CUBESTORE_HTTP_LOCATION_SIZE_SLEEP_MULTIPLIER (default: 2).
fn http_location_size_sleep_multiplier(&self) -> u64 {
self.http_location_size_sleep_multiplier
}

// Per-request timeout (seconds) for the HTTP HEAD used to probe a location's
// size. Env: CUBESTORE_HTTP_LOCATION_SIZE_TIMEOUT_SECS (default: 60).
fn http_location_size_timeout_secs(&self) -> u64 {
self.http_location_size_timeout_secs
}

// Plain getter; threshold semantics are defined by the cachestore eviction
// logic, not visible here — see cachestore module for interpretation.
fn cachestore_cache_eviction_below_threshold(&self) -> u8 {
self.cachestore_cache_eviction_below_threshold
}
Expand Down Expand Up @@ -1344,6 +1379,10 @@ impl Config {
not_used_timeout: 2 * query_timeout,
in_memory_not_used_timeout: 30,
import_job_timeout: env_parse("CUBESTORE_IMPORT_JOB_TIMEOUT", 600),
scheduled_job_orphaned_timeout: env_parse(
"CUBESTORE_SCHEDULED_JOB_ORPHANED_TIMEOUT",
query_timeout * 3,
),
meta_store_log_upload_interval: 30,
meta_store_log_upload_size_limit: env_parse_size(
"CUBESTORE_METASTORE_UPLOAD_LOG_SIZE_LIMIT",
Expand Down Expand Up @@ -1569,6 +1608,22 @@ impl Config {
50000,
),
create_table_max_retries: env_parse("CUBESTORE_CREATE_TABLE_MAX_RETRIES", 3),
http_location_size_max_retries: env_parse(
"CUBESTORE_HTTP_LOCATION_SIZE_MAX_RETRIES",
1,
),
http_location_size_initial_sleep_ms: env_parse(
"CUBESTORE_HTTP_LOCATION_SIZE_INITIAL_SLEEP_MS",
100,
),
http_location_size_sleep_multiplier: env_parse(
"CUBESTORE_HTTP_LOCATION_SIZE_SLEEP_MULTIPLIER",
2,
),
http_location_size_timeout_secs: env_parse(
"CUBESTORE_HTTP_LOCATION_SIZE_TIMEOUT_SECS",
60,
),
}),
}
}
Expand Down Expand Up @@ -1628,6 +1683,7 @@ impl Config {
not_used_timeout: 2 * query_timeout,
in_memory_not_used_timeout: 30,
import_job_timeout: 600,
scheduled_job_orphaned_timeout: query_timeout * 3,
stale_stream_timeout: 60,
select_workers: Vec::new(),
worker_bind_address: None,
Expand Down Expand Up @@ -1697,6 +1753,10 @@ impl Config {
remote_files_cleanup_delay_secs: 3600,
remote_files_cleanup_batch_size: 50000,
create_table_max_retries: 3,
http_location_size_max_retries: 1,
http_location_size_initial_sleep_ms: 100,
http_location_size_sleep_multiplier: 2,
http_location_size_timeout_secs: 60,
}
}
}
Expand Down
127 changes: 104 additions & 23 deletions rust/cubestore/cubestore/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,8 +866,15 @@ impl ImportService for ImportServiceImpl {
}

async fn estimate_location_row_count(&self, location: &str) -> Result<u64, CubeError> {
let file_size =
LocationHelper::location_file_size(location, self.remote_fs.clone()).await?;
let file_size = LocationHelper::location_file_size(
location,
self.remote_fs.clone(),
self.config_obj.http_location_size_max_retries(),
self.config_obj.http_location_size_initial_sleep_ms(),
self.config_obj.http_location_size_sleep_multiplier(),
self.config_obj.http_location_size_timeout_secs(),
)
.await?;
Ok(ImportServiceImpl::estimate_rows(location, file_size))
}

Expand All @@ -879,33 +886,107 @@ impl ImportService for ImportServiceImpl {
pub struct LocationHelper;

impl LocationHelper {
pub async fn location_file_size(
async fn try_get_http_location_size(
location: &str,
remote_fs: Arc<dyn RemoteFs>,
timeout_secs: u64,
) -> Result<Option<u64>, CubeError> {
let res = if location.starts_with("http") {
let client = reqwest::Client::new();
let req = client.head(location).build()?;

// S3 doesn't support HEAD for pre signed urls with GetObject command
if req
.url()
.domain()
.map(|v| v.contains("amazonaws.com"))
.unwrap_or(false)
{
return Ok(None);
}
let client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(timeout_secs))
.build()?;
let req = client.head(location).build()?;

// S3 doesn't support HEAD for pre signed urls with GetObject command
if req
.url()
.domain()
.map(|v| v.contains("amazonaws.com"))
.unwrap_or(false)
{
return Ok(None);
}

let res = client.execute(req).await?;
let res = client.execute(req).await.map_err(|e| {
let kind = if e.is_timeout() {
"timeout"
} else if e.is_connect() {
"connect"
} else if e.is_body() {
"body"
} else if e.is_decode() {
"decode"
} else if e.is_redirect() {
"redirect"
} else if e.is_builder() {
"builder"
} else if e.is_request() {
"request"
} else {
"unknown"
};
CubeError::internal(format!("HTTP HEAD error (kind: {}): {}", kind, e))
})?;

let length = res.headers().get(reqwest::header::CONTENT_LENGTH);
let length = res.headers().get(reqwest::header::CONTENT_LENGTH);

if let Some(length) = length {
Some(length.to_str()?.parse::<u64>()?)
} else {
None
if let Some(length) = length {
Ok(Some(length.to_str()?.parse::<u64>()?))
} else {
Ok(None)
}
}

/// Determines the size of an HTTP location via `try_get_http_location_size`,
/// retrying failures with exponential backoff.
///
/// `max_retries` is the total attempt budget: values of 0 or 1 both result in
/// exactly one attempt with no retry. `initial_sleep_ms` is the first backoff
/// delay; it is multiplied by `sleep_multiplier` after each failed attempt.
/// `timeout_secs` is the per-request HTTP timeout passed through to the probe.
///
/// Returns `Ok(None)` when the size cannot be determined (e.g. no
/// Content-Length), or the error of the final attempt once the budget is
/// exhausted.
async fn get_http_location_size(
    location: &str,
    max_retries: u64,
    initial_sleep_ms: u64,
    sleep_multiplier: u64,
    timeout_secs: u64,
) -> Result<Option<u64>, CubeError> {
    let mut attempts_left = max_retries;
    let mut backoff = Duration::from_millis(initial_sleep_ms);
    loop {
        let result = Self::try_get_http_location_size(location, timeout_secs).await;

        // Final attempt (or max_retries == 0): return whatever we got,
        // success or failure, without sleeping again.
        if attempts_left <= 1 {
            return result;
        }
        match result {
            Ok(size) => {
                return Ok(size);
            }
            Err(err) => {
                attempts_left -= 1;
                log::error!(
                    "HEAD {} error: {}. Retrying {}/{}...",
                    location,
                    err,
                    // Number of the attempt that just failed (1-based).
                    max_retries - attempts_left,
                    max_retries
                );
                sleep(backoff).await;
                // Exponential backoff. `saturating_mul` avoids both the
                // silent truncation of a bare `as u32` cast and the panic of
                // `Duration * u32` on overflow.
                backoff =
                    backoff.saturating_mul(u32::try_from(sleep_multiplier).unwrap_or(u32::MAX));
            }
        }
    }
}

pub async fn location_file_size(
location: &str,
remote_fs: Arc<dyn RemoteFs>,
max_retries: u64,
initial_sleep_ms: u64,
sleep_multiplier: u64,
timeout_secs: u64,
) -> Result<Option<u64>, CubeError> {
let res = if location.starts_with("http") {
Self::get_http_location_size(
location,
max_retries,
initial_sleep_ms,
sleep_multiplier,
timeout_secs,
)
.await?
} else if location.starts_with("temp://") {
let remote_path = Self::temp_uploads_path(location);
match remote_fs.list_with_metadata(remote_path).await {
Expand Down
11 changes: 7 additions & 4 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,7 @@ pub trait MetaStore: DIService + Send + Sync {
async fn get_orphaned_jobs(
&self,
orphaned_timeout: Duration,
scheduled_orphaned_timeout: Duration,
) -> Result<Vec<IdRow<Job>>, CubeError>;
async fn get_jobs_on_non_exists_nodes(&self) -> Result<Vec<IdRow<Job>>, CubeError>;
async fn delete_job(&self, job_id: u64) -> Result<IdRow<Job>, CubeError>;
Expand Down Expand Up @@ -3971,21 +3972,23 @@ impl MetaStore for RocksMetaStore {
async fn get_orphaned_jobs(
&self,
orphaned_timeout: Duration,
scheduled_orphaned_timeout: Duration,
) -> Result<Vec<IdRow<Job>>, CubeError> {
let duration = chrono::Duration::from_std(orphaned_timeout).unwrap();
let scheduled_duration = chrono::Duration::from_std(scheduled_orphaned_timeout).unwrap();
self.read_operation_out_of_queue("get_orphaned_jobs", move |db_ref| {
let jobs_table = JobRocksTable::new(db_ref);
let time = Utc::now();
let all_jobs = jobs_table
.all_rows()?
.into_iter()
.filter(|j| {
let elapsed = time.signed_duration_since(j.get_row().last_heart_beat().clone());
if let JobStatus::Scheduled(_) = j.get_row().status() {
return false;
elapsed > scheduled_duration
} else {
elapsed > duration
}
let duration1 =
time.signed_duration_since(j.get_row().last_heart_beat().clone());
duration1 > duration
})
.collect::<Vec<_>>();
Ok(all_jobs)
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/cubestore/src/queryplanner/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ impl MetaStore for MetaStoreMock {
// Test-only MetaStore mock: this operation is not expected to be exercised by
// the query-planner tests, so any call is a test bug and panics immediately.
async fn get_orphaned_jobs(
&self,
_orphaned_timeout: Duration,
_scheduled_orphaned_timeout: Duration,
) -> Result<Vec<IdRow<Job>>, CubeError> {
panic!("MetaStore mock!")
}
Expand Down
8 changes: 7 additions & 1 deletion rust/cubestore/cubestore/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,9 +681,15 @@ impl SchedulerImpl {
}

async fn remove_orphaned_jobs(&self) -> Result<(), CubeError> {
let query_timeout = self.config.query_timeout();
// Use 2x query_timeout for processing jobs that lost heartbeat
let processing_orphaned_timeout = Duration::from_secs(query_timeout * 2);
// Configurable timeout for scheduled jobs that were never picked up (default: 3x query_timeout)
let scheduled_orphaned_timeout =
Duration::from_secs(self.config.scheduled_job_orphaned_timeout());
let orphaned_jobs = self
.meta_store
.get_orphaned_jobs(Duration::from_secs(120)) // TODO config
.get_orphaned_jobs(processing_orphaned_timeout, scheduled_orphaned_timeout)
.await?;
for job in orphaned_jobs {
log::info!("Removing orphaned job: {:?}", job);
Expand Down
Loading