70 changes: 40 additions & 30 deletions src/topology/builder.rs
@@ -8,7 +8,6 @@ use std::{

use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
select,
@@ -51,7 +50,7 @@ use crate::{
spawn_named,
topology::task::TaskError,
transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
utilization::{UtilizationComponentSender, UtilizationEmitter, wrap},
utilization::wrap,
};

static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
@@ -83,7 +82,6 @@ struct Builder<'a> {
healthchecks: HashMap<ComponentKey, Task>,
detach_triggers: HashMap<ComponentKey, Trigger>,
extra_context: ExtraContext,
utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
@@ -105,7 +103,6 @@ impl<'a> Builder<'a> {
healthchecks: HashMap::new(),
detach_triggers: HashMap::new(),
extra_context,
utilization_emitter: UtilizationEmitter::new(),
}
}

@@ -129,7 +126,6 @@ impl<'a> Builder<'a> {
healthchecks: self.healthchecks,
shutdown_coordinator: self.shutdown_coordinator,
detach_triggers: self.detach_triggers,
utilization_emitter: Some(self.utilization_emitter),
})
} else {
Err(self.errors)
@@ -515,7 +511,7 @@ impl<'a> Builder<'a> {

let (transform_task, transform_outputs) = {
let _span = span.enter();
build_transform(transform, node, input_rx, &mut self.utilization_emitter)
build_transform(transform, node, input_rx)
};

self.outputs.extend(transform_outputs);
@@ -619,10 +615,9 @@ impl<'a> Builder<'a> {

let (trigger, tripwire) = Tripwire::new();

let utilization_sender = self
.utilization_emitter
.add_component(key.clone(), gauge!("utilization"));
#[cfg(debug_assertions)]
let component_key = key.clone();

let sink = async move {
debug!("Sink starting.");

@@ -638,7 +633,11 @@
.take()
.expect("Task started but input has been taken.");

let mut rx = wrap(utilization_sender, component_key.clone(), rx);
let mut rx = wrap(
rx,
#[cfg(debug_assertions)]
component_key.clone(),
);

let events_received = register!(EventsReceived);
sink.run(
@@ -768,7 +767,6 @@ pub struct TopologyPieces {
pub(super) healthchecks: HashMap<ComponentKey, Task>,
pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
@@ -847,22 +845,18 @@ fn build_transform(
transform: Transform,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
match transform {
// TODO: avoid the double boxing for function transforms here
Transform::Function(t) => {
build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
}
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx),
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx),
Transform::Task(t) => build_task_transform(
t,
input_rx,
node.input_details.data_type(),
node.typetag,
&node.key,
&node.outputs,
utilization_emitter,
),
}
}
@@ -871,12 +865,17 @@ fn build_sync_transform(
t: Box<dyn SyncTransform>,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

let sender = utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
let runner = Runner::new(
t,
input_rx,
node.input_details.data_type(),
outputs,
#[cfg(debug_assertions)]
node.key.clone(),
);
let transform = if node.enable_concurrency {
runner.run_concurrently().boxed()
} else {
@@ -916,30 +915,39 @@ struct Runner {
input_rx: Option<BufferReceiver<EventArray>>,
input_type: DataType,
outputs: TransformOutputs,
timer_tx: UtilizationComponentSender,
timer: crate::utilization::Timer,
last_report: Instant,
events_received: Registered<EventsReceived>,
}

impl Runner {
fn new(
transform: Box<dyn SyncTransform>,
input_rx: BufferReceiver<EventArray>,
timer_tx: UtilizationComponentSender,
input_type: DataType,
outputs: TransformOutputs,
#[cfg(debug_assertions)] component_key: ComponentKey,
) -> Self {
Self {
transform,
input_rx: Some(input_rx),
input_type,
outputs,
timer_tx,
timer: crate::utilization::Timer::new(
#[cfg(debug_assertions)]
Arc::from(component_key.id()),
),
last_report: Instant::now(),
events_received: register!(EventsReceived),
}
}

fn on_events_received(&mut self, events: &EventArray) {
self.timer_tx.try_send_stop_wait();
let stopped = self.timer.stop_wait();
if stopped.duration_since(self.last_report).as_secs() >= 5 {
self.timer.report();
self.last_report = stopped;
}

self.events_received.emit(CountByteSize(
events.len(),
Expand All @@ -948,7 +956,7 @@ impl Runner {
}

async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
self.timer_tx.try_send_start_wait();
self.timer.start_wait();
self.outputs.send(outputs_buf).await
}

@@ -965,7 +973,7 @@
.into_stream()
.filter(move |events| ready(filter_events_type(events, self.input_type)));

self.timer_tx.try_send_start_wait();
self.timer.start_wait();
while let Some(events) = input_rx.next().await {
self.on_events_received(&events);
self.transform.transform_all(events, &mut outputs_buf);
@@ -991,7 +999,7 @@
let mut in_flight = FuturesOrdered::new();
let mut shutting_down = false;

self.timer_tx.try_send_start_wait();
self.timer.start_wait();
loop {
tokio::select! {
biased;
@@ -1052,12 +1060,14 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

let sender = utilization_emitter.add_component(key.clone(), gauge!("utilization"));
let input_rx = wrap(sender, key.clone(), input_rx.into_stream());
let input_rx = crate::utilization::wrap(
input_rx.into_stream(),
#[cfg(debug_assertions)]
key.clone(),
);

let events_received = register!(EventsReceived);
let filtered = input_rx
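The builder changes above replace the `UtilizationEmitter`/`UtilizationComponentSender` pair, which shipped start/stop-wait events to a central task, with a `crate::utilization::Timer` owned by each component, plus a reshaped `wrap` that only takes the component key in debug builds. The call sites (`Timer::new`, `start_wait`, `stop_wait`, `report`) imply roughly the following shape; this is a sketch inferred from the diff, not the actual contents of `src/utilization.rs`, and the field names and reporting details are assumptions.

```rust
// Sketch of the Timer API implied by the call sites above -- not the real
// src/utilization.rs. Field names and the reporting wiring are assumptions.
use std::time::{Duration, Instant};

pub struct Timer {
    // Kept only in debug builds, mirroring the #[cfg(debug_assertions)]
    // component-key arguments in the diff above.
    #[cfg(debug_assertions)]
    component_id: std::sync::Arc<str>,
    total_wait: Duration,
    started_wait: Option<Instant>,
}

impl Timer {
    pub fn new(#[cfg(debug_assertions)] component_id: std::sync::Arc<str>) -> Self {
        Self {
            #[cfg(debug_assertions)]
            component_id,
            total_wait: Duration::ZERO,
            started_wait: None,
        }
    }

    /// Mark the start of a period spent waiting on input or downstream sends.
    pub fn start_wait(&mut self) {
        self.started_wait = Some(Instant::now());
    }

    /// Mark the end of a waiting period and return the current instant so the
    /// caller (e.g. Runner::on_events_received) can throttle reports to ~5s.
    pub fn stop_wait(&mut self) -> Instant {
        let now = Instant::now();
        if let Some(started) = self.started_wait.take() {
            self.total_wait += now - started;
        }
        now
    }

    /// Publish the accumulated busy/idle ratio. The real implementation
    /// presumably updates the `utilization` gauge (and, in debug builds, can
    /// tag output with `component_id`); those details are omitted here.
    pub fn report(&mut self) {
        #[cfg(debug_assertions)]
        let _ = &self.component_id;
        self.total_wait = Duration::ZERO;
    }
}
```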
41 changes: 2 additions & 39 deletions src/topology/running.rs
@@ -7,15 +7,13 @@ use std::{
};

use futures::{Future, FutureExt, future};
use stream_cancel::Trigger;
use tokio::{
sync::{mpsc, watch},
time::{Duration, Instant, interval, sleep_until},
};
use tracing::Instrument;
use vector_lib::{
buffers::topology::channel::BufferSender,
shutdown::ShutdownSignal,
tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
trigger::DisabledTrigger,
};
@@ -25,7 +23,7 @@ use super::{
builder::{self, TopologyPieces, reload_enrichment_tables},
fanout::{ControlChannel, ControlMessage},
handle_errors, retain, take_healthchecks,
task::{Task, TaskOutput},
task::TaskOutput,
};
use crate::{
config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
@@ -36,7 +34,6 @@ use crate::{
signal::ShutdownError,
spawn_named,
};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

#[allow(dead_code)]
@@ -54,8 +51,6 @@ pub struct RunningTopology {
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_task: Option<TaskHandle>,
utilization_task_shutdown_trigger: Option<Trigger>,
pending_reload: Option<HashSet<ComponentKey>>,
}

@@ -75,8 +70,6 @@ impl RunningTopology {
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_task: None,
utilization_task_shutdown_trigger: None,
pending_reload: None,
}
}
@@ -136,21 +129,15 @@ impl RunningTopology {
// pump in self.tasks, and the other for source in self.source_tasks.
let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

let map_closure = |_result| ();

// We need to give some time to the sources to gracefully shutdown, so
// we will merge them with other tasks.
for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
let task = task.map(map_closure).shared();
let task = task.map(|_result| ()).shared();

wait_handles.push(task.clone());
check_handles.entry(key).or_default().push(task);
}

if let Some(utilization_task) = self.utilization_task {
wait_handles.push(utilization_task.map(map_closure).shared());
}

// If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
let deadline = self
.graceful_shutdown_duration
@@ -238,9 +225,6 @@ impl RunningTopology {

// Now kick off the shutdown process by shutting down the sources.
let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
if let Some(trigger) = self.utilization_task_shutdown_trigger {
trigger.cancel();
}

futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
}
@@ -1287,10 +1271,6 @@ impl RunningTopology {
return None;
}

let mut utilization_emitter = pieces
.utilization_emitter
.take()
.expect("Topology is missing the utilization metric emitter!");
let mut running_topology = Self::new(config, abort_tx);

if !running_topology
Expand All @@ -1302,23 +1282,6 @@ impl RunningTopology {
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology.spawn_diff(&diff, pieces);

let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
ShutdownSignal::new_wired();
running_topology.utilization_task_shutdown_trigger =
Some(utilization_task_shutdown_trigger);
running_topology.utilization_task = Some(tokio::spawn(Task::new(
"utilization_heartbeat".into(),
"",
async move {
utilization_emitter
.run_utilization(utilization_shutdown_signal)
.await;
// TODO: new task output type for this? Or handle this task in a completely
// different way
Ok(TaskOutput::Healthcheck)
},
)));

Some((running_topology, abort_rx))
}
}
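With utilization now measured inline by each component's `Timer`, running.rs no longer spawns the dedicated `utilization_heartbeat` task or carries its shutdown trigger, which is what the removals above amount to. The PR leans on `#[cfg(debug_assertions)]` applied to individual function parameters and call arguments so the component key is only threaded through in debug builds; below is a minimal, self-contained illustration of that pattern (all names here are invented for the example):

```rust
// Minimal illustration of cfg-gated function parameters and call arguments,
// the pattern this PR uses to pass a ComponentKey only in debug builds.
#[allow(dead_code)] // only constructed in debug builds
#[derive(Clone, Debug)]
struct ComponentKey(String);

fn wrap_input(
    input: Vec<u8>,
    // Compiled out entirely in release builds.
    #[cfg(debug_assertions)] key: ComponentKey,
) -> Vec<u8> {
    #[cfg(debug_assertions)]
    println!("wrapping input for {key:?}");
    input
}

fn main() {
    let wrapped = wrap_input(
        vec![1, 2, 3],
        // The attribute removes this argument in release builds, matching the
        // cfg-gated parameter in the signature above.
        #[cfg(debug_assertions)]
        ComponentKey("my_sink".into()),
    );
    assert_eq!(wrapped.len(), 3);
}
```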