Design: Host Constraints
Streams applications can have host constraints which specify which hosts an operator can execute on. Host constraints are considered during fusion, so we are guaranteed that a PE will have consistent host constraints (that is, operators with mutually exclusive host constraints are not fused together). For our purposes, we will talk about a PE's host constraints; when we refer to "a PE's host constraints," we mean the union of the host constraints of all of the operators inside of the PE.
We will take advantage of two beta features in Kubernetes to implement Streams
host constraints. The nodeAffinity feature, introduced in Kubernetes 1.2, allows
pods to specify which nodes they can run on based on node labels. It will
replace the nodeSelector option. We will use nodeAffinity to implement the
Streams host() placement config. The podAffinity and podAntiAffinity features,
introduced in Kubernetes 1.4, allow pods to specify which nodes they should run
on based on the labels of pods already running on those nodes. We will use
podAffinity to implement the Streams hostColocation() config, and
podAntiAffinity to implement hostExlocation().
One common feature of all Streams host placement semantics is that host resolution failure results in job submission failure. That is, the semantics of Streams has always been that if the current instance cannot meet the host requirements of a submitted job, that job submission fails with an appropriate error message. For now, we are making the design decision to reproduce that behavior.
An important design decision made elsewhere is that we deploy each PE as exactly one pod. That design decision simplifies how we handle host constraints.
Note that partitionColocation(), partitionExlocation() and partitionIsolation
are handled exactly as before because they are fundamental to the fusion
process, which we are using unchanged.
The host() config specifies a particular host that the PE must run on. There are
four variants of it, each of which we will handle differently.
Specifying a host name maps exactly to the nodeName node selection constraint.
Note that this option does not use labels, and requires that the Kubernetes node
has exactly the specified name.
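As a minimal sketch, assuming a cluster node literally named foo, a host("foo") config would yield a pod spec fragment along these lines (most fields elided):
apiVersion: v1
kind: Pod
metadata:
  name: parallel-0-ioyms
  namespace: default
spec:
  nodeName: foo    # binds the pod directly to the node named foo, bypassing label matching
  containers:
  - args:
    - /opt/ibm/streams/system/impl/bin/streams-k8s-bootstrap
    image: us.icr.io/spl_team/streams-runtime:6.release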
Kubernetes does not support assigning pods to nodes based on IP address. But it
does publish the IP address of each node in its description. That means we can
map an IP address to a node name, and then assign the PE's pod using nodeName
just as above.
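For reference, a node's IP addresses are published in its status, which is where we would resolve an address such as 10.8.5.6 back to a node name (the node name worker-1 here is hypothetical):
apiVersion: v1
kind: Node
metadata:
  name: worker-1        # hypothetical node name
status:
  addresses:
  - type: InternalIP
    address: 10.8.5.6
  - type: Hostname
    address: worker-1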
Resolved Questions:
- Given that IP addresses are strings, and names are strings, how will we determine when an IP address is used?
Resolution: If an operator has a host() config, and that config is a valid IPv4 or IPv6 address according to the Apache Commons InetAddressValidator, then we will treat it as an IP address. If no node in the Kubernetes cluster has that IP address, the job submission will not succeed.
SPL has the notion of a hostPool, which can be created on the main composite of
an application. Hostpools have no runtime mechanism; there is no part of a
running system that is the hostpool. Rather, they are an abstraction for
describing where PEs can execute.
We will define a Kubernetes Custom Resource Definition (CRD), HostPool, to
represent them. The corresponding controller will be responsible for taking
hostpool-related actions, and other components (such as the PE controller)
will look up hostpools in the store. Upon job submission, we retrieve all of the
hostpool information from the job's model. We use this information to create new
HostPool entries which go into the store and are handled by the controller.
PEs have references to these hostpools, and we look them up in the store when
we create the pod spec for those PEs.
Initially, we thought we would not need to define a CRD for hostpools. But once it became clear that we needed a persistent notion of a hostpool that could be referenced by both jobs and PEs, a CRD emerged as the natural solution.
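As a sketch only, with the API group, version and field names being assumptions rather than a settled schema, a HostPool custom resource might look something like this:
apiVersion: streams.ibm.com/v1alpha1    # hypothetical API group and version
kind: HostPool
metadata:
  name: mypool
spec:
  tags:                                 # tags that map to node labels
  - source
  exclusive: false                      # Sys.Shared vs. Sys.Exclusive
  # size omitted for an unsized pool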
In SPL, users can create hostpool literals:
config hostPool: NamesPool = ["foo", "bar"];
Similar to the host("name") config, we can support the above by ensuring that
the current cluster has nodes with the contained names. One wrinkle is that I do
not believe the nodeName pod spec allows specifying a set of node names, but
allows only a single node name. If so, then we would have to create pod specs
which arbitrarily choose one of the allowable names.
Open Questions:
- Should we not care about a node's name at all, and just consider names the same as labels?
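If we did treat names as just another label, one possible shape, sketched only as an illustration of that alternative, would be to match the well-known kubernetes.io/hostname node label with nodeAffinity, which also avoids the single-name limitation of nodeName:
spec:
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/hostname   # well-known label holding the node's name
            operator: In
            values:
            - foo
            - bar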
SPL users can also create hostpool literals using IP addresses:
config hostPool: IPsPool = ["10.8.5.6", "10.8.5.7", "10.8.5.8"];
Similar to how we handle host("10.8.5.6"), we will map the IP addresses to the
names for those nodes, and then the hostpool will be the same as if it was
specified with names.
Hostpool literals are not the common case. More prevalent are hostpools created
through the createPool() intrinsic which allows users to specify a number of
hosts, tags and exclusivity. From the documentation:
config hostPool:
P1=createPool({size=10u, tags=["ib","blue"]}, Sys.Shared), //sized, tagged, shared
P2=createPool({size=10u, tags=["ib","blue"]}, Sys.Exclusive), //sized, tagged, exclusive
P4=createPool({size=10u}, Sys.Shared), //sized, untagged, shared
P5=createPool({size=10u}, Sys.Exclusive), //sized, untagged, exclusive
P3=createPool({tags=["ib","blue"]}, Sys.Shared), //unsized, tagged, shared
P6=createPool(Sys.Shared); //unsized, untagged, shared
All of the above should be legal constructs in the Kubernetes realization of hostpools:
- size: If provided, we need to ensure that we have access to enough nodes that meet the tags and exclusivity requirements.
- tags: Each tag maps directly to a Kubernetes label.
- exclusivity: No action needed for Sys.Shared. For Sys.Exclusive, we need to ensure that no other Streams PEs are started on the hosts we give to the PEs in this job. We should be able to implement this with Kubernetes taints: we will apply taints to the nodes chosen to be exclusive, and then make sure the PEs assigned to those nodes have the appropriate tolerations (a sketch follows after the next paragraph).
All of the above means that in the Kubernetes world, hostpools exist as labels and taints applied to nodes, and labels and tolerations passed down to PEs. Inability to create any of the hostpools that a job contains will result in job submission failure.
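The following is only a sketch; the taint key streams.ibm.com/exclusive-hostpool and the node name worker-1 are assumptions, not settled names. An exclusive hostpool such as P2 might be realized by tainting its nodes so that only pods with a matching toleration can land on them:
apiVersion: v1
kind: Node
metadata:
  name: worker-1                               # hypothetical node name
spec:
  taints:
  - key: streams.ibm.com/exclusive-hostpool    # hypothetical taint key
    value: P2                                  # the exclusive pool this node is reserved for
    effect: NoSchedule                         # pods without a matching toleration are kept off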
Open Questions:
- Need to do more investigation to ensure taints can work for our purposes.
- Should exclusivity apply to all pods, not just pods that contain Streams PEs? This may actually be easier to implement given the semantics of taints and tolerations.
- Should we just ignore size and exclusivity completely? Tags may be the only valuable concept in a Kubernetes environment.
- Applying tags and taints to nodes is a new kind of action. We have not yet made node changes. I am not sure who the appropriate actor is (do we need a node controller?).
PEs that belong to a particular hostpool will inherit that hostpool's tags as
Kubernetes labels. The PE controller will add those labels to the pod's spec,
and we will use nodeAffinity to ensure that the pods for those PEs are only
created on nodes with those labels. Kubernetes labels are key-value pairs, so
for hostpools, we will always use the key streams.ibm.com/hostpool.
The nodeAffinity feature provides two options:
requiredDuringSchedulingIgnoredDuringExecution and
preferredDuringSchedulingIgnoredDuringExecution. As implied by the names, the
first is a requirement and the second is only a preference. The names also imply
that if the node labels change at runtime, then Kubernetes will not move the
pods elsewhere. We established earlier that we want to replicate the current
Streams semantics, which means that we will use the required option. However, we
can easily imagine providing a Kubernetes-specific toggle that lets users map
hostColocation() requests to a preference instead of a requirement. Not moving
pods based on node changes during execution is also consistent with Streams
semantics. (Future releases are supposed to support an additional option,
requiredDuringSchedulingRequiredDuringExecution. Again, we could provide a
Kubernetes-specific toggle to allow it.)
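For illustration only, and assuming we did expose such a preference toggle, the preferred variant differs mainly in shape: each term carries a weight and the selector moves under a preference field:
spec:
  affinity:
    nodeAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100                       # relative weight among preferred terms
        preference:
          matchExpressions:
          - key: streams.ibm.com/hostpool
            operator: In
            values:
            - source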
Note that of the available ways of specifying host(), specifying a particular
hostpool is the most Kubernetes-like. And of the ways of creating a hostpool,
the most Kubernetes-like is creating one that is unsized, tagged and shared.
These uses mesh with Kubernetes easily because then the PEs adopt certain
labels, and we specify those labels in the pods we deploy. Assuming that the
nodes already have the appropriate labels, Kubernetes handles the rest.
We propose that the following SPL:
composite Main {
  graph
    stream<uint64 num> Source = Beacon() {
      config placement: host(MyPool);
    }
  config hostPool: MyPool = createPool({tags=["source"]}, Sys.Shared);
}
Will map to the following pod spec (with many values elided):
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: streams
    streams.ibm.com/job: parallel
    streams.ibm.com/pe: parallel-0
  name: parallel-0-ioyms
  namespace: default
spec:
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: streams.ibm.com/hostpool
            operator: In
            values:
            - source
  containers:
  - args:
    - /opt/ibm/streams/system/impl/bin/streams-k8s-bootstrap
    image: us.icr.io/spl_team/streams-runtime:6.release
The above assumes that someone already labeled some node in the cluster with the label streams.ibm.com/hostpool=source.
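For reference, that assumed pre-labeling could be as simple as node metadata like the following (the node name worker-1 is hypothetical):
apiVersion: v1
kind: Node
metadata:
  name: worker-1                      # hypothetical node name
  labels:
    streams.ibm.com/hostpool: source  # matches the nodeAffinity term above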
If the hostpool is exclusive, then the PE controller must apply the appropriate tolerations to the pod to ensure it can run on the tainted nodes.
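A minimal sketch of the pod side, reusing the hypothetical taint key from the node sketch above:
spec:
  tolerations:
  - key: streams.ibm.com/exclusive-hostpool    # hypothetical taint key from the node sketch
    operator: Equal
    value: P2
    effect: NoSchedule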
Specifying a particular host in a hostpool is not very Kubernetes-like, but it is better than specifying a name because we still have the freedom to decide which actual Kubernetes node should be that host. That users can index hostpools implies that we also need to maintain some ordering on the nodes that implement the hostpool.
Open Questions:
- How do we implement the mapping? We have two options:
- Make and assign node names. (Probably not the best option.)
- Create a special label just for the chosen node, give that label to the PE. Probably better than the first option, but it has the wrinkle that we need to synthesize unique node labels that don't clash with user-created node labels. (A sketch of one possible shape follows below.)
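Purely to illustrate the second option, and with the synthesized label key being entirely hypothetical, the chosen node and the pod for the PE that requested host(MyPool[2]) might be tied together like this:
# Hypothetical synthesized label on the node chosen to back MyPool[2].
metadata:
  labels:
    streams.ibm.com/hostpool-index.MyPool.2: "true"
---
# Matching nodeAffinity on the pod for the PE that requested host(MyPool[2]).
spec:
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: streams.ibm.com/hostpool-index.MyPool.2
            operator: Exists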
The SPL construct hostColocation is not about assigning a PE
to a particular host, but making sure that a PE is co-located on the same host
as some other PE. That is, the host itself is not important but what else is
running on that host is. The semantics are straightforward: all PEs with exactly
matching values in their call to hostColocation() must execute on the same
host.
This concept neatly maps to the Kubernetes podAffinity feature. Its options
are the same as with nodeAffinity, and again we choose the required options.
The pod affinity options introduce another concept, that of a topologyKey. It
is meant to distinguish "topology domains", where a domain could be something
like a node, rack or zone. See the documentation for more. When using
podAffinity, topologyKey must have some value; it cannot be empty.
As with nodeAffinity, placement is expressed using key-value labels, which is
unlike hostColocation(). Because a pod may have multiple colocation
requirements, we must use a unique key for each requirement. The key will have
the prefix streams.ibm.com/hostcolocation. and the rest will be the SHA-1 hash
of the string provided to hostColocation(). We use a hash of the value because
we need to ensure that the key name is 63 characters or less (see the
Kubernetes documentation on labels for more discussion). The
value must also be 63 characters or less. We considered using the SHA-1 hash of
the original string as the value as well, but we wanted the label to retain
identity with what appears in the application. This does introduce an
incompatibility with Streams 4.3: hostColocation() can accept any SPL
string, including strings with spaces. Such values are not valid Kubernetes
label values. Developers will have to modify such applications.
We propose that the following SPL:
stream<Type> Out = Functor(In) {
config placement: hostColocation("together");
}
Will map to the following pod spec (assuming that the PE which contains the above operator has no additional host constraints):
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: streams
    streams.ibm.com/job: parallel
    streams.ibm.com/pe: parallel-0
    streams.ibm.com/hostcolocation.9034FF9E2B8F00B47A44DFAF3C2A37176C101E2A: together
  name: parallel-0-ioyms
  namespace: default
spec:
  affinity:
    podAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: streams.ibm.com/hostcolocation.9034FF9E2B8F00B47A44DFAF3C2A37176C101E2A
            operator: In
            values:
            - together
        topologyKey: kubernetes.io/hostname
  containers:
  - args:
    - /opt/ibm/streams/system/impl/bin/streams-k8s-bootstrap
    image: us.icr.io/spl_team/streams-runtime:6.release
Note that not only do we need to add the appropriate value (together) to our podAffinity spec, we also need to add the appropriate label to this pod (streams.ibm.com/hostcolocation.9034FF9E2B8F00B47A44DFAF3C2A37176C101E2A=together) to ensure that other pods with the same podAffinity spec will match with us.
Open Questions:
- The additional concept of a topologyKey is still unclear to me. I know the rough meaning, but I can't describe it well.
- Performance impact. The documentation also warns that these features can take a long time to process, which gels with our own understanding of the problem. They warn against using them with clusters with more than several hundred nodes.
The SPL construct hostExlocation is the inverse of hostColocation(): it is
for ensuring that PEs with matching token values are placed on different
hosts. All PEs with hostExlocation("apart") must end up on different hosts.
Just as hostExlocation() is the inverse of hostColocation(), the Kubernetes
option podAntiAffinity is the inverse of podAffinity. Our use of
hostExlocation() maps to podAntiAffinity exactly. Once again, we will use
the required as opposed to the preferred variant. The key we will use for the
labels is streams.ibm.com/hostexlocation.
When using the combination of podAntiAffinity and
requiredDuringSchedulingIgnoredDuringExecution, we are restricted to the value
kubernetes.io/hostname for topologyKey by an admission controller. We can
disable this admission controller, but I don't see a need to.
We propose that the following SPL:
stream<Type> Out = Functor(In) {
config placement: hostExlocation("apart");
}
Will map to the following pod spec (assuming that the PE which contains the above operator has no additional host constraints):
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: streams
    streams.ibm.com/job: parallel
    streams.ibm.com/pe: parallel-0
    streams.ibm.com/hostexlocation.B76ADE163D874CC5BC0F408D70CFC165667EEC5F: apart
  name: parallel-0-ioyms
  namespace: default
spec:
  affinity:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: streams.ibm.com/hostexlocation.B76ADE163D874CC5BC0F408D70CFC165667EEC5F
            operator: In
            values:
            - apart
        topologyKey: kubernetes.io/hostname
  containers:
  - args:
    - /opt/ibm/streams/system/impl/bin/streams-k8s-bootstrap
    image: us.icr.io/spl_team/streams-runtime:6.release
Again note that we also need to add the label streams.ibm.com/hostexlocation.B76ADE163D874CC5BC0F408D70CFC165667EEC5F=apart to this pod as well as use it in our podAntiAffinity spec.
Open Questions:
- Once again, topologyKey. We need to more fully understand the implications of these restrictions.
The SPL construct hostIsolation ensures that the PE runs on a host which
contains no other PEs from the same job. It is semantically equivalent to
creating a hostExlocation("token-X") config on an operator for every other
operator in the application, where "token-X" would become "token-0" for the
first operator pairing, "token-1" for the second operator pairing, and so on.
We will use a similar notion to implement hostIsolation in Kubernetes, but
luckily we don't need to be that extreme. The hostExlocation() constraint in
SPL is symmetric and transitive. It is symmetric because in order for operator
A to be exlocated from operator B, both must explicitly state they are
exlocated from each other using the same token. It is transitive since if
operator A is exlocated from operator B using token foo, and operator A
is exlocated from operator C also using token foo, operators B and C will
also be exlocated from each other. This is why, in order to recreate
hostIsolation constraints with hostExlocation(), we must generate unique
tokens for each pairing. (Note that this is an illustrative exercise to
understand the logical consequences of these mechanisms, not a recommendation.)
The podAntiAffinity feature is not symmetric: pod A can specify that it is
anti-affinity to pod B, but pod B does not need to specify it is
anti-affinity to pod A. Because the relationship is not symmetric, we avoid
the transitivity. We can take advantage of this fact, and of the semantic
equivalence of hostIsolation and careful hostExlocation() pairings, to
ensure host isolation through creating just one new label.
Specifically, for each instance of hostIsolation in a job, we must create a
unique label with the key prefix streams.ibm.com/hostisolation. The remainder of
the key will be the SHA-1 hash of the operator name with the hostIsolation
config. The value of the label will be a sanitized version of the same operator name
(the sanitization must enforce the Kubernetes label value restrictions). Then we
must apply that label to each pod in the job, excluding the pod which we want to
isolate. Then we specify a podAntiAffinity on the pod we want to isolate
using that unique label.
Assume the following SPL application:
composite Main {
  graph
    stream<uint64 num> Source = Beacon() {
      config placement: partitionIsolation;
    }
    stream<uint64 num> Work = Functor(Source) {
      config placement: partitionIsolation;
    }
    () as Sink = Custom(Work) {
      config placement: partitionIsolation,
                        hostIsolation;
    }
}
Because of the partitionIsolation configs on each operator, there will be three
PEs. Let's name these PEs after the only operators they contain: Source,
Work and Sink. Because Sink has hostIsolation, it must run on a node with no
other PEs from this job (although we don't care which node).
The Source pod spec:
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: streams
    streams.ibm.com/job: ex
    streams.ibm.com/pe: parallel-0
    streams.ibm.com/hostisolation.E53E8D5300C878019A997D4CFB7201C7ED2EE003: Sink
  name: parallel-0-fjcq
  namespace: default
spec:
  containers:
  - args:
    - /opt/ibm/streams/system/impl/bin/streams-k8s-bootstrap
    image: us.icr.io/spl_team/streams-runtime:6.release
The Work pod spec:
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: streams
    streams.ibm.com/job: ex
    streams.ibm.com/pe: parallel-1
    streams.ibm.com/hostisolation.E53E8D5300C878019A997D4CFB7201C7ED2EE003: Sink
  name: parallel-1-fdpf
  namespace: default
spec:
  containers:
  - args:
    - /opt/ibm/streams/system/impl/bin/streams-k8s-bootstrap
    image: us.icr.io/spl_team/streams-runtime:6.release
Note that aside from their PE names, the pod specs are the same. Also note that they have the label streams.ibm.com/hostisolation.E53E8D5300C878019A997D4CFB7201C7ED2EE003=Sink.
The Sink pod spec:
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: streams
    streams.ibm.com/job: ex
    streams.ibm.com/pe: parallel-2
  name: parallel-2-xnvy
  namespace: default
spec:
  affinity:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: streams.ibm.com/hostisolation.E53E8D5300C878019A997D4CFB7201C7ED2EE003
            operator: In
            values:
            - Sink
        topologyKey: kubernetes.io/hostname
  containers:
  - args:
    - /opt/ibm/streams/system/impl/bin/streams-k8s-bootstrap
    image: us.icr.io/spl_team/streams-runtime:6.el7.x86
The podAntiAffinity spec in Sink ensures that it will not end up on a node that contains either Source or Work, because the pods containing those PEs have the label streams.ibm.com/hostisolation.E53E8D5300C878019A997D4CFB7201C7ED2EE003=Sink.