Skip to content

Commit

Permalink
Queue based on batch size, rather than batch count (#556)
Browse files (browse the repository at this point in the history)
* Limit queues based on number of rows instead of number of batches, and make it configurable
* Parallelize startup
* More queue metrics
  • Loading branch information
mwylde authored Mar 12, 2024
1 parent 6bbcb8c commit 40ea665
Show file tree
Hide file tree
Showing 18 changed files with 464 additions and 238 deletions.
6 changes: 3 additions & 3 deletions crates/arroyo-connectors/src/kafka/source/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use crate::kafka::SourceOffset;
use arroyo_operator::context::{ArrowContext, QueueItem};
use arroyo_operator::context::{batch_bounded, ArrowContext, BatchReceiver};
use arroyo_operator::operator::SourceOperator;
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{Format, RawStringFormat};
Expand Down Expand Up @@ -91,7 +91,7 @@ impl KafkaTopicTester {

let (to_control_tx, control_rx) = channel(128);
let (command_tx, from_control_rx) = channel(128);
let (data_tx, recv) = channel(128);
let (data_tx, recv) = batch_bounded(128);

let checkpoint_metadata = restore_from.map(|epoch| CheckpointMetadata {
job_id: task_info.job_id.to_string(),
Expand Down Expand Up @@ -169,7 +169,7 @@ impl KafkaTopicProducer {
struct KafkaSourceWithReads {
to_control_tx: Sender<ControlMessage>,
from_control_rx: Receiver<ControlResp>,
data_recv: Receiver<QueueItem>,
data_recv: BatchReceiver,
}

impl KafkaSourceWithReads {
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-connectors/src/mqtt/source/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use crate::mqtt::{create_connection, MqttConfig, Tls};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arroyo_operator::context::{ArrowContext, QueueItem};
use arroyo_operator::context::{batch_bounded, ArrowContext, BatchReceiver};
use arroyo_operator::operator::SourceOperator;
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{Format, JsonFormat};
Expand All @@ -28,7 +28,7 @@ struct MqttSourceWithReads {
to_control_tx: Sender<ControlMessage>,
#[allow(dead_code)]
from_control_rx: Receiver<ControlResp>,
data_recv: Receiver<QueueItem>,
data_recv: BatchReceiver,
subscribed: Arc<AtomicBool>,
}

Expand Down Expand Up @@ -129,7 +129,7 @@ impl MqttTopicTester {

let (to_control_tx, control_rx) = channel(128);
let (command_tx, from_control_rx) = channel(128);
let (data_tx, recv) = channel(128);
let (data_tx, recv) = batch_bounded(128);

let mut ctx = ArrowContext::new(
task_info,
Expand Down
4 changes: 1 addition & 3 deletions crates/arroyo-controller/src/states/checkpoint_stopping.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arroyo_rpc::grpc;
use tracing::{debug, info};
use tracing::debug;

use crate::{states::StateError, JobMessage};

Expand All @@ -22,8 +22,6 @@ impl State for CheckpointStopping {

let mut final_checkpoint_started = false;

info!("in a checkpoint stopping");

loop {
match job_controller.checkpoint_finished().await {
Ok(done) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-controller/src/states/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn handle_worker_connect<'a>(
for i in 0..3 {
match Channel::from_shared(rpc_address.clone())
.unwrap()
.timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(90))
.connect()
.await
{
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-df/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ fn convert_simple_data_type(
{
Ok(DataType::Time64(TimeUnit::Nanosecond))
} else {
// We dont support TIMETZ and TIME WITH TIME ZONE for now
// We don't support TIMETZ and TIME WITH TIME ZONE for now
bail!("Unsupported SQL type {sql_type:?}")
}
}
Expand Down
37 changes: 8 additions & 29 deletions crates/arroyo-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,32 +143,13 @@ impl TaskCounters {

pub type QueueGauges = Vec<Vec<Option<IntGauge>>>;

pub fn register_queue_gauges<T>(
pub fn register_queue_gauge<T>(
name: &'static str,
help: &'static str,
task_info: &TaskInfo,
out_qs: &Vec<Vec<T>>,
) -> (QueueGauges, QueueGauges) {
let tx_queue_size_gauges = out_qs
.iter()
.enumerate()
.map(|(i, qs)| {
qs.iter()
.enumerate()
.map(|(j, _)| {
gauge_for_task(
task_info,
"arroyo_worker_tx_queue_size",
"Size of a tx queue",
labels! {
"next_node".to_string() => format!("{}", i),
"next_node_idx".to_string() => format!("{}", j)
},
)
})
.collect()
})
.collect();

let tx_queue_rem_gauges = out_qs
) -> QueueGauges {
out_qs
.iter()
.enumerate()
.map(|(i, qs)| {
Expand All @@ -177,8 +158,8 @@ pub fn register_queue_gauges<T>(
.map(|(j, _)| {
gauge_for_task(
task_info,
"arroyo_worker_tx_queue_rem",
"Remaining space in a tx queue",
name,
help,
labels! {
"next_node".to_string() => format!("{}", i),
"next_node_idx".to_string() => format!("{}", j)
Expand All @@ -187,7 +168,5 @@ pub fn register_queue_gauges<T>(
})
.collect()
})
.collect();

(tx_queue_size_gauges, tx_queue_rem_gauges)
.collect()
}
Loading

0 comments on commit 40ea665

Please sign in to comment.