This project aims to provide a building block for complex data processing pipelines in the form of a pipeline executor, which can bind its inputs and outputs to other executors via stdin/stdout, HTTP, Redis, AMQP, or MQTT. Data processing tasks can be written inline using jq or Jsonnet, or delegated to external services.
CDPs are programs that take data events from a source, optionally transform them, and optionally send them to specific targets. Two definitions are required to make this concrete:
All data handled by CDP comes in the form of events. Events are objects with a fixed enveloping structure and a free inner structure. They're always JSON-encodable and decodable. The envelope comprises the following fields:
- n, which holds the event's name. The meaning assigned to the name of an event is for the user to decide, although it's useful to link it to the notion of a stream or time series, where it could identify a set of events. Names are mandatory to provide and are restricted to a specific scheme.
- d, which holds the event's contents. The contents may be any value, or they could even be missing.
- t, which holds the event's trace. This is optional to provide as input, but CDP programs will always create or extend this field on each event they process. The value assigned to t will be a list of processing history, with each new entry added to the end of the list.
- t[].i, which holds a trace entry's timestamp, which corresponds to the unix timestamp (in seconds) at which a CDP program first received the event.
- t[].p, which holds a trace entry's pipeline name, which corresponds to the declared name of the CDP program which handled the event.
- t[].h, which holds a SHA-1 signature of the CDP program that received the event.
An example of a JSON-encoded event is the following:
{
  "n": "madhava",
  "d": {"value": 3.1426047456630846, "terms": 5, "sqrt12": 3.4641016151377544},
  "t": [
    {"i": 1640465107, "p": "Madhava series", "h": "03a98d0890dcd7ba2ab25928e81fb94e6a778166"},
    {"i": 1640465318, "p": "PI approximations", "h": "df0673ccd8e0e7fba18c71648b37d4c1570e93f8"}
  ]
}

A CDP program or pipeline is a program built using a definition file and a fixed structure. Pipelines are programs that take events from a source and process them using user-defined steps. They're structured as follows:
- They have a name, which may or may not be human-friendly, as CDP only uses it for generating hashes.
- They have an input, which defines the source of the event stream. Examples of inputs are STDIN or HTTP RESTy endpoints.
- They have zero or more processing steps.
Each processing step in the pipeline is a function over a vector of events, which returns another (possibly modified) vector of events for subsequent steps of the pipeline to process. To define a step, the user may specify up to five components:
- The step dependency (with the after keyword), which forces a step to feed on events from the specified steps it depends on. By default, all steps run in parallel unless a step dependency is specified, so this is the main way of specifying sequential processing. CDP will prevent the user from specifying a graph cycle through this mechanism.
- The pattern to match event names (with the match/drop and match/pass keywords). The pattern allows the user to further limit which events are to be handled by this step. This can be combined with the step dependency for more control over the pipeline graph's edges.
- The mode of stream processing, one of two alternatives: (a) flatmap, which is applied to all events incoming from previous steps, or (b) reduce, which is applied only once per event vector.
- The vector definition (with the window keyword), which indicates how CDP is to assemble each event vector for the processing step. This can be omitted to indicate that each vector should contain a single event.
- And finally the processing function itself (keyed by either flatmap or reduce), which will take an event vector and generate another event vector. It's here that a CDP user can choose to filter events out of the pipeline, enforce event structure, compute results inline, forward events to another program for external processing, or any combination of those.
CDP programs are written as YAML-formatted files. An example of a trivial CDP program is the following:
---
name: "pipe"
input:
  stdin:
steps:
  print:
    flatmap:
      send-stdout:
Less trivial examples can be found in the examples directory.
docker run --rm plotter/cdp:latest --help

Once you've created a pipeline file (say: pipeline.yaml), the
easiest way to run CDP is with Docker, Podman, or any OCI-compatible
software:
docker run \
  --rm \
  -v $(pwd)/pipeline.yaml:/app/pipeline.yaml \
  plotter/cdp:latest /app/pipeline.yaml

Alternatively you can run CDP using your own NodeJS installation by extracting the source code from the container image. For example, using Docker:
# Extract the source into cdp.js
export container=$(docker create plotter/cdp:latest)
docker cp ${container}:/src/index.js cdp.js
docker rm ${container}
unset container
# Use it
node cdp.js --help

The structure of a pipeline file is described below. The source code of the structure validation can also be consulted to verify the implementation of any given field or option: here.
Input forms follow the schema:
input required object, a structure containing a single input
form.
Most input forms implement an event-generating channel that can be paused by backpressure signals. Some input forms only support pausing under certain configurations. Check each input form for details.
To configure backpressure triggering, check the Backpressure section below.
input.generator object or string or null, the input
form that generates events for the pipeline at a fixed rate. This is
most useful for testing pipelines before using the definitive input
form. If given as a string, it's set to be the name of the events
generated. If given as empty or null, the name of the events is set to
"_".
The generator input form reacts to backpressure signals by skipping
event generation.
input.generator.name optional string, the name of the events
that the input form will generate. It defaults to "_".
input.generator.seconds optional number or string, the
interval between two consecutive events generated by the input
form. Defaults to 1 for one second.
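For instance, a pipeline that generates a test event named "tick" every two seconds could declare its input as in the following sketch (the event name and interval are only illustrative):

input:
  generator:
    name: "tick"
    seconds: 2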
input.stdin object or null, the input form that makes a
pipeline read source data from standard input.
The stdin input form doesn't react to backpressure signals.
input.stdin.wrap optional string or object, a wrapping
directive which specifies that incoming data is not encoded events,
and thus should be wrapped.
input.stdin.wrap.name required string, the name given to the
events that wrap the input data.
input.stdin.wrap.raw optional boolean, whether to treat
incoming data as plain text, not JSON.
input.tail string or object, the input form that makes a
pipeline read source data from (the tail of) a file. If given a
string, it will be interpreted as the path to the file to be read.
The tail input form doesn't react to backpressure signals.
input.tail.path required string, the path to the file to be
read.
input.tail.start-at optional "start" or "end", a mode
indicating whether the file should first be read from the beginning or
the end. To prevent event duplication after a restart of CDP, this is
set to "end" by default. Note: this doesn't alter the direction of
reading (which is always "forward"), only the point in the target file
where reading should begin.
input.tail.wrap optional string or object, a wrapping
directive which specifies that incoming data is not encoded events,
and thus should be wrapped.
input.tail.wrap.name required string, the name given to the
events that wrap the input data.
input.tail.wrap.raw optional boolean, whether to treat
incoming data as plain text, not JSON.
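As a sketch, a tail input that follows a hypothetical log file and wraps each raw line into events named "logline" could look like this:

input:
  tail:
    path: "/var/log/app.log"   # hypothetical file path
    start-at: "end"
    wrap:
      name: "logline"
      raw: true                # lines are plain text, not JSON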
input.http object or string, the input form that makes a
pipeline receive source data from HTTP POST requests. If given as a
string, it indicates the path that will receive requests with source data.
The http input form reacts to backpressure signals by responding to
requests with a 503 response. Clients should interpret such responses
as cues to retry the request for a while, e.g. using exponential
backoff.
input.http.endpoint required string, indicates the path that
will receive requests with source data.
input.http.port optional number or string, indicates the
numeric TCP port to listen on. The default is determined by the
HTTP_SERVER_DEFAULT_PORT environment variable, which itself defaults
to 8000.
input.http.wrap optional string or object, a wrapping
directive which specifies that incoming data is not encoded events,
and thus should be wrapped.
input.http.wrap.name required string, the name given to the
events that wrap the input data.
input.http.wrap.raw optional boolean, whether to treat
incoming data as plain text, not JSON.
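For example, an http input that accepts raw JSON payloads on a hypothetical /events path and wraps them as "webhook" events might be sketched as:

input:
  http:
    endpoint: "/events"        # illustrative path
    port: 8000
    wrap:
      name: "webhook"          # payloads are JSON, but not CDP events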
input.poll object or string, the input form that makes a
pipeline actively fetch data periodically from a remote source using
HTTP requests. If given as a string, it indicates the URI of the
remote event source.
The poll input form reacts to backpressure signals by skipping
polls.
input.poll.target required string, the target URI to use for
the event-fetching request.
input.poll.seconds optional number or string, the time
interval between two consecutive fetch requests. If omitted it will
default to the value of the POLL_INPUT_DEFAULT_INTERVAL environment
variable, or 5 if the variable is not set.
input.poll.headers optional object, HTTP headers to use for
each request.
input.poll.wrap optional string or object, a wrapping
directive which specifies that incoming data is not encoded events,
and thus should be wrapped.
input.poll.wrap.name required string, the name given to the
events that wrap the input data.
input.poll.wrap.raw optional boolean, whether to treat
incoming data as plain text, not JSON.
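A poll input that fetches a hypothetical status endpoint every 30 seconds and wraps each response could be sketched as follows (the URI and header are placeholders):

input:
  poll:
    target: "https://example.com/status"
    seconds: 30
    headers:
      Accept: "application/json"
    wrap:
      name: "status"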
input.amqp string or object, the input form that makes
the pipeline receive data from an AMQP broker using the AMQP 0-9-1
protocol (e.g. RabbitMQ). If given a
string, it will be interpreted as a URL and all other parameters will
be set to their respective defaults.
The amqp input form reacts to backpressure signals by not sending
the message ACK back to the broker. This causes messages to be kept in
queue. The amqp input form also instructs the broker to "recover"
non-acked messages once backpressure is turned off, so that they can
be eventually consumed again.
input.amqp.url required string, the URL of the broker to
connect to.
input.amqp.exchange optional object, the description of the
AMQP exchange to assert. For a description of AMQP exchanges, you may
read the AMQP 0-9-1 Model
explanation by
RabbitMQ.
input.amqp.exchange.name optional string, the name of the
AMQP exchange to assert. If omitted, it will default to "cdp".
input.amqp.exchange.type optional "direct", "fanout" or
"topic", the type of AMQP exchange to assert. If omitted, it will
default to "topic".
input.amqp.exchange.durable optional boolean, "true" or
"false", whether the exchange should be declared as durable or
not (default is true).
input.amqp.exchange.auto-delete optional boolean, "true"
or "false", whether the exchange should be automatically deleted
once no more queues are bound to it (default is false).
input.amqp.binding-pattern optional string, the pattern used
by the binding between channel and queue. The meaning of the pattern
depends on the type of exchange. Check the AMQP documentation for
details. If omitted, it will default to "cdp" for direct
exchanges, the empty string for fanout exchanges, and "#" (the
match-all pattern) for topic exchanges.
input.amqp.queue optional object, the description of the
AMQP queue to assert. If omitted, all configuration values (including
the queue's name) will be assigned by the broker.
input.amqp.queue.name optional string, the name of the queue
to assert.
input.amqp.queue.durable optional boolean, "true", or
"false", whether the queue should be declared as durable or not
(default is true).
input.amqp.queue.auto-delete optional boolean, "true",
or "false", whether the queue should be automatically deleted once
no more consumers are present (default is false).
input.amqp.queue.message-ttl optional number or string,
the maximum amount of time in milliseconds a message can stay in the
queue unconsumed.
input.amqp.queue.expires optional number or string, the
maximum amount of time in milliseconds the queue can survive without
active consumers. Similar to auto-delete, except that auto-delete
applies immediately when set to true.
input.amqp.queue.dead-letter-exchange optional string, the
name of an exchange to send messages to once they expire.
input.amqp.queue.max-length optional number or string,
the maximum size of the queue. Old messages pushed out of the queue
will be sent to the dead-letter exchange, if set.
input.amqp.queue.max-priority optional number or string,
the maximum value for priority, if used. Check the
documentation for more
information.
input.amqp.wrap optional string or object, a wrapping
directive which specifies that incoming data is not encoded events,
and thus should be wrapped.
input.amqp.wrap.name required string, the name given to the
events that wrap the input data.
input.amqp.wrap.raw optional boolean, whether to treat
incoming data as plain text, not JSON.
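Putting several of these options together, an amqp input asserting a durable topic exchange and queue could be sketched like this (the URL, names and binding pattern are placeholders):

input:
  amqp:
    url: "amqp://guest:guest@localhost:5672"
    exchange:
      name: "cdp"
      type: "topic"
      durable: true
    binding-pattern: "sensors.#"
    queue:
      name: "cdp-input"
      durable: true
      message-ttl: 60000       # drop messages left unconsumed for a minute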
input.mqtt string or object, the input form that makes
the pipeline receive data from an MQTT broker. If given a string, it
will be interpreted as a URL and all other parameters will be set to
their respective defaults.
The mqtt input form reacts to backpressure signals by delaying the
message handling through the MQTT.js library's
handleMessage
hook.
input.mqtt.url required string, the URL of the broker to
connect to.
input.mqtt.options optional object, the connection
options. Check the
MQTT.js
library for details.
input.mqtt.topic optional string, list of string or
object, the topic or topics to subscribe to. If given as an
object, the topic names will be the keys, and the values are QoS
specifications of the form {qos: 0 | 1 | 2}. If omitted, the client
will subscribe to "cdp/#".
input.mqtt.wrap optional string or object, a wrapping
directive which specifies that incoming data is not encoded events,
and thus should be wrapped.
input.mqtt.wrap.name required string, the name given to the
events that wrap the input data.
input.mqtt.wrap.raw optional boolean, whether to treat
incoming data as plain text, not JSON.
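A minimal mqtt input subscribing to two hypothetical topics with different QoS levels could be sketched as:

input:
  mqtt:
    url: "mqtt://localhost:1883"
    topic:
      "sensors/temperature": {qos: 1}
      "sensors/humidity": {qos: 0}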
input.redis object, the input form that makes the pipeline
receive data from a redis instance or cluster. The options for
receiving data are subscribe, psubscribe, blpop, or brpop,
corresponding to the respective redis commands.
The redis input form can react to backpressure signals when
configured with the blpop or brpop options. The subscribe and
psubscribe options don't support pausing. When reacting to
backpressure, the input channel will skip the execution of blpop or
brpop commands.
input.redis.instance optional string or object,
parameters required to connect to a single redis instance. If using a
plain string, it must match the same restrictions as the path
parameter described below.
input.redis.instance.path required string, a redis
URL. Check the ioredis
documentation for
details.
input.redis.instance.options optional object, connection
options as given to the ioredis library.
input.redis.cluster optional list of ClusterNode or
object, parameters required to connect to a redis cluster. If
using a list of cluster nodes, they must match the same restrictions
as the nodes parameter described below.
input.redis.cluster.nodes required list of ClusterNode, a
list of nodes to connect to initially. Check the ioredis
documentation for details.
input.redis.cluster.options optional object, connection
options as given to the ioredis library.
One of input.redis.instance or input.redis.cluster must be used.
input.redis.subscribe optional string or list of string,
the channel key or keys to subscribe to if using subscribe.
input.redis.psubscribe optional string or list of
string, the channel pattern or patterns to subscribe to if using
psubscribe.
input.redis.blpop optional string or list of string, the
key or keys to pop items from if using blpop.
input.redis.brpop optional string or list of string, the
key or keys to pop items from if using brpop.
One of the modes subscribe, psubscribe, blpop and brpop must
be used.
input.redis.wrap optional string or object, a wrapping
directive which specifies that incoming data is not encoded events,
and thus should be wrapped.
input.redis.wrap.name required string, the name given to the
events that wrap the input data.
input.redis.wrap.raw optional boolean, whether to treat
incoming data as plain text, not JSON.
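For instance, a redis input popping raw items from a hypothetical list key on a single instance, wrapping each as a "queued-item" event, could be declared as in this sketch:

input:
  redis:
    instance:
      path: "redis://localhost:6379"
    blpop: "cdp:incoming"      # hypothetical list key
    wrap:
      name: "queued-item"
      raw: true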
All input forms and some step functions offer the option of wrapping
the captured data with the wrap option. It indicates that the
captured data is either raw JSON-encoded values or raw UTF-8 strings,
and should be wrapped in events with the specified name. If not
given, captured data must be fully JSON-encoded events.
For example, if receiving data such as {"this": "is my data"} is to
be supported, a wrapper would need to be used since the data doesn't
comply with the event
format. Moreover, if the data received
is something like this is my data (a plain UTF-8 text), then the
raw wrapping would be needed.
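To make the two cases concrete, the following sketch shows both wrapping variants on the stdin input form (the event names are illustrative):

# Incoming lines are JSON values, but not full CDP events:
input:
  stdin:
    wrap:
      name: "raw-json"

# Incoming lines are plain UTF-8 text, not JSON:
input:
  stdin:
    wrap:
      name: "raw-text"
      raw: true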
Step dependencies are specified as a list of step names that provide events for the one declaring the dependencies.
steps.<name>.after optional list of string, names of steps
that will be run before this step, and which will feed their output
events to this step. The $input name can be used in this list to
refer to the pipeline's input.
Not specifying any dependency or leaving it as an empty list is
equivalent to the singleton list ["$input"].
Any two steps that aren't in a direct or transitive dependency relationship can process events in parallel.
In a pipeline, steps form a DAG: no cycles are allowed.
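As an example, the following sketch wires two hypothetical steps sequentially, while a third one reads directly from the pipeline's input and thus runs in parallel with them:

steps:
  parse:
    after: ["$input"]          # equivalent to omitting the dependency
    flatmap:
      send-stdout:
  enrich:
    after: ["parse"]           # fed by the events emitted by parse
    flatmap:
      send-stdout:
  audit:                       # no dependency: also fed from $input
    flatmap:
      send-stdout: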
Steps can be set to filter events before processing by using patterns. Filtering can be configured either to drop events entirely for any following step, or to skip the current step's processing while passing events through to the following steps.
steps.<name>.match/drop optional pattern, configures the
step to drop events with names not matching the pattern
specified. Events dropped this way won't be received by steps that
follow the one containing the pattern.
steps.<name>.match/pass optional pattern, configures the
step to skip events with names not matching the pattern
specified. Events skipped this way will still be received by steps
that follow the one containing the pattern.
Any pipeline step can specify at most one of match/drop or
match/pass.
A pattern is a structure defined inductively:
- A string is a pattern that matches event names equal to it, considering that * can be used in a pattern as a wildcard for any word in an event name, and # can be used in a pattern as a wildcard for any sequence of words in an event name (including a zero-length sequence). Event names and string patterns can be understood as the same as RabbitMQ's binding and routing keys.
- An object with an or key mapped to a list of patterns is a pattern that matches if any of the patterns in the list mapped to or matches.
- An object with an and key mapped to a list of patterns is a pattern that matches if all of the patterns in the list mapped to and match.
- An object with a not key mapped to a pattern is a pattern that matches if the pattern mapped to not doesn't match.
A few examples:
steps:
  foo:
    # A string pattern
    match/pass: "foo.#.bar.*"
    # ...
  bar:
    # A composite pattern
    match/drop:
      not:
        and:
          - "foo.bar.*.*"
          - "#.baz"
          - or:
              - "#.hey.*"
              - "#.hi.*"
    # ...
All steps in CDP operate over vectors (i.e. groups, or windows) of
events. If not configured, a step will operate over singleton
vectors. Using the window field, however, the pipeline author may
configure a step to process more than one event at a time.
steps.<name>.window optional object, contains the
specification for assembling event vectors for processing.
steps.<name>.window.events required number or string, a
maximum quantity of events to accumulate in each vector before sending
it to be processed.
steps.<name>.window.seconds required number or string, a
maximum number of seconds to wait after receiving the first event of
the vector, for the vector to "fill up". The vector will be sent for
processing after either reaching the cardinality specified in
window.events or this time interval.
When using a non-default configuration for vector construction, a
pipeline's author should consider the "main event of the vector" to be
the first one, especially when using the flatmap processing mode
(explained further below).
An example:
steps:
  foo:
    # Wait for 100 events or 1.5 seconds, whichever happens first.
    window:
      events: 100
      seconds: 1.5
    # ...
Vector construction is mainly a tool to control flow rate, but can also be used to compute moving aggregates over your data.
A pipeline step can be set to process event vectors in one of two ways: by operating on disjoint vectors, or by sliding through superimposed vectors. These modes of processing are called reduce and flatmap respectively.
steps.<name>.reduce object, indicates the processing
function to use in reduce mode.
steps.<name>.flatmap object, indicates the processing
function to use in flatmap mode.
One of the modes must be used.
The following example illustrates the difference between the two modes. Given the partial pipeline file:
steps:
  foo:
    window:
      events: 3
      seconds: 1
    reduce:
      send-stdout:
        jq-expr: .
  bar:
    window:
      events: 3
      seconds: 1
    flatmap:
      send-stdout:
        jq-expr: .
The only difference between foo and bar is the operation mode. If
receiving as input events A, B, C, D, and E, the
step foo would print to stdout two vectors: (A, B, C) and (D,
E). The step bar would print five vectors: (A, B, C), (B, C,
D), (C, D, E), (D, E) and finally (E).
In general, the use of flatmap implies much more processing load.
The step functions themselves (keyed under reduce or flatmap) come
from a fixed list of options:
steps.<name>.(reduce|flatmap).rename object, a function that
renames events it receives.
steps.<name>.(reduce|flatmap).rename.replace string, the
name that will be assigned to events going through this step.
steps.<name>.(reduce|flatmap).rename.append optional string,
a suffix to add to event names going through this step.
steps.<name>.(reduce|flatmap).rename.prepend optional
string, a prefix to add to event names going through this step.
The rename function can only be given the replace option or a
combination of the append and prepend options.
steps.<name>.(reduce|flatmap).deduplicate object or
null, a function that removes duplicate events from vectors.
steps.<name>.(reduce|flatmap).deduplicate.consider-name optional
boolean, defaults to true, indicates whether deduplication
should consider the name of events.
steps.<name>.(reduce|flatmap).deduplicate.consider-data optional
boolean, defaults to true, indicates whether deduplication
should consider the data contained in events.
steps.<name>.(reduce|flatmap).deduplicate.consider-trace
optional boolean, defaults to false, indicates whether
deduplication should consider the trace of events.
Setting all three of these to false is equivalent to using the
below-explained keep with value 1, that is, dropping all events
from each group except for the first one.
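For example, a step that collects events into windows of up to 50 and drops duplicates based on name and data only could be sketched as:

steps:
  dedupe:
    window:
      events: 50
      seconds: 5
    reduce:
      deduplicate:
        consider-name: true
        consider-data: true
        consider-trace: false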
steps.<name>.(reduce|flatmap).keep number or string or
object, a function that selects the first few events from an event
vector, the number of events kept being the specified value.
steps.<name>.(reduce|flatmap).keep.first number or
string, the value indicating the maximum amount of events to keep
from the start of the event vector.
steps.<name>.(reduce|flatmap).keep.last number or
string, the value indicating the maximum amount of events to keep
from the end of the event vector.
Only one of first or last may be used. Using a value directly is
equivalent to using first.
steps.<name>.(reduce|flatmap).keep-when object, a function that
selects events from an event vector, according to whether their data
complies with the schema given. The schema should be a valid JSON
Schema object.
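A sketch of a step that only lets through events whose data is an object with a numeric value field (the schema shown is only illustrative) could be:

steps:
  validate:
    flatmap:
      keep-when:
        type: object
        required: ["value"]
        properties:
          value:
            type: number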
steps.<name>.(reduce|flatmap).send-stdout object or
null, a function that always sends forward the events in the
vectors it receives, unmodified. It also prints the events to STDOUT.
steps.<name>.(reduce|flatmap).send-stdout.jq-expr optional
string, specifies a jq filter to apply before sending events to
STDOUT.
steps.<name>.(reduce|flatmap).send-stdout.jsonnet-expr optional
string, specifies a jsonnet function code to apply before
sending events to STDOUT.
steps.<name>.(reduce|flatmap).send-file object or
string, a function that always sends forward the events in the
vectors it receives, unmodified. It also appends the events to the
specified file, which is given directly as a path or a configuration
object.
steps.<name>.(reduce|flatmap).send-file.path required
string, the path to the file that will receive events.
steps.<name>.(reduce|flatmap).send-file.jq-expr optional
string, specifies a jq filter to apply before appending events
to the specified file.
steps.<name>.(reduce|flatmap).send-file.jsonnet-expr optional
string, specifies a jsonnet function code to apply before
appending events to the specified file.
steps.<name>.(reduce|flatmap).send-http string or
object, a function that always sends forward the events in the
vectors it receives, unmodified. It also sends those vectors to the
specified HTTP target, using a POST request. If given a string, the
value is taken to be the target URI to use for the event-sending request.
steps.<name>.(reduce|flatmap).send-http.target required
string, the target URI to use for the event-sending request.
steps.<name>.(reduce|flatmap).send-http.method optional
"POST" or "PUT" or "PATCH", the HTTP method to use for the
event-sending request. Defaults to "POST".
steps.<name>.(reduce|flatmap).send-http.jq-expr optional
string, an optional jq filter to apply to events before creating
the request. If this option is used, each distinct value produced by
the filter is used for a separate request. If this option is not used
(and neither jsonnet-expr), each event vector produces a request,
and the content type header of the request is forced to
application/x-ndjson.
steps.<name>.(reduce|flatmap).send-http.jsonnet-expr optional
string, an optional jsonnet function code to apply to events
before creating the request. If this option is not used (and neither
is jq-expr), each event vector produces a request, and the content
type header of the request is forced to application/x-ndjson.
steps.<name>.(reduce|flatmap).send-http.headers optional
object, additional HTTP headers to use for the request. If not
using the jq-expr option, the request content type cannot be
altered.
steps.<name>.(reduce|flatmap).send-http.concurrent optional
number or string, the maximum amount of concurrent HTTP
requests for the step. If omitted, it is set to the value of the
HTTP_CLIENT_DEFAULT_CONCURRENCY environment variable or 10.
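Combining these options, a step forwarding each window to a hypothetical collector endpoint with a custom header and bounded concurrency could be sketched as:

steps:
  forward:
    window:
      events: 20
      seconds: 2
    reduce:
      send-http:
        target: "https://collector.example.com/ingest"
        method: "POST"
        headers:
          Authorization: "Bearer changeme"   # placeholder credential
        concurrent: 5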
steps.<name>.(reduce|flatmap).send-amqp string or
object, a function that always sends forward the events in the
vectors it receives, unmodified. It also sends those vectors to the
specified AMQP broker. If given a string, it will be interpreted as a
URL and all other parameters will be set to their respective defaults.
steps.<name>.(reduce|flatmap).send-amqp.url required string,
the URL of the broker to connect to.
steps.<name>.(reduce|flatmap).send-amqp.exchange optional
object, the description of the AMQP exchange to assert, in the
same shape as the amqp input form's.
steps.<name>.(reduce|flatmap).send-amqp.exchange.name optional
string, the name of the AMQP exchange to assert. If omitted, it
will default to "cdp".
steps.<name>.(reduce|flatmap).send-amqp.exchange.type optional
"direct", "fanout" or "topic", the type of AMQP exchange
to assert. If omitted, it will default to "topic".
steps.<name>.(reduce|flatmap).send-amqp.exchange.durable
optional boolean, "true" or "false", whether the exchange
should be declared as durable or not (default is true).
steps.<name>.(reduce|flatmap).send-amqp.exchange.auto-delete
optional boolean, "true" or "false", whether the exchange
should be automatically deleted once no more queues are bound to it
(default is false).
steps.<name>.(reduce|flatmap).send-amqp.exchange.routing-key
optional string, the routing key used to publish each
message. Defaults to "cdp" for both direct and topic exchange
types, and the empty string for the fanout exchange type.
steps.<name>.(reduce|flatmap).send-amqp.exchange.expiration
optional number or string, the expiration set for each
message, in milliseconds.
steps.<name>.(reduce|flatmap).send-amqp.exchange.priority
optional number or string, the priority set for each message.
steps.<name>.(reduce|flatmap).send-amqp.exchange.persistent
optional boolean, "true" or "false", whether the message
should survive broker restarts (provided the queue does too).
steps.<name>.(reduce|flatmap).send-amqp.jq-expr optional
string, an optional jq filter to apply to events before
publishing them. If this option is used, each distinct value produced
by the filter is used for a separate publish. If this option is not
used (and neither is jsonnet-expr), each event vector is published
wholly, and the content type header of the message is forced to
application/x-ndjson.
steps.<name>.(reduce|flatmap).send-amqp.jsonnet-expr optional
string, an optional jsonnet function code to apply to events
before publishing them. If this option is not used (and neither is
jq-expr), each event vector is published wholly, and the content
type header of the message is forced to application/x-ndjson.
steps.<name>.(reduce|flatmap).send-mqtt string or
object, a function that always sends forward the events in the
vectors it receives, unmodified. It also sends those vectors to the
specified MQTT broker. If given a string, it will be interpreted as a
URL and all other parameters will be set to their respective defaults.
steps.<name>.(reduce|flatmap).send-mqtt.url required string,
the URL of the broker to connect to.
steps.<name>.(reduce|flatmap).send-mqtt.options optional
object, the connection options. Check the
MQTT.js
library for details.
steps.<name>.(reduce|flatmap).send-mqtt.topic optional
string, the topic to publish messages to. If omitted, the client
will publish to "cdp/pipeline name/step name".
steps.<name>.(reduce|flatmap).send-mqtt.qos optional 0,
1 or 2, the quality of service to use when publishing
messages. Defaults to 0.
steps.<name>.(reduce|flatmap).send-mqtt.jq-expr optional
string, an optional jq filter to apply to events before
publishing them. If this option is used, each distinct value produced
by the filter is used for a separate publish. If this option is not
used (and neither is jsonnet-expr), each event vector is published
wholly, and the content type header of the message is forced to
application/x-ndjson.
steps.<name>.(reduce|flatmap).send-mqtt.jsonnet-expr optional
string, an optional jsonnet function code to apply to events
before publishing them. If this option is not used (and neither is
jq-expr), each event vector is published wholly, and the content
type header of the message is forced to application/x-ndjson.
steps.<name>.(reduce|flatmap).send-redis object, a function
that always sends forward the events in the vectors it receives,
unmodified. It also sends those vectors to the specified redis
instance or cluster.
steps.<name>.(reduce|flatmap).send-redis.instance optional
string or object, parameters required to connect to a single
redis instance. If using a plain string, it must match the same
restrictions as the path parameter described below.
steps.<name>.(reduce|flatmap).send-redis.instance.path required
string, a redis URL. Check the ioredis
documentation for
details.
steps.<name>.(reduce|flatmap).send-redis.instance.options
optional object, connection options as given to the ioredis
library.
steps.<name>.(reduce|flatmap).send-redis.cluster optional list
of ClusterNode or object, parameters required to connect to a
redis cluster. If using a list of cluster nodes, they must match the
same restrictions as the nodes parameter described below.
steps.<name>.(reduce|flatmap).send-redis.cluster.nodes required
list of ClusterNode, a list of nodes to connect to
initially. Check the ioredis
documentation for details.
steps.<name>.(reduce|flatmap).send-redis.cluster.options
optional object, connection options as given to the ioredis
library.
One of instance or cluster must be used.
steps.<name>.(reduce|flatmap).send-redis.publish optional
string, denoting that the step function will forward events using
the PUBLISH command to a channel, specified by the value given. The
PUBLISH command is issued once for every event received.
steps.<name>.(reduce|flatmap).send-redis.rpush optional
string, denoting that the step function will forward events using
the RPUSH command to a key, specified by the value given. The RPUSH
command is issued once for every vector received.
steps.<name>.(reduce|flatmap).send-redis.lpush optional
string, denoting that the step function will forward events using
the LPUSH command to a key, specified by the value given. The LPUSH
command is issued once for every vector received.
One of publish, rpush or lpush must be used.
steps.<name>.(reduce|flatmap).send-redis.jq-expr optional
string, specifies a jq filter to apply before forwarding
events. If using a jq filter with rpush or lpush, the results of
the filter will always be mapped onto separate invocations of the
corresponding command. This means that a trivial filter like . can
be used to store lists of events instead of plain events in each
element of a redis list.
steps.<name>.(reduce|flatmap).send-redis.jsonnet-expr optional
string, specifies a jsonnet function code to apply before
forwarding events.
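For instance, a step pushing each event vector onto a hypothetical redis list as a single element could be sketched like this:

steps:
  buffer:
    reduce:
      send-redis:
        instance:
          path: "redis://localhost:6379"
        rpush: "cdp:buffered"  # hypothetical list key
        jq-expr: "."           # store the whole vector in one list element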
steps.<name>.(reduce|flatmap).expose-http object, a function
that always sends forward the events in the vectors it receives,
unmodified. It also exposes those vectors in an HTTP server.
steps.<name>.(reduce|flatmap).expose-http.endpoint required
string, a URL path that will be used to expose windows. The
latest window at any given time will be accessible through this path,
and previous windows will be reachable using the URI found in the
Link
header
of the response, if present.
steps.<name>.(reduce|flatmap).expose-http.port required
number or string, the TCP port used to listen for
requests. It may not be the same as the one used by the http input form.
steps.<name>.(reduce|flatmap).expose-http.responses required
number or string, the total number of responses to keep in
memory for exposition.
steps.<name>.(reduce|flatmap).expose-http.headers optional
object, the extra HTTP response headers to use on successful
responses. The Content-Type header can be set only if the jq-expr
option is used, else it will be set to application/x-ndjson. The
ETag and Link headers can never be changed.
steps.<name>.(reduce|flatmap).expose-http.jq-expr optional
string, an optional jq filter to apply to event windows before
creating the responses. If this option is used, each distinct value
produced by the filter is used for a separate response. For example,
if using the filter .[], then the event window is split into the
events it contains, each of which is in turn kept in the buffer of
maximum size responses. It might be preferable to use jq's map()
instead, to process the events while keeping them in a single response.
steps.<name>.(reduce|flatmap).expose-http.jsonnet-expr optional
string, an optional jsonnet function code to apply to event
windows before creating the responses.
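As an illustration, a step exposing the latest windows over HTTP could be sketched as follows (the path and port are placeholders, with the port chosen to differ from the http input form's):

steps:
  expose:
    window:
      events: 10
      seconds: 5
    reduce:
      expose-http:
        endpoint: "/windows"
        port: 8002
        responses: 10
        headers:
          Cache-Control: "no-cache"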
steps.<name>.(reduce|flatmap).send-receive-jq string or
object, a function that sends the event vector to jq for
processing, parses its output, and produces new events. If given a
string, it's used as the jq filter.
steps.<name>.(reduce|flatmap).send-receive-jq.jq-expr required
string, the jq filter to use.
steps.<name>.(reduce|flatmap).send-receive-jq.wrap optional
string or object, a wrapping directive which specifies that
incoming data from jq is not encoded events, and thus should be
wrapped. See wrapping.
steps.<name>.(reduce|flatmap).send-receive-jq.wrap.name required
string, the name given to the events that wrap the received data.
steps.<name>.(reduce|flatmap).send-receive-jq.wrap.raw optional
boolean, whether to treat received data as plain text, not JSON.
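For example, a step that uses jq to extract only the data portion of each event and wraps jq's output back into events could be sketched as follows (the filter and event name are only illustrative):

steps:
  reshape:
    flatmap:
      send-receive-jq:
        jq-expr: "map(.d)"     # keep only each event's data
        wrap:
          name: "reshaped"     # jq's output is not encoded events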
steps.<name>.(reduce|flatmap).send-receive-jsonnet string or
object, a function that sends the event vector to jsonnet for
processing, parses its output, and produces new events. If given a
string, it's used as the jsonnet function code.
steps.<name>.(reduce|flatmap).send-receive-jsonnet.jsonnet-expr
required string, the jsonnet function code to use.
steps.<name>.(reduce|flatmap).send-receive-jsonnet.wrap optional
string or object, a wrapping directive which specifies that
incoming data from jsonnet is not encoded events, and thus should be
wrapped. See wrapping.
steps.<name>.(reduce|flatmap).send-receive-jsonnet.wrap.name
required string, the name given to the events that wrap the
received data.
steps.<name>.(reduce|flatmap).send-receive-jsonnet.wrap.raw
optional boolean, whether to treat received data as plain text,
not JSON.
steps.<name>.(reduce|flatmap).send-receive-http string or
object, a function that sends event vectors to the specified HTTP
target, using a POST request. If given a string, the value is taken to
be the target URI to use for the event-sending request. The response
received is parsed as the transformed events, which will continue
through the next steps in the pipeline.
steps.<name>.(reduce|flatmap).send-receive-http.target required
string, the target URI to use for the event-sending request.
steps.<name>.(reduce|flatmap).send-receive-http.method optional
"POST" or "PUT" or "PATCH", the HTTP method to use for the
event-sending request. Defaults to "POST".
steps.<name>.(reduce|flatmap).send-receive-http.jq-expr optional
string, an optional jq filter to apply to events before creating
the request. If this option is used, each distinct value produced by
the filter is used for a separate request. If this option is not used
(and neither is jsonnet-expr), each event vector produces a request,
and the content type header of the request is forced to
application/x-ndjson.
steps.<name>.(reduce|flatmap).send-receive-http.jsonnet-expr
optional string, an optional jsonnet function to apply to events
before creating the request. If this option is not used (and neither
is jq-expr), the content type header of the request is forced to
application/x-ndjson.
steps.<name>.(reduce|flatmap).send-receive-http.headers optional
object, additional HTTP headers to use for the request. If not
using the jq-expr option, the request content type cannot be
altered.
steps.<name>.(reduce|flatmap).send-receive-http.wrap optional
string or object, a wrapping directive which specifies that
incoming data from the HTTP response is not encoded events, and thus
should be wrapped. See wrapping.
steps.<name>.(reduce|flatmap).send-receive-http.wrap.name
required string, the name given to the events that wrap the
received data.
steps.<name>.(reduce|flatmap).send-receive-http.wrap.raw
optional boolean, whether to treat received data as plain text,
not JSON.
Several step processing functions have the option of using
jq as a pre-processing step
(typically under a jq-expr option). This can be used to change the
format of events ahead of time, and can also be used to communicate in
plain text formats (i.e. non-JSON). To do that, simply return string
values from your jq filters.
Note: CDP tries to protect the adjacent jq processes by wrapping
all filters with a
try form. Runtime
errors will thus be silently skipped over, so it can be very important
to always test your jq filters in controlled environments.
As an alternative to jq in most step processing functions,
jsonnet may be used instead (typically under
a jsonnet-expr option). When writing jsonnet expressions, consider
that the events will be given as top-level
arguments,
so the expression should reduce to a single function that receives an
events argument.
Unlike jq-based processing, jsonnet will emit error messages to
stderr when runtime errors are encountered. Still, it's good practice
to test your jsonnet code in a controlled environment.
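Putting that together, a step delegating processing to jsonnet could be sketched as below; the expression reduces to a single function receiving an events argument, as described above, and the transformation itself is only illustrative:

steps:
  annotate:
    flatmap:
      send-receive-jsonnet:
        jsonnet-expr: |
          function(events) [
            {n: e.n, d: {original: e.d, annotated: true}}
            for e in events
          ]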
Since jq is favoured as an outwards-facing filter and most step
functions support it, it's common to find repeated logic in the
expressions used by various steps. To prevent code duplication, jq
functions
should be defined in the top-level jq-prelude section. jq code
defined there is shared between all jq expressions, making it the
ideal place for common functions.
An example of usage can be found in the stress test example.
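In the meantime, a minimal sketch of a shared jq function might look like this, assuming jq-prelude holds the shared jq source as a string (the function and the temperature field are hypothetical):

jq-prelude: |
  def celsius_to_fahrenheit: (. * 9 / 5) + 32;
steps:
  convert:
    flatmap:
      send-stdout:
        jq-expr: "map(.d.temperature |= celsius_to_fahrenheit)"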
As with jq, jsonnet expressions can also be prepended with common
definitions. The jsonnet-prelude section provides an in-pipeline
place for shared definitions, such as functions.
An example of usage can be found in the exposition example.
Any running instance of CDP can expose operation metrics, which can be
checked by accessing http://localhost:8001/metrics by default (the
path and port can be changed with the METRICS_EXPOSITION_PATH and
METRICS_EXPOSITION_PORT variables; set METRICS_EXPOSITION_PATH to
an empty string to disable exposition). Metrics are exposed in the
open metrics
format, so they
should be able to be scraped by a Prometheus instance without issue.
Backpressure (for CDP) is a signal emitted internally when a specific resource consumption metric reaches a threshold. The signal is interpreted as a warning that the pipeline will fail by means of resource exhaustion if it continues to receive input. Some input forms capture the backpressure signal to pause the ingestion of events, and resume ingestion once the signal is turned off.
The backpressure signal is disabled by default. To enable it, at least one of four environment variables needs to be set:
- BACKPRESSURE_RSS, which should hold a number representing the upper threshold of the rss metric, in bytes.
- BACKPRESSURE_HEAP_TOTAL, which should hold a number representing the upper threshold of the heapTotal metric, in bytes.
- BACKPRESSURE_HEAP_USED, which should hold a number representing the upper threshold of the heapUsed metric, in bytes.
- BACKPRESSURE_QUEUED_EVENTS, which should hold a number representing the upper threshold of the total count of queued events, everywhere in the pipeline.
If more than one threshold is configured, the signal will be triggered
as soon as one metric surpasses its corresponding
threshold. Measurements are taken periodically according to the
BACKPRESSURE_INTERVAL environment variable, which holds the time in
seconds between two measurements. By default, the interval is set to 5
seconds.
A CDP program can be further configured with certain environment variables. These parameters can't be placed inside the pipeline file. The whole list of environment variables read and used can be found in the source.