Design: Import Export
The Export operator makes a stream available for external subscription. It
accepts 3 parameters:
- allowFilter: optional boolean; if false, forbids Import operators from specifying a filter
- properties: a key-value pair list of properties that describe the exported stream
- streamId: a unique identifier for the exported stream
The parameters properties and streamId are mutually exclusive.
The Import operator subscribes to streams made available by Export operators.
It accepts 5 parameters:
- subscription: expression used to match Export operator properties
- applicationName: the name of the application from which the stream is imported
- applicationScope: the scope of the exporting application
- streamId: the identifier of the stream to import
- filter: the filter to be applied on the exported stream
The parameters (applicationName, streamId) and subscription are mutually exclusive.
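The mutual-exclusion rules for both operators can be expressed as a small validation step. The following is a minimal sketch, not the compiler's actual check: the `ExportParams` and `ImportParams` structs and the `isValid` functions are hypothetical names introduced for illustration, and only the field names come from the parameter lists above.

```cpp
#include <map>
#include <optional>
#include <string>

// Hypothetical parameter holders; field names mirror the operator parameters.
struct ExportParams {
    std::optional<bool> allowFilter;
    std::optional<std::map<std::string, std::string>> properties;
    std::optional<std::string> streamId;
};

struct ImportParams {
    std::optional<std::string> subscription;
    std::optional<std::string> applicationName;
    std::optional<std::string> applicationScope;
    std::optional<std::string> streamId;
    std::optional<std::string> filter;
};

// Export: properties and streamId are mutually exclusive.
bool isValid(const ExportParams& p) {
    return !(p.properties.has_value() && p.streamId.has_value());
}

// Import: (applicationName, streamId) and subscription are mutually exclusive.
bool isValid(const ImportParams& p) {
    const bool byName = p.applicationName.has_value() || p.streamId.has_value();
    return !(byName && p.subscription.has_value());
}
```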
SAM performs the connection brokering between the Import and Export
operators. In effect, SAM instructs the Export operator to connect to the
Import operators that requested its stream.
SAM keeps track of all jobs and all available exporting ports. When importing
ports are processed, SAM scans the exporting ports for matches.
For each match, SAM sends an update route notification to each owning PE.
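The scan described above can be sketched as follows. This is an illustration under stated assumptions rather than SAM's implementation: `ExportPort`, `ImportPort`, `matches` and `pesToNotify` are hypothetical names, and the subscription expression is modeled as a plain predicate over the export properties instead of a parsed expression.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

using Properties = std::map<std::string, std::string>;

// Hypothetical view of an exporting port.
struct ExportPort {
    std::string applicationName;
    std::string streamId;   // empty when the stream is exported by properties
    Properties properties;  // empty when the stream is exported by streamId
    int owningPe;
};

// Hypothetical view of an importing port.
struct ImportPort {
    std::string applicationName;  // used together with streamId
    std::string streamId;         // empty when importing by subscription
    std::function<bool(const Properties&)> subscription;  // stand-in for the expression
    int owningPe;
};

// An import matches either by (applicationName, streamId) or by subscription.
bool matches(const ImportPort& imp, const ExportPort& exp) {
    if (!imp.streamId.empty()) {
        return imp.streamId == exp.streamId &&
               imp.applicationName == exp.applicationName;
    }
    return imp.subscription && exp.streamId.empty() &&
           imp.subscription(exp.properties);
}

// Scan all exporting ports and collect the PEs that own a matching export.
// Each of these PEs would receive an update route notification.
std::vector<int> pesToNotify(const ImportPort& imp,
                             const std::vector<ExportPort>& exports) {
    std::vector<int> pes;
    for (const auto& exp : exports) {
        if (matches(imp, exp)) {
            pes.push_back(exp.owningPe);
        }
    }
    return pes;
}
```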
Import and Export parameters can also be altered programmatically by the
application using the following functions:
int32 setInputPortImportFilterExpression(rstring filter, uint32 port)
int32 setInputPortImportSubscription(rstring subscription, uint32 port)
<tuple T> int32 setOutputPortExportProperties(T properties, uint32 port)
int32 setOutputPortExportProperties(list<tuple<rstring name, rstring value, rstring typ>> properties, uint32 port)
Both the route change notifications and the changes applied programmatically
through these functions are processed by the HostController API of the PEC.
Dynamic connections, or optional connections, may come and go. However, they are implemented using the same classes as the static connections. To handle their dynamic aspect, their connection process differs from the one used by the static connections.
When a static connection fails, the reconnection process is handled by the
TCPConnection::write() function. The write notices that the connection is
broken, attempts to reconnect, and resumes its operation once the reconnection
has succeeded.
When a dynamic connection fails, the write() goes through much of that
process, except that:
- It sends a notification to a connection helper, a separate thread that will attempt to reconnect on its behalf.
- It throws an OptionalConnectIncompleteException to let the caller know that the reconnect has failed and that it can skip the connection until the next write.
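The failure path of a dynamic connection can be sketched as below. This is a simplified model, not the runtime's code: `ConnectionHelper` is reduced to a request queue (the real helper runs in its own thread), `DynamicConnection` and `tryWrite` are hypothetical names, and the definition of `OptionalConnectIncompleteException` is assumed; only its name comes from the text.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Assumed definition; only the exception name appears in the design.
struct OptionalConnectIncompleteException : std::runtime_error {
    OptionalConnectIncompleteException()
        : std::runtime_error("optional connection not ready") {}
};

// Stand-in for the connection helper. Here it only records the requests.
struct ConnectionHelper {
    std::vector<int> pending;
    void requestReconnect(int connectionId) { pending.push_back(connectionId); }
};

// Hypothetical dynamic connection.
struct DynamicConnection {
    int id;
    bool connected;
    ConnectionHelper& helper;
    std::vector<std::string> sent;

    void write(const std::string& data) {
        if (!connected) {
            helper.requestReconnect(id);                 // delegate the reconnect
            throw OptionalConnectIncompleteException();  // tell the caller to skip
        }
        sent.push_back(data);
    }
};

// Caller side: a failed write skips this connection until the next write.
bool tryWrite(DynamicConnection& c, const std::string& data) {
    try {
        c.write(data);
        return true;
    } catch (const OptionalConnectIncompleteException&) {
        return false;
    }
}
```

Note that the exception is raised and caught on every write attempted while the connection is down, which is the cost on the write path that this design incurs.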
In principle this design is sound. However, its use of exceptions in the
critical path of the thread (the write path) is unwise and potentially the
reason why dynamic connections do not perform on par with static connections.
A congestion policy was added to the Export operator to address the adverse
performance effects that occur when some Import operators are slow. References:
- The documentation about the policy
- The actual implementation
There is a disconnect between the documentation, the title of the design
document and the actual implementation. Connections are not dropped when they
are congested; the data is. The wait policy adds a blocking wait in the
critical write() path in the hope that the connection helper will succeed in
establishing the connection. The drop policy does not block and simply
proceeds to throw the exception.
Blocking in write() ends up delaying all the other importers down the line
in TCPSender::write(). Since drop does not drop the connection, the same
reconnect/fail/wait process happens for every write.
The integration of the import/export subscription system hinges on two new
custom resources: the Import resource and the Export resource. The Import
resource collects all imports available across jobs in a namespace. The Export
resource collects all exports available across jobs in a namespace.
For each new resource, there is a new controller. When the ImportController
gets an onAddition event, it collects all existing exports in the
namespace and selects the ones matching the new import requirements. The
ExportController behaves similarly, collecting all matching imports for the
new export.
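The symmetric behavior of the two controllers can be sketched with a namespace-scoped registry. This is an illustration only: the actual controllers operate on Kubernetes custom resources, whereas `SubscriptionRegistry`, `ImportResource`, `ExportResource` and the streamId-only matching rule are assumptions made to keep the example short.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical, reduced views of the two custom resources.
struct ExportResource { std::string name; std::string streamId; };
struct ImportResource { std::string name; std::string streamId; };

// Simplified matching rule (assumption): match on streamId only.
inline bool sameStream(const ImportResource& i, const ExportResource& e) {
    return i.streamId == e.streamId;
}

class SubscriptionRegistry {
public:
    // ImportController onAddition: record the import, return matching exports.
    std::vector<ExportResource> onImportAdded(const std::string& ns,
                                              const ImportResource& imp) {
        imports_[ns].push_back(imp);
        std::vector<ExportResource> out;
        for (const auto& e : exports_[ns]) {
            if (sameStream(imp, e)) out.push_back(e);
        }
        return out;
    }

    // ExportController onAddition: record the export, return matching imports.
    std::vector<ImportResource> onExportAdded(const std::string& ns,
                                              const ExportResource& exp) {
        exports_[ns].push_back(exp);
        std::vector<ImportResource> out;
        for (const auto& i : imports_[ns]) {
            if (sameStream(i, exp)) out.push_back(i);
        }
        return out;
    }

private:
    // Resources are collected across jobs, per namespace.
    std::map<std::string, std::vector<ImportResource>> imports_;
    std::map<std::string, std::vector<ExportResource>> exports_;
};
```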
Sharing state change events with PEs is a challenging problem. The solutions we
explored are: 1/ exporting the changes into the job ConfigMap and monitoring the
file change; 2/ using a secondary container with a local import/export controller
to proxy state changes to the PE; 3/ exporting a REST API on both the PE and the
Streams controller to manipulate and query optional connections; 4/ exporting a
REST API on the Streams controller only, combined with UDP notifications and
periodic polling.
- Exporting the changes into the job ConfigMap quickly became irrelevant as we learnt that import/export parameters can be updated programmatically.
- Running an import/export controller allows proxying both changes and notifications between PEs and Kubernetes. But running a controller in a separate container for each PE in the system is too heavyweight.
- Exporting a REST API on both the PE and the Streams controller allows both notifications and parameter alterations to happen. It is also quite lightweight on both sides.
- Exporting a REST API on the Streams controller only and using loosely coupled UDP notifications as well as periodic polling to synchronize the PEs.
Option 4 is a variant of option 3 with the added benefit that the controller does not need to manage connections to PEs to send synchronous REST calls. In that scenario, PEs and controller stay loosely coupled, which is a very attractive attribute.
Therefore we selected option 4. We may need to revisit that choice as we progress with our integration of the Consistent Regions. Indeed, the JCP will have to send state change events to PEs and in that case a REST API on PEs will be needed.
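The loose coupling of option 4 can be sketched from the PE side. This is a minimal model under assumptions: the notion of a state "generation" counter is introduced here for illustration and does not appear in the design, `SubscriptionSync` is a hypothetical name, and the `fetch` callback stands in for a REST GET against the controller. The key idea is that a UDP notification is only a hint to refresh, while periodic polling recovers from lost datagrams.

```cpp
#include <cstdint>
#include <functional>
#include <utility>

// PE-side synchronizer (hypothetical). No network code is included.
class SubscriptionSync {
public:
    explicit SubscriptionSync(std::function<std::uint64_t()> fetch)
        : fetch_(std::move(fetch)) {}

    // Called when a UDP notification arrives; the datagram carries no state.
    bool onNotification() { return refresh(); }

    // Called on a timer; catches up if a notification was lost.
    bool onPollTimer() { return refresh(); }

    std::uint64_t generation() const { return generation_; }

private:
    // Returns true when the local state had to be updated.
    bool refresh() {
        const std::uint64_t latest = fetch_();
        if (latest == generation_) return false;
        generation_ = latest;
        return true;
    }

    std::function<std::uint64_t()> fetch_;
    std::uint64_t generation_ = 0;
};
```

Because both entry points go through the same refresh, the controller never needs to know whether a given PE received a notification.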

The controller exports an API endpoint for subscriptions at /api/subscriptions.
Path entries surrounded by {} are attributes set by the client.
| Path | GET | PUT | PATCH | DELETE |
|---|---|---|---|---|
| /job/{jobid}/pe/{peid} | Fetch all the subscriptions for a PE. | | | |
| /job/{jobid}/pe/{peid}/export/{opid} | Fetch the description of the export. | | | |
| /job/{jobid}/pe/{peid}/export/{opid}/properties | Get all properties of the export. | Replace all properties of the export. | Delete some properties of the export. | |
| /job/{jobid}/pe/{peid}/export/{opid}/property/{name} | Get a property of the export by name. | Replace a property of the export by name. | Delete a property of the export by name. | |
| /job/{jobid}/pe/{peid}/import/{opid} | Fetch the description of the import. | | | |
| /job/{jobid}/pe/{peid}/import/{opid}/filter | Get the filter of the import. | Replace the filter of the import. | | |
| /job/{jobid}/pe/{peid}/import/{opid}/streams | Get the imported streams of the import. | Replace the imported streams of the import. | | |
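A client needs to assemble these paths from the job, PE and operator identifiers. The helpers below are a minimal sketch: the function names are hypothetical, the resource paths are assumed to be relative to /api/subscriptions, and no URL encoding of the attributes is performed.

```cpp
#include <string>

// Assumption: the resource paths are appended to the subscriptions endpoint.
const std::string kSubscriptionsBase = "/api/subscriptions";

// Path to all the subscriptions of a PE.
std::string pePath(const std::string& jobId, const std::string& peId) {
    return kSubscriptionsBase + "/job/" + jobId + "/pe/" + peId;
}

// Path to a single export property, addressed by name.
std::string exportPropertyPath(const std::string& jobId, const std::string& peId,
                               const std::string& opId, const std::string& name) {
    return pePath(jobId, peId) + "/export/" + opId + "/property/" + name;
}

// Path to the filter of an import.
std::string importFilterPath(const std::string& jobId, const std::string& peId,
                             const std::string& opId) {
    return pePath(jobId, peId) + "/import/" + opId + "/filter";
}
```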
Import and Export operators are not actual operators. They are annotations
that change the behavior of the input port and output port, respectively, of the
operator they are connected to.
Therefore, when an Import is connected to more than one operator, each
connection is reflected as a separate Import resource in Kubernetes.
Connecting multiple subscription-based Import operators to the same operator
port is not supported; the compiler prevents it. Likewise, the same stream
cannot be exported more than once; the compiler prevents that as well.
It is possible that the subscription service becomes unreachable if the operator
goes down. Setters, in SPLFunctions.cpp, properly handle that situation by
catching any downstream exception and returning error code 3. Getters, on the
other hand, don't.
When such a situation occurs and an exception is thrown, the exception is caught
upstream of the application and the PE is terminated. We replicate that behavior
in K8SPlatform.cpp. Ideally, getters should also catch downstream exceptions
and return error code 3.