This document traces through what happens when a user submits a job.
Since jobs are Kubernetes CRDs, users submit jobs by asking Kubernetes to create new job CRDs. We will use the following sample application:
Note that this is in a separate repo from the Knative Streams code; it's a small repo with sample applications that demo various capabilities as we develop them. What's particularly relevant for us is the YAML file in that directory, which we submit with the `kubectl apply` command shown below.
The format of the YAML that users write to deploy a job is determined by the `JobSpec`:
The actual CRD is defined as the `Job` class:
As a rule, all CRDs have a corresponding spec, which means that all other CRDs in our implementation follow the same pattern. (This pattern is not actually part of the `CustomResource` interface, but it is a widely adopted pattern throughout the Kubernetes ecosystem.)
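As an illustration of this pattern, here is a minimal sketch of a CRD class and its spec using the fabric8 kubernetes-client; the group, version, and spec field are placeholders, not the actual `JobSpec`:

```java
import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;

// Hypothetical spec class: holds whatever the user writes under `spec:`.
class JobSpec {
  public String sabName; // assumption: the name of the SAB to fetch from the store
}

// The CRD class pairs a spec (and optionally a status) with Kubernetes
// metadata; the group and version here are placeholders.
@Group("streams.example.com")
@Version("v1alpha1")
class Job extends CustomResource<JobSpec, Void> implements Namespaced {}
```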
Users submit the job by applying the particular job YAML to their Kubernetes cluster:

```
kubectl apply -f apps.parallel/parallel.yaml
```
This command asks Kubernetes to create a new job CRD with the listed specification. Kubernetes does so, places that new job CRD in the job store, and then hands off control to the job controller.
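To make this hand-off concrete, here is a minimal sketch of how a controller can subscribe to job events with the fabric8 kubernetes-client informer API; the actual `JobController` wiring may differ:

```java
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;

public class JobWatch {
  public static void main(String[] args) {
    KubernetesClient client = new KubernetesClientBuilder().build();
    // Watch Job custom resources; the informer calls back on every lifecycle event.
    client.resources(Job.class).inform(new ResourceEventHandler<Job>() {
      @Override public void onAdd(Job job) { /* ~ JobController.onAddition() */ }
      @Override public void onUpdate(Job oldJob, Job newJob) { /* modification */ }
      @Override public void onDelete(Job job, boolean stateUnknown) { /* deletion */ }
    });
  }
}
```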
The part of the job controller that responds to newly created jobs is the method `JobController.onAddition()`:
Some notable steps during this process:
1. Compare the timestamp of the event to the timestamp for when this `JobController` was created. After recovering from a catastrophic failure, Kubernetes replays all events. How to handle this depends on the controller. In the case of jobs, we need to catch up with the current job ID, but we don't need to recreate any jobs; they will be restored in the job store. (A sketch of this check appears after this list.)
2. Retrieve the job's ADL from the SAB. The user must have stored the SAB in the external repository first. At the moment, we are using Redis for this SAB store.
3. Create the logical model and model version of the job. We will dive into this process in the next section.
4. Retrieve meta-information about the job, such as whether it has any consistent regions or any imports or exports, and find all of the PE ports. Each of these Streams concepts maps to either a Streams CRD or an existing Kubernetes construct.
5. Create the Kubernetes resources for the job.
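Here is a minimal sketch of the replay check from step 1; the timestamp source and helper methods are assumptions, not the actual controller code:

```java
import java.time.Instant;

public class JobController {
  // Recorded when this controller instance starts (assumption).
  private final Instant startTime = Instant.now();

  public void onAddition(Job job) {
    // Compare the event's creation timestamp against the controller's start.
    Instant eventTime = Instant.parse(job.getMetadata().getCreationTimestamp());
    if (eventTime.isBefore(startTime)) {
      // Replayed event after a catastrophic failure: catch up with the
      // current job ID, but don't recreate the job; the job store has it.
      catchUpJobId(job);
      return;
    }
    submit(job); // fresh submission: steps 2-5 above
  }

  private void catchUpJobId(Job job) { /* hypothetical bookkeeping */ }
  private void submit(Job job) { /* hypothetical: ADL fetch, models, resources */ }
}
```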
Each of the Streams concepts retrieved in step 4 and created in step 5 could be its own deep-dive. But since this deep-dive is focused on job submission, we will go into PE creation, as all jobs have at least one PE.
The classes in the `com.ibm.streams.controller.instance.sam` package are the glue between the Kubernetes-aware code and the SAM code:

`com.ibm.streams.controller/src/com/ibm/streams/controller/instance/sam`
As most of the code that we need is related to the job-submission pipeline, the primary class is called `Pipeline`:
This class is where the Kubernetes controllers ask the SAM code to create new Streams entities (such as `LogicalModel`s), or to make queries about those entities (such as whether the job has any exports). At the moment, we're concerned with two processes:
- Create the `LogicalModel` given an ADL. Note that we create a new `ObjectTracker` on every call. Since we no longer rely on the `ObjectTracker` to store objects in ZooKeeper, this is safe for us to do.
- Create the model `Job` given a `LogicalModel`. This method generates a `TopologyApplication` as a part of the process, and initiates fusion. (Both entry points are sketched below.)
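A sketch of what these two entry points could look like; the method signatures here are guesses for illustration, and only the type names come from the text above:

```java
public class Pipeline {
  // Parse an ADL into a LogicalModel. A fresh ObjectTracker per call is safe
  // because we no longer persist tracked objects in ZooKeeper.
  public LogicalModel createLogicalModel(String adl) {
    ObjectTracker tracker = new ObjectTracker(); // hypothetical constructor
    return LogicalModel.fromAdl(adl, tracker);   // hypothetical factory method
  }

  // Turn a LogicalModel into SAM's model Job (distinct from the Job CRD):
  // generates a TopologyApplication along the way, then initiates fusion.
  public Job createJob(LogicalModel model) {
    TopologyApplication app = model.toTopologyApplication(); // hypothetical
    return fuse(app);                                        // hypothetical
  }

  private Job fuse(TopologyApplication app) { /* fusion elided */ return null; }
}
```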
The `JobController` is responsible for creating all of the PEs. It must first create a `ProcessingElementSpec` for each PE, and then ask the PE factory to actually create each individual PE:
- Sort the PEs by most-restrictive placement spec first. When we eventually ask Kubernetes to schedule the pods behind these PEs, we want it to first handle the pods with the most restrictive requirements.
- Create the PE CRD through the PE factory. (A sketch of this loop follows the list.)
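A minimal sketch of that sort-then-create loop, with a hypothetical restrictiveness metric standing in for the real placement comparison:

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: most restrictive placement first, then create each PE
// CRD through the factory. The restrictiveness metric is an assumption.
void createPes(List<ProcessingElementSpec> specs, ProcessingElementFactory factory) {
  specs.sort(Comparator
      .comparingInt(ProcessingElementSpec::placementRestrictiveness) // assumed
      .reversed()); // most restrictive first
  for (ProcessingElementSpec spec : specs) {
    factory.addPe(spec); // hypothetical factory method
  }
}
```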
The `ProcessingElementFactory` is responsible for creating an individual PE:

`com.ibm.streams.controller.crds.pes.ProcessingElementFactory`
There are three main steps:
- Establish an owner reference. Because we establish that the `Job` CRD is the owner of this `ProcessingElement` CRD, when a user removes the job, Kubernetes automatically removes the PE as well.
- Create the labels. These are the labels that will go on the PE's pod, visible to users.
- Instantiate the `ProcessingElement` object and hand it off to Kubernetes through the `createOrReplace()` method. The key difference between object factories in the Kubernetes ecosystem and those in the standard pattern is that here factories do not actually return the newly created CRD. Instead, they pass it to Kubernetes through the `createOrReplace()` method, which makes sure that the CRD is placed in the correct store and that the corresponding controller receives a notification. There is a parallel between `createOrReplace()` and `kubectl apply`: they are both ways to create new Kubernetes objects. (These steps are sketched after this list.)
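A sketch of those three steps using fabric8 builders; the label key and the `addPe()` method name are assumptions:

```java
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.util.List;
import java.util.Map;

void addPe(KubernetesClient client, Job job, ProcessingElementSpec spec) {
  // 1. Owner reference: deleting the Job cascades to this PE.
  OwnerReference owner = new OwnerReferenceBuilder()
      .withApiVersion(job.getApiVersion())
      .withKind(job.getKind())
      .withName(job.getMetadata().getName())
      .withUid(job.getMetadata().getUid())
      .build();

  ProcessingElement pe = new ProcessingElement(); // a CRD class, like Job
  pe.setSpec(spec);
  pe.getMetadata().setOwnerReferences(List.of(owner));

  // 2. Labels that end up on the PE's pod (the key is a placeholder).
  pe.getMetadata().setLabels(Map.of("streams.example.com/job",
                                    job.getMetadata().getName()));

  // 3. Hand the CRD to Kubernetes instead of returning it: createOrReplace()
  //    places it in the PE store and notifies the PE controller.
  client.resources(ProcessingElement.class).createOrReplace(pe);
}
```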
Soon after this point, the PE CRD will exist in the PE store, and Kubernetes will notify the `ProcessingElementController` that there is a new PE.
The method `ProcessingElementController.onAddition()` receives the event that a new PE CRD was created:

`com.ibm.streams.controller.crds.pes.instance.ProcessingElementController`
It is the PE controller that enforces that one PE becomes one pod. It creates a spec for that pod, and then creates it through `PodFactory.addPod()`. Creating the pod spec is involved enough that we created a separate class for it:

`com.ibm.streams.controller.crds.pes.instance.ProcessingElementPodSpecBuilder`
The `ProcessingElementPodSpecBuilder` class is responsible for mapping PE concepts into pod concepts.
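A sketch of the kind of mapping the builder performs, using the fabric8 `PodBuilder`; the container image and details are placeholders:

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

// Map a PE into a pod: metadata carries over, and the PE runtime becomes the
// pod's single container. Image name and container details are placeholders.
Pod buildPod(ProcessingElement pe) {
  return new PodBuilder()
      .withNewMetadata()
        .withName(pe.getMetadata().getName())
        .withLabels(pe.getMetadata().getLabels()) // PE labels -> pod labels
        .withOwnerReferences(pe.getMetadata().getOwnerReferences())
      .endMetadata()
      .withNewSpec()
        .addNewContainer()
          .withName("pe")
          .withImage("streams-runtime:latest") // placeholder image
          // environment, ports, and volumes for the PE would be mapped here
        .endContainer()
      .endSpec()
      .build();
}
```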
After this point, the pod is deployed to Kubernetes. However, we still have a `PodController` which reacts to a PE's pod events.
When a PE's pod is first deployed, the PE `PodController` does not have much work to do:
The PE `PodController` has more work to do in the case of modification and deletion.