Channel Configuration
Channels are specified via an Aeron URI provided when adding a publication or subscription. The Aeron URI has the following format:
aeron-uri = ["aeron-spy:"] "aeron:" media [ "?" param *( "|" param ) ]
media = *( "[^?:]" )
param = key "=" value
key = *( "[^=]" )
value = *( "[^|]" )
To help build a channel URI string, use ChannelUriStringBuilder for Java or ChannelUriStringBuilder for C++. URIs can also be parsed with ChannelUri for Java and ChannelUri for C++.
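To illustrate how the grammar above breaks a channel string apart, here is a minimal sketch that splits a URI into its optional spy prefix, media and parameters using only the JDK. The class name ChannelUriSketch and its fields are hypothetical and exist only for this illustration; real applications would normally rely on ChannelUri rather than hand-rolled parsing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: splits an Aeron channel URI following the grammar above. */
final class ChannelUriSketch
{
    static final String SPY_PREFIX = "aeron-spy:";
    static final String AERON_PREFIX = "aeron:";

    final boolean spy;
    final String media;
    final Map<String, String> params;

    private ChannelUriSketch(final boolean spy, final String media, final Map<String, String> params)
    {
        this.spy = spy;
        this.media = media;
        this.params = params;
    }

    static ChannelUriSketch parse(final String uri)
    {
        String rest = uri;
        final boolean spy = rest.startsWith(SPY_PREFIX);
        if (spy)
        {
            rest = rest.substring(SPY_PREFIX.length());
        }
        if (!rest.startsWith(AERON_PREFIX))
        {
            throw new IllegalArgumentException("missing aeron: prefix: " + uri);
        }
        rest = rest.substring(AERON_PREFIX.length());

        // media runs up to the first '?', params are separated by '|'
        final int queryIndex = rest.indexOf('?');
        final String media = queryIndex < 0 ? rest : rest.substring(0, queryIndex);
        final Map<String, String> params = new LinkedHashMap<>();
        if (queryIndex >= 0)
        {
            for (final String param : rest.substring(queryIndex + 1).split("\\|"))
            {
                // key ends at the first '=', the value may itself contain ':' or '='
                final int equalsIndex = param.indexOf('=');
                if (equalsIndex <= 0)
                {
                    throw new IllegalArgumentException("malformed param: " + param);
                }
                params.put(param.substring(0, equalsIndex), param.substring(equalsIndex + 1));
            }
        }
        return new ChannelUriSketch(spy, media, params);
    }

    public static void main(final String[] args)
    {
        final ChannelUriSketch uri = parse("aeron:udp?endpoint=192.168.0.1:40456|interface=192.168.0.0/24");
        System.out.println(uri.media + " " + uri.params);
    }
}
```

Keeping the parameters in insertion order makes it easy to print the URI back in the form it was written.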
The following parameters are available on all media transports when applied to a Publication:
- alias - (optional) Reserved param which can contain a user-provided string to identify a channel for an application.
- term-length - (optional) The term length to be applied to the log when adding a Publication. This parameter overrides the aeron.term.buffer.length or aeron.ipc.term.buffer.length system properties.
- sparse - (optional) Boolean value to indicate if a sparse file should be used to back the log buffers. This parameter overrides the aeron.term.buffer.sparse.file system property.
- session-id - (optional) 32-bit signed integer value that is assigned as the session id to override the default generation for publications, or to restrict the scope of a subscription to a single session. Session ids additionally support tags to select a configuration from another entity using the session-id=tag:<tag> syntax. For example, a session-specific subscription could reference a specific tagged publication by its tag.
The following 3 parameters can be applied as a group to set an initial position on an Exclusive Publication for replay purposes:
- init-term-id - (optional) The initial term id which can be applied to an Exclusive Publication.
- term-id - (optional) The current term id which can be applied to an Exclusive Publication.
- term-offset - (optional) The current term offset from the base of the term buffer which can be applied to an Exclusive Publication.
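As a rough illustration of how these three parameters relate to a stream position, the sketch below converts between a position and the (term-id, term-offset) pair for a given term length. The class and method names are hypothetical. It assumes the term length is a power of two and that the position is aligned to Aeron's 32-byte frame alignment; treat both checks as assumptions of this sketch rather than a statement of what the driver validates.

```java
/** Illustrative only: relates a stream position to init-term-id, term-id and term-offset. */
final class InitialPositionSketch
{
    static final int FRAME_ALIGNMENT = 32; // assumed frame alignment in bytes

    /** position = (termId - initialTermId) * termLength + termOffset */
    static long computePosition(
        final int initialTermId, final int termId, final int termOffset, final int termLength)
    {
        final long termCount = termId - initialTermId; // int subtraction tolerates term id wrap
        return termCount * termLength + termOffset;
    }

    /** Builds the URI parameters that would place an Exclusive Publication at the given position. */
    static String toUriParams(final long position, final int initialTermId, final int termLength)
    {
        if (Integer.bitCount(termLength) != 1)
        {
            throw new IllegalArgumentException("term length must be a power of two: " + termLength);
        }
        if (position < 0 || (position & (FRAME_ALIGNMENT - 1)) != 0)
        {
            throw new IllegalArgumentException("position must be non-negative and frame aligned: " + position);
        }

        final int bitsToShift = Integer.numberOfTrailingZeros(termLength);
        final int termId = initialTermId + (int)(position >>> bitsToShift);
        final int termOffset = (int)(position & (termLength - 1));

        return "init-term-id=" + initialTermId +
            "|term-id=" + termId +
            "|term-offset=" + termOffset +
            "|term-length=" + termLength;
    }

    public static void main(final String[] args)
    {
        System.out.println("aeron:udp?endpoint=localhost:40456|" + toUriParams(200704L, 7, 65536));
    }
}
```

The two methods are inverses of each other, which is a convenient sanity check when preparing a replay from a recorded position.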
Subscriptions can be tethered for how they participate in local flow control.
- tether - (optional) Boolean value to indicate if a subscription should be a permanent tether for local flow control.
- untethered-window-limit-timeout - (optional, since 1.45.0) A duration in nanoseconds that sets the timeout for how long an untethered subscription that is outside the window will participate in local flow control. This overrides the aeron.untethered.window.limit.timeout media driver property.
- untethered-resting-timeout - (optional, since 1.45.0) A duration in nanoseconds that sets the timeout for how long an untethered subscription is resting after not being able to keep up before it is allowed to rejoin a stream. This overrides the aeron.untethered.resting.timeout media driver property.
If shared memory is to be used for communications on the same machine between threads or processes then the IPC media value can be applied. This is the highest throughput and lowest latency means of communicating between two processes or threads on the same machine.
aeron:ipc
The Aeron URI uses a media value of udp to determine that messages will be sent via the UDP protocol. To distinguish between Unicast and Multicast, the UdpChannel will look at the specific parameters supplied and infer the appropriate choice.
Many of the option values are network addresses. These can take the form of a host name, an IPv4 address, or an IPv6 address enclosed in square brackets.
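The unicast versus multicast inference can be approximated with the JDK alone. The sketch below is not the UdpChannel implementation. It only shows the idea of inspecting the endpoint address, using a hypothetical helper named isMulticastEndpoint and assuming the endpoint is given as <host>:<port> with IPv6 hosts in square brackets.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Illustrative only: decides whether an endpoint address is a multicast address. */
final class EndpointKindSketch
{
    static boolean isMulticastEndpoint(final String endpoint) throws UnknownHostException
    {
        // The port follows the last ':' so bracketed IPv6 hosts are handled as well.
        final int portSeparator = endpoint.lastIndexOf(':');
        if (portSeparator <= 0)
        {
            throw new IllegalArgumentException("expected <host>:<port> but got: " + endpoint);
        }

        final String host = endpoint.substring(0, portSeparator);
        // InetAddress accepts host names, IPv4 literals and bracketed IPv6 literals.
        return InetAddress.getByName(host).isMulticastAddress();
    }

    public static void main(final String[] args) throws UnknownHostException
    {
        System.out.println(isMulticastEndpoint("224.0.1.1:40456"));
        System.out.println(isMulticastEndpoint("192.168.0.1:40456"));
    }
}
```

Literal addresses are resolved without a DNS lookup, whereas host names may trigger one.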
Both Unicast and Multicast channel configurations use the same set of properties. Distinguishing between unicast and multicast is done by examining the endpoint address of the channel. The important configuration options are:
- endpoint - (required unless MDC or MDS) The socket address to which publications will send messages and from which subscriptions will receive messages. This takes the <host>:<port> format, where both the host and port are required. The port value can be a wildcard (0), see the section on wildcard ports below. When the endpoint is multicast it is important to also specify the interface. In the case of unicast the address will be the host on which the subscription is located.
- interface - (optional) The socket address that identifies the local interface that will be used for receiving messages (and sending messages in the multicast case). The value can take the form <host>[:<port>][/<subnet mask>]. When specified the system will scan the local interfaces for one that matches the specified pattern. The default value for this field is 0.0.0.0:0/0 (or [::]:0/0 for IPv6). With a unicast configuration this will simply map to any local address and will listen for messages from all local network interfaces. This setting will only impact the behaviour of subscriptions for multicast; unicast publications will effectively ignore this value. With a multicast configuration this will scan all of the local network interfaces and will use the first non-loopback network interface that supports multicast, falling back to the local loopback if none are found.
- control - (optional) Multi-Destination-Cast (MDC) control address to be used for dynamically allocating new destination streams. The value will be an IP address and port, e.g. 192.168.0.1:40456. When set for the publication with MDC then control-mode should also be set.
- control-mode - (optional) Valid value is one of the following:
  - manual - if using the Publication.addDestination(String) / Publication.removeDestination(String) API.
  - dynamic - if initiated by connections to the control address.
  - response - used for response channel configuration.
- response-correlation-id - (optional, since 1.44.0) The registration id of the resource (e.g. Subscription or Image) linked to Response Channels.
- ttl - (optional) The TTL used for multicast traffic. Only used when the endpoint specified is a multicast address. This value is passed directly as the IP_MULTICAST_TTL and should be within the range 0 - 255. A value outside this range will generate an exception when set as the socket option.
- mtu - (optional) The MTU is the Maximum Transmission Unit which sets the length limit for a UDP datagram payload on this stream. This overrides the aeron.mtu.length property for the driver when adding a new Publication.
- reliable - (optional) When set to false on a Subscription, the reliable property disables the NAK'ing of lost packets on the stream. The lost packets are gap filled so the stream can progress. Valid values are true and false with the default being true.
- linger - (optional) Timeout in nanoseconds a network publication should linger around to service NAKs. This parameter overrides the aeron.publication.linger.timeout system property.
- tags - (optional) Identity and references for associating channels to simplify configuration. See tags for more details.
- eos - (optional) Indicates if a publication should send an End Of Stream (EOS) flag in its heartbeat messages when closed. Valid values are true and false with the default being true.
- group - (optional) Indicates if a subscription should behave as part of a receiver group such as with multicast. This will impact NAK behaviour and related semantics. It will default to true for multicast and false for unicast. For a subscriber to an MDC unicast stream it will default to true as a result of being flagged by the originating publication that is part of an MDC group. If the MDC subscription is going to be the only one within a group, then this should be set to false.
- rejoin - (optional) Indicates if an image should rejoin a stream if it gets disconnected due to timeout or falling behind. Defaults to true.
- cc - (optional) Indicates what congestion control algorithm should be applied. Defaults to static, which is a static window for just flow control. Can be set to cubic for the Cubic congestion control algorithm. Cubic is recommended over a WAN or congested networks.
- fc - (optional) Indicates what flow control algorithm should be used on the Publication side and the configuration for it. See Flow Control Configuration.
- gtag - (optional) Used to include a receiver in a specified group for tagged flow control. See Flow Control Configuration.
- ssc - (optional) Used on a publication to indicate if a spy subscription being present simulates a connection even if there are no receivers. Note that this can override the min group size when min or tagged flow control is configured.
- media-rcv-ts-offset - (optional) An offset within an incoming message to store a timestamp captured at the closest point to the networking hardware. The special value of "reserved" can be used to store the value in the message's reserved field. Only supported on the C media driver, will error with the Java driver. See Timestamps.
- channel-rcv-ts-offset - (optional) An offset within an incoming message to store a timestamp when received by the receive channel endpoint within Aeron. The special value of "reserved" can be used to store the value in the message's reserved field. See Timestamps.
- channel-snd-ts-offset - (optional) An offset within an outgoing message to store a timestamp captured by the send channel endpoint. The special value of "reserved" can be used to store the value in the message's reserved field. See Timestamps.
- nak-delay - (optional, since 1.44.0) A duration in nanoseconds that sets the amount of time between the initial detection of a missing message and the sending of the NAK for that range of data. This value overrides aeron.nak.unicast.delay for unicast streams.
Simple Unicast, with any local address used for binding subscriptions:
aeron:udp?endpoint=192.168.0.1:40456
Unicast with a specific address to receive messages:
aeron:udp?endpoint=192.168.0.1:40456|interface=192.168.0.3
Unicast with a search pattern to identify an address to receive messages. This is especially useful if you have a large number of machines on the same network and want to reuse the same configuration string across all of them.
aeron:udp?endpoint=192.168.0.1:40456|interface=192.168.0.0/24
Use hostnames instead of addresses:
aeron:udp?endpoint=localhost:40456
Or use IPv6:
aeron:udp?endpoint=[::1]:1234|interface=[::1]
Simple multicast:
aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.0.3
It is also possible to specify an IPv6 address, but the IPv6 address portion needs to be placed inside square brackets []:
aeron:udp?endpoint=[ff02::1]:40456
To specify which interface to use, add the interface parameter. Aeron will find the interface whose bound address matches the one specified. For an interface config such as the following:
en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
ether 04:0c:ce:e3:c8:c0
inet6 fe80::60c:ceff:fee3:c8c0%en0 prefixlen 64 scopeid 0x4
inet 192.168.1.4 netmask 0xffffff00 broadcast 192.168.1.255
nd6 options=1<PERFORMNUD>
media: autoselect
status: active
Then the following configuration will ensure that the 'en0' interface is used.
aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.1.4
Or similarly for IPv6.
aeron:udp?endpoint=[ff02::1]:40456|interface=[fe80::60c:ceff:fee3:c8c0]
However, having to specify a full IP for the interface can be difficult, especially if you want to share the configuration across a number of machines in the same network. Therefore, with the interface specification you can specify a subnet mask to allow Aeron to search for an appropriate interface. The following will find an interface on the 192.168.1.x network. If there are multiple interfaces that match the criteria, the result will be undefined. The first interface returned from NetworkInterface.getNetworkInterfaces() that matches the IP address pattern and supports multicast will be used.
aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.1.0/24
Specifically, the /24 means that the first 24 bits of the IP address will be used when comparing against the IP addresses on the interface. Similarly for IPv6.
aeron:udp?endpoint=[ff02::1]:40456|interface=[fe80::60c:ceff:fee3]/88
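The prefix comparison described above can be sketched with the JDK as follows. This is an illustration of comparing the first N bits of two addresses, not the driver's actual interface search. The class name and the matches helper are hypothetical, and the pattern is assumed to have the form <host>[/<prefix bits>] without a port.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Illustrative only: checks whether an address falls within an interface pattern such as 192.168.1.0/24. */
final class InterfacePrefixSketch
{
    static boolean matches(final String pattern, final String candidate) throws UnknownHostException
    {
        final int slash = pattern.lastIndexOf('/');
        final String host = slash < 0 ? pattern : pattern.substring(0, slash);
        final byte[] wanted = InetAddress.getByName(host).getAddress();
        final byte[] actual = InetAddress.getByName(candidate).getAddress();
        if (wanted.length != actual.length)
        {
            return false; // an IPv4 pattern never matches an IPv6 address and vice versa
        }

        // Without a mask every bit of the address must match.
        final int prefixBits = slash < 0 ? wanted.length * 8 : Integer.parseInt(pattern.substring(slash + 1));
        if (prefixBits < 0 || prefixBits > wanted.length * 8)
        {
            throw new IllegalArgumentException("invalid prefix length: " + prefixBits);
        }

        final int fullBytes = prefixBits / 8;
        for (int i = 0; i < fullBytes; i++)
        {
            if (wanted[i] != actual[i])
            {
                return false;
            }
        }

        final int remainingBits = prefixBits % 8;
        if (remainingBits == 0)
        {
            return true;
        }

        // Compare only the leading bits of the partially covered byte.
        final int mask = (0xFF << (8 - remainingBits)) & 0xFF;
        return (wanted[fullBytes] & mask) == (actual[fullBytes] & mask);
    }

    public static void main(final String[] args) throws UnknownHostException
    {
        System.out.println(matches("192.168.1.0/24", "192.168.1.4"));
        System.out.println(matches("192.168.1.0/24", "192.168.2.4"));
    }
}
```

A prefix length of 0, as in the default 0.0.0.0/0, matches every address of the same family.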
Hostnames can be used instead of IP addresses for both the endpoint (multicast group) and interface parameters.
aeron:udp?endpoint=all-systems.mcast.net:40456|interface=localhost
Simple multicast with specified TTL of 16.
aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.0.3|ttl=16
Multi-Destination-Cast dynamic publication using control port of 40456.
aeron:udp?control=localhost:40456
Multi-Destination-Cast dynamic subscription using local endpoint port of 40457 and control port at 192.168.0.1 and port 40456.
aeron:udp?endpoint=localhost:40457|control=192.168.0.1:40456|control-mode=dynamic
Multi-Destination-Cast manual publication using control port of 40456.
aeron:udp?control=localhost:40456|control-mode=manual
If you wish to subscribe to a local Publication without receiving the packets back through the network interface on which they are being sent, you can spy on the network publication as an optimisation. A spy Subscription will receive the messages over IPC without a copy.
aeron-spy:aeron:udp?endpoint=224.20.30.39:54326
aeron-spy:aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.1.0/24|session-id=tag:7
Note:
- Spy subscriptions are not applicable to IPC media.
- Spy subscriptions do not affect the Publication connected status by default. They can simulate a connected publication if the aeron.spies.simulate.connection property is set, in which case the publication can progress if it only has spies.
- Spy subscriptions apply back-pressure like normal Subscriptions.
- Spy subscriptions override the min group size for min and tagged flow control strategies.
Flow control parameters take the following form:
fc = strategy ?( group ) ?( timeout )
strategy = "max" | "min" | "tagged"
group = ",g:" ( group_tag | ( ?( group_tag ) "/" ( group_min_size ) ) )
group_tag = ?( "-" ) [0-9]+
group_min_size = [0-9]+
timeout = ",t:" ( [0-9]+ ) ?( "s" | "ns" | "us" | "ms" )
The first part of the parameter specifies the flow control strategy to use and is limited to the built in strategies. If you want the URI to configure a custom strategy, you will need to handle that in your own flow control supplier.
The group can include a group_tag (signed 64 bit integer) and group_min_size (unsigned 31 bit integer). The group_tag identifies which receivers are included in the group for the tagged flow control strategy. The group_min_size is the minimum number of receivers required for the flow control strategy to indicate that the Publication should be connected.
The timeout specifies how long the flow control strategy should wait before removing receivers that have not sent status messages from the list of valid receivers. An optional time unit is allowed; if none is given, the unit is nanoseconds.
Some of the parameters are not relevant to every flow control strategy. However, as long as the parameters are properly formed the URI won't be rejected; the parameters which don't apply are simply ignored. This allows additional parameters to be added in the future.
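One way to read the fc grammar is as a regular expression. The sketch below is a hypothetical stand-alone parser, not the driver's implementation. It extracts the strategy, optional group tag, optional minimum group size and optional timeout (converted to nanoseconds) from an fc value, and rejects values that do not fit the grammar.

```java
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: parses an fc value following the grammar above. */
final class FlowControlParamSketch
{
    private static final Pattern FC = Pattern.compile(
        "(max|min|tagged)" +                         // strategy
        "(?:,g:(?:(-?\\d+)(?:/(\\d+))?|/(\\d+)))?" + // group: tag, tag/min or /min
        "(?:,t:(\\d+)(s|ms|us|ns)?)?");              // timeout with optional unit

    final String strategy;
    final Long groupTag;        // null when not specified
    final Integer groupMinSize; // null when not specified
    final Long timeoutNs;       // null when not specified

    private FlowControlParamSketch(
        final String strategy, final Long groupTag, final Integer groupMinSize, final Long timeoutNs)
    {
        this.strategy = strategy;
        this.groupTag = groupTag;
        this.groupMinSize = groupMinSize;
        this.timeoutNs = timeoutNs;
    }

    static FlowControlParamSketch parse(final String fc)
    {
        final Matcher m = FC.matcher(fc);
        if (!m.matches())
        {
            throw new IllegalArgumentException("malformed fc value: " + fc);
        }

        final Long tag = m.group(2) == null ? null : Long.valueOf(m.group(2));
        final String minSize = m.group(3) != null ? m.group(3) : m.group(4);
        final Long timeoutNs =
            m.group(5) == null ? null : Long.valueOf(toNanos(Long.parseLong(m.group(5)), m.group(6)));

        return new FlowControlParamSketch(
            m.group(1), tag, minSize == null ? null : Integer.valueOf(minSize), timeoutNs);
    }

    private static long toNanos(final long value, final String unit)
    {
        if (unit == null || "ns".equals(unit))
        {
            return value; // no unit means nanoseconds
        }
        switch (unit)
        {
            case "us": return TimeUnit.MICROSECONDS.toNanos(value);
            case "ms": return TimeUnit.MILLISECONDS.toNanos(value);
            default: return TimeUnit.SECONDS.toNanos(value); // only "s" remains
        }
    }

    public static void main(final String[] args)
    {
        final FlowControlParamSketch fc = parse("tagged,g:1001/3,t:2s");
        System.out.println(fc.strategy + " " + fc.groupTag + " " + fc.groupMinSize + " " + fc.timeoutNs);
    }
}
```

Fields that are absent from the value are left as null so that a caller can fall back to the media driver defaults.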
Examples:
aeron:udp?endpoint=224.20.30.39:24326|fc=max
aeron:udp?endpoint=224.20.30.39:24326|fc=min
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged
When specifying the flow control strategy you can use default values or those specified in the context for group parameters and timeouts.
aeron:udp?endpoint=224.20.30.39:24326|fc=min,g:/3,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=min,g:/3,t:2000ms
aeron:udp?endpoint=224.20.30.39:24326|fc=min,g:/3,t:2000000000
aeron:udp?endpoint=224.20.30.39:24326|fc=min,g:1001/3,t:2000000000
All of the preceding are valid and set the min strategy with a minimum of 3 receivers for connectivity and timeout of 2 seconds. In the final case the group id is specified but ignored.
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:1001/3,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:1001,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:/3,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,t:2s
The above use the tagged strategy. Where one of the parameters is not specified, the value will be taken from the media driver context. When using the tagged strategy the receiver on the Subscription side will need to indicate that it is part of the group. This can be done either by a media driver parameter or via the gtag parameter on the URI. For example, for a subscription to be included in the 1001 group, the following could be used:
aeron:udp?endpoint=224.20.30.39:24326|gtag=1001
The following are invalid and will be rejected:
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:1001/,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:/,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,t:2s
At times we may not want to pre-assign ports for the endpoint or control address. The common use case for this is where we want to create a subscription that has a transient lifetime. For example, a channel provided to another service in order to receive messages, then closed down later.
The simplest way to do this is to make use of the wildcard port (0) and let the operating system handle the port assignment. Aeron supports using wildcard ports and making the assigned ports available via the client from version 1.28.0.
Firstly declare the URI and create a subscription to it.
final String wildcardUriA = "aeron:udp?tags=1001|endpoint=192.168.10.1:0";
final Subscription subscriptionA1 = client.addSubscription(wildcardUriA, STREAM_ID);
We recommend using tags with wildcard ports so that further subscriptions to the same channel are possible. Therefore if you want to create a second subscription to the same endpoint use the tag. If you need a separate channel with a different port use a different tag.
final String tagUriA = "aeron:udp?tags=1001";
final Subscription subscriptionA2 = client.addSubscription(tagUriA, STREAM_ID);
final String wildcardUriB = "aeron:udp?tags=1002|endpoint=192.168.10.1:0";
final Subscription subscriptionB = client.addSubscription(wildcardUriB, STREAM_ID);
Tags should also be used when using wildcard control ports for publications and spies.
final String publicationUri = "aeron:udp?control=localhost:0|control-mode=dynamic|ssc=true|tags=1003";
final Publication publication = client.addPublication(publicationUri, STREAM_ID);
final String spyUri = "aeron:udp?tags=1003";
final Subscription spy = client.addSubscription(spyUri, STREAM_ID);
Once the subscription is established and bound, the list of local bound socket addresses is available via the Subscription.localSocketAddresses() method. In the case of MDS it is possible to have multiple destinations on the same subscription returned as a list. If the socket has not been bound, as there is a possible delay between creating a subscription and it being bound, then the list will be empty. The result will be a list of strings formatted in the same manner as an IP address that is used in an Aeron URI, so they can be used directly in a URI for a publication.
List<String> localSocketAddresses;
while ((localSocketAddresses = subscriptionA1.localSocketAddresses()).isEmpty())
{
    Thread.yield();
    // Check for timeout...
}

// Assuming a simple unicast subscription
final String uri = new ChannelUriStringBuilder()
    .media("udp")
    .endpoint(localSocketAddresses.get(0))
    .build();
final Publication publication = client.addPublication(uri, STREAM_ID);
The same approach can be used when setting up explicit control addresses for publications. Use Publication.localSocketAddresses() to get the local socket address for the control. This returns a list for symmetry with the subscription, but as of version 1.28.0 it will have at most 1 entry.
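The "Check for timeout..." placeholder in the polling loop above could be filled in along the following lines. This is a minimal sketch using only the JDK. The helper name awaitNonEmpty is hypothetical, and the address source is modelled as a Supplier so that either subscription::localSocketAddresses or publication::localSocketAddresses could be passed in.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Illustrative only: polls for bound socket addresses until a deadline passes. */
final class AwaitBoundSketch
{
    static List<String> awaitNonEmpty(final Supplier<List<String>> addresses, final long timeoutNs)
    {
        final long deadlineNs = System.nanoTime() + timeoutNs;
        List<String> result;
        while ((result = addresses.get()).isEmpty())
        {
            // Compare via subtraction so the check stays correct if nanoTime wraps.
            if (System.nanoTime() - deadlineNs >= 0)
            {
                throw new IllegalStateException("socket not bound within " + timeoutNs + "ns");
            }
            Thread.yield();
        }

        return result;
    }

    public static void main(final String[] args)
    {
        // Stand-in supplier; with Aeron this would be e.g. subscriptionA1::localSocketAddresses.
        final Supplier<List<String>> bound = () -> Collections.singletonList("192.168.10.1:54321");
        System.out.println(awaitNonEmpty(bound, TimeUnit.SECONDS.toNanos(1)));
    }
}
```

Throwing on timeout is one option; returning an empty list and letting the caller decide would work equally well.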
Some of the parameters on a channel reference a length of time. For convenience these can be expressed with a suffix to represent units. The following suffixes are supported:
Suffix | Units | Example |
---|---|---|
s | seconds | 10s |
ms | milliseconds | 10ms |
us | microseconds | 10us |
ns | nanoseconds | 10ns |
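As an illustration of the suffix table, the following sketch converts a duration string into nanoseconds. The class and method names are hypothetical. It assumes lower-case suffixes only and treats a value without a suffix as nanoseconds, in line with the parameters above that are described as durations in nanoseconds.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative only: converts values such as 10s, 10ms, 10us or 10ns into nanoseconds. */
final class DurationSuffixSketch
{
    static long parseDurationNs(final String value)
    {
        // Walk back from the end to find where the unit suffix starts.
        int unitStart = value.length();
        while (unitStart > 0 && !Character.isDigit(value.charAt(unitStart - 1)))
        {
            unitStart--;
        }
        if (unitStart == 0)
        {
            throw new IllegalArgumentException("no numeric part in duration: " + value);
        }

        final long amount = Long.parseLong(value.substring(0, unitStart));
        final String suffix = value.substring(unitStart);
        switch (suffix)
        {
            case "":
            case "ns":
                return amount;
            case "us":
                return TimeUnit.MICROSECONDS.toNanos(amount);
            case "ms":
                return TimeUnit.MILLISECONDS.toNanos(amount);
            case "s":
                return TimeUnit.SECONDS.toNanos(amount);
            default:
                throw new IllegalArgumentException("unknown duration suffix: " + suffix);
        }
    }

    public static void main(final String[] args)
    {
        System.out.println("linger=" + parseDurationNs("5s") + " is equivalent to linger=5s");
    }
}
```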
Aeron supports the capturing of timestamps within the system: 3 points within the C driver (media-rcv-ts-offset, channel-rcv-ts-offset, channel-snd-ts-offset) and 2 within the Java driver (channel-rcv-ts-offset, channel-snd-ts-offset). These can be used to measure queuing between different parts of the system. Both drivers support capturing timestamps as the message moves through the channel endpoints. On Linux the C driver supports capturing packet-level timestamps. Linux has broad support for packet timestamps. Aeron uses SO_TIMESTAMPNS and SOF_TIMESTAMPING_RX_HARDWARE, which generate nanosecond precision timestamps using network card hardware when available and will fall back to software timestamps otherwise.
If the machine's network card supports packet timestamping, then this will need to be enabled on the system. This is a privileged operation, so it cannot be set up automatically by Aeron. Look at the hwstamp_ctl program provided by the linuxptp (precision time protocol) package as a means to do this.
The easiest way to make the timestamps visible to the user is to store them in the incoming packet, so all of the configuration options take an offset from the beginning of the message. If more than one timestamp is required, then space will need to be made within the application message for the value to be stored. Care is required as this may overwrite application data. The special value of reserved can also be used; this will store the timestamp within the reserved field. There is only one reserved field available on the message, so it can only be used for a single timestamp. Timestamps are little-endian 64-bit integers which encode the number of nanoseconds since the epoch, 1 Jan 1970.
Note that the timestamp will only be applied to the first fragment if multiple fragments are received in a single batch. Client code that uses the timestamps will need to cater for this, i.e. if the timestamp is not set, then it should assume that the previous timestamp received is the one to use.
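To make the timestamp encoding concrete, the sketch below reads a little-endian 64-bit nanosecond timestamp from a byte array at a given offset and converts it to an Instant. It uses only the JDK, and the class and method names are hypothetical. With Aeron buffers the equivalent read would normally go through the buffer's own getLong accessors.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

/** Illustrative only: decodes a little-endian epoch nanosecond timestamp stored inside a message. */
final class TimestampDecodeSketch
{
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    static long readTimestampNs(final byte[] message, final int offset)
    {
        return ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN).getLong(offset);
    }

    static Instant toInstant(final long epochNs)
    {
        // Split into whole seconds and the nanosecond remainder.
        return Instant.ofEpochSecond(
            Math.floorDiv(epochNs, NANOS_PER_SECOND), Math.floorMod(epochNs, NANOS_PER_SECOND));
    }

    public static void main(final String[] args)
    {
        final byte[] message = new byte[16];
        // Simulate the driver writing a timestamp at offset 8.
        ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN).putLong(8, 1_700_000_000_123_456_789L);
        System.out.println(toInstant(readTimestampNs(message, 8)));
    }
}
```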
Timestamps are not supported on IPC channels as the driver is not in the message path. A sending timestamp can be added at time of publication to the reserved field by providing a ReservedValueSupplier as an argument to the offer methods or by setting BufferClaim.reservedValue on the claim before committing.
Using media driver timestamps:
// Create a URI that will enable timestamping.
// aeron:udp?endpoint=localhost:9090|media-rcv-ts-offset=reserved|channel-rcv-ts-offset=0|channel-snd-ts-offset=8
final String uri = new ChannelUriStringBuilder()
    .media("udp")
    .endpoint("localhost:9090")
    .mediaReceiveTimestampOffset("reserved") // Store the media timestamp in the reserved field (generated by NIC if available)
    .channelReceiveTimestampOffset("0") // Store the channel receive timestamp at offset 0
    .channelSendTimestampOffset("8") // Store the channel send timestamp at offset 8
    .build();

// Create publication and subscription as normal. Technically the publication only requires channel-snd-ts-offset and the
// subscription media-rcv-ts-offset and channel-rcv-ts-offset. However, it is often easier to use the same URI across both for simplicity.
final Subscription sub = aeron.addSubscription(uri, 10000);
final Publication pub = aeron.addPublication(uri, 10000);

final ExpandableArrayBuffer message = new ExpandableArrayBuffer(64);
final int messageStartOffset = 16; // Allow padding in message for the timestamp fields.
message.putLong(0, 0); // Optional, use 0 as a sentinel value to check if the timestamp has been set.
message.putLong(8, 0);
final int messageLength = message.putStringAscii(messageStartOffset, "Hello World");
final int totalLength = messageStartOffset + messageLength;
pub.offer(message, 0, totalLength);
// Timestamps on messages can be read on the subscription side. However, when messages are batched into a single MTU only the
// first message of the batch will contain the timestamp values, so using the previously received values is the best option.
final class Poller implements FragmentHandler
{
    private long lastMediaRcvTimestamp = 0;
    private long lastChannelRcvTimestamp = 0;
    private long lastChannelSndTimestamp = 0;

    public void onFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
    {
        long mediaRcvTimestamp = lastMediaRcvTimestamp;
        if (0 != header.reservedValue())
        {
            mediaRcvTimestamp = header.reservedValue();
            lastMediaRcvTimestamp = mediaRcvTimestamp;
        }

        long channelRcvTimestamp = lastChannelRcvTimestamp;
        if (0 != buffer.getLong(offset))
        {
            channelRcvTimestamp = buffer.getLong(offset);
            lastChannelRcvTimestamp = channelRcvTimestamp;
        }

        long channelSndTimestamp = lastChannelSndTimestamp;
        if (0 != buffer.getLong(offset + 8))
        {
            channelSndTimestamp = buffer.getLong(offset + 8);
            lastChannelSndTimestamp = channelSndTimestamp;
        }

        final String message = buffer.getStringAscii(offset + 16);
        // Do something with timestamps
    }
}