SAMZA-1154: Tasks endpoint to list the complete details of all tasks related to a job
Shanthoosh Venkataraman authored and Xinyu Liu committed Mar 20, 2017
1 parent d399d6f commit 26280ca
Showing 32 changed files with 1,475 additions and 256 deletions.
@@ -22,6 +22,7 @@ title: Resource Directory
The Samza REST Service ships with the JAX-RS Resources listed below.

- [JobsResource](resources/jobs.html)
- (Second resource coming soon)
- [TasksResource](resources/tasks.html)

## [Jobs Resource »](resources/jobs.html)
## [Tasks Resource »](resources/tasks.html)
125 changes: 125 additions & 0 deletions docs/learn/documentation/versioned/rest/resources/tasks.md
@@ -0,0 +1,125 @@
---
layout: page
title: Tasks Resource
---
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

This resource exposes endpoints for task-level operations. The initial implementation supports listing all the tasks of a particular job.
This is a sub-resource of the [Jobs Resource &raquo;](jobs.html) and is not intended to be used independently.

Successful responses vary by endpoint, depending on its functionality and scope. Error responses, however,
share a single form across all of the tasks resource endpoints.

**Error Message**

Every error response will have the following structure:

{% highlight json %}
{
"message": "Unrecognized status parameter: null"
}
{% endhighlight %}
`message` is the only field in the response and contains a description of the problem.
<br>

## Get All Tasks
Lists the complete details of all the tasks for a particular job.

######Request
GET /v1/jobs/{jobName}/{jobId}/tasks

######Response
Status: 200 OK

{% highlight json %}
[
{
"preferredHost" : "samza-preferredHost",
"taskName" : "Samza task",
"containerId" : "0",
"partitions" : [{
"system" : "kafka",
"stream" : "topic-name",
"partitionId" : "0"
}]
}
]
{% endhighlight %}

######Response codes
<table class="table table-condensed table-bordered table-striped">
<thead>
<tr>
<th>Status</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>200 OK</td><td>The operation completed successfully and all the tasks for
the job are returned.</td>
</tr>
<tr>
<td>404 Not Found</td><td>An invalid job instance was provided as an argument.{% highlight json %}
{
"message": "Invalid arguments for getTasks. jobName: SamzaJobName jobId: SamzaJobId."
}
{% endhighlight %}</td>
</tr>
<tr>
<td>500 Server Error</td><td>There was an error executing the command on the server, for example, the command timed out.{% highlight json %}
{
"message": "Timeout waiting for get all tasks."
}
{% endhighlight %}</td>
</tr>
</tbody>
</table>
<br/>
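
As a rough illustration of calling this endpoint, the sketch below builds the request path described above and maps the documented status codes to short descriptions. It is a minimal sketch, not part of Samza: the class `TasksClient`, its method names, and the base URL passed by the caller are all assumptions, and it relies on the JDK 11+ `java.net.http` client.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Samza class) for GET /v1/jobs/{jobName}/{jobId}/tasks.
final class TasksClient {

    // Builds <baseUrl>/v1/jobs/{jobName}/{jobId}/tasks; baseUrl is whatever host
    // and port the Samza REST Service is assumed to be listening on.
    static URI tasksUri(String baseUrl, String jobName, String jobId) {
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return URI.create(base + "/v1/jobs/" + encode(jobName) + "/" + encode(jobId) + "/tasks");
    }

    // URLEncoder produces form encoding ('+' for spaces), so swap in %20 for path segments.
    private static String encode(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Short descriptions of the status codes listed in the response codes table.
    static String describe(int status) {
        switch (status) {
            case 200: return "OK: all the tasks for the job were returned";
            case 404: return "Not Found: invalid job instance";
            case 500: return "Server Error: the command failed on the server";
            default:  return "Unexpected status " + status;
        }
    }

    // Sends the request and returns the raw JSON array on 200. On any other status
    // the body is expected to be the {"message": "..."} error structure.
    static String fetchTasks(HttpClient client, URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException(describe(response.statusCode()) + ": " + response.body());
        }
        return response.body();
    }
}
```

A caller would typically pass `HttpClient.newHttpClient()` together with the result of `tasksUri(...)` to `fetchTasks`, then hand the returned JSON to whichever JSON library the project already uses.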

<br/>

###Design
###Abstractions
The TasksResource relies on two primary abstractions, which users can implement to handle any details specific to their environment.

1. **TaskProxy**: This interface is the central point of interaction with Samza tasks. It exposes a method to get all the tasks of a Samza job.
2. **InstallationFinder**: The InstallationFinder provides a generic interface to discover all the installed jobs, hiding any customizations in the job package structure and its location. The InstallationFinder also resolves the job configuration, which is used to validate and identify the job.
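
To make the division of labour between the two abstractions concrete, here is a minimal sketch using simplified stand-in types. `TaskLister`, `TaskInfo`, and `InMemoryTaskLister` are hypothetical names invented for this illustration; they are not the real Samza interfaces, whose signatures may differ. The in-memory map loosely plays the role of the InstallationFinder (knowing which jobs exist), while the lister plays the role of the TaskProxy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down view of one task; the documented response also
// carries a list of partitions, omitted here for brevity.
final class TaskInfo {
    final String taskName;
    final String containerId;
    final String preferredHost;

    TaskInfo(String taskName, String containerId, String preferredHost) {
        this.taskName = taskName;
        this.containerId = containerId;
        this.preferredHost = preferredHost;
    }
}

// Hypothetical analogue of the TaskProxy idea: one method that returns all tasks of a job.
interface TaskLister {
    List<TaskInfo> getTasks(String jobName, String jobId);
}

// Toy implementation backed by a map instead of job installations on disk.
final class InMemoryTaskLister implements TaskLister {
    private final Map<String, List<TaskInfo>> tasksByJob = new HashMap<>();

    // Stands in for job discovery: records that a job instance exists and which tasks it has.
    void register(String jobName, String jobId, List<TaskInfo> tasks) {
        tasksByJob.put(key(jobName, jobId), Collections.unmodifiableList(tasks));
    }

    @Override
    public List<TaskInfo> getTasks(String jobName, String jobId) {
        List<TaskInfo> tasks = tasksByJob.get(key(jobName, jobId));
        if (tasks == null) {
            // Mirrors the wording of the documented 404 error message.
            throw new IllegalArgumentException(
                    "Invalid arguments for getTasks. jobName: " + jobName + " jobId: " + jobId + ".");
        }
        return tasks;
    }

    // Simple composite key; adequate for a sketch, assuming names contain no '/'.
    private static String key(String jobName, String jobId) {
        return jobName + "/" + jobId;
    }
}
```

Keeping job discovery separate from task lookup is what lets an environment with a different package layout swap in its own discovery logic without touching the task-listing code.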

## Configuration
The TasksResource properties should be specified in the same file as the Samza REST configuration.

<table class="table table-condensed table-bordered table-striped">
<thead>
<tr>
<th>Name</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>task.proxy.factory.class</td><td><b>Required:</b> The TaskProxyFactory that will be used to create the TaskProxy instances. The value is a fully-qualified class name which must implement TaskProxyFactory. Samza ships with one implementation: <pre>org.apache.samza.rest.proxy.task.SamzaTaskProxy</pre> <li> gets the details of all the tasks of a job. It uses the <pre>SimpleInstallationRecord</pre> to interact with Samza jobs installed on disk.</li></td>
</tr>
<tr>
<td>job.installations.path</td><td><b>Required:</b> The file system path which contains the Samza job installations. The path must be on the same host as the Samza REST Service. Each installation must be a directory with structure conforming to the expectations of the InstallationRecord implementation used by the JobProxy.</td>
</tr>
<tr>
<td>job.config.factory.class</td><td>The config factory to use for reading Samza job configs. This is used to fetch the job.name and job.id properties for each job instance in the InstallationRecord. It's also used to validate that a particular directory within the installation path actually contains Samza jobs. If not specified, <pre>org.apache.samza.config.factories.PropertiesConfigFactory</pre> will be used.</td>
</tr>
</tbody>
</table>
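
The required and optional properties in the table can be checked before starting the service. The following is a minimal sketch under the assumption that the configuration has already been loaded into a `java.util.Properties` object; the class `TasksResourceConfigCheck` and its methods are hypothetical and not part of Samza.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the TasksResource properties listed in the table.
final class TasksResourceConfigCheck {

    // Default named in the table for job.config.factory.class.
    static final String DEFAULT_CONFIG_FACTORY =
            "org.apache.samza.config.factories.PropertiesConfigFactory";

    private static final String[] REQUIRED_KEYS = {
        "task.proxy.factory.class",
        "job.installations.path"
    };

    // Returns the required keys that are absent or blank, in table order.
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Resolves the config factory, falling back to the documented default.
    static String configFactory(Properties props) {
        return props.getProperty("job.config.factory.class", DEFAULT_CONFIG_FACTORY);
    }
}
```

An empty list from `missingRequired` means both required properties are present; it does not verify that the factory class can be loaded or that the installation path exists.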
@@ -19,12 +19,12 @@
package org.apache.samza.standalone;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManager$;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
import org.apache.samza.system.StreamMetadataCache;
@@ -65,19 +65,19 @@ public class StandaloneJobCoordinator implements JobCoordinator {
private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
private final int processorId;
private final Config config;
private final JobModelManager jobModelManager;
private final JobModel jobModel;
private final SamzaContainerController containerController;

@VisibleForTesting
StandaloneJobCoordinator(
int processorId,
Config config,
SamzaContainerController containerController,
JobModelManager jobModelManager) {
JobModel jobModel) {
this.processorId = processorId;
this.config = config;
this.containerController = containerController;
this.jobModelManager = jobModelManager;
this.jobModel = jobModel;
}

public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
@@ -105,7 +105,7 @@ public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerCo
* TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
* (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
*/
this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null, null);
this.jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
}

@Override
@@ -143,6 +143,6 @@ public int getProcessorId() {

@Override
public JobModel getJobModel() {
return jobModelManager.jobModel();
return jobModel;
}
}
@@ -20,6 +20,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,7 +29,6 @@
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManager$;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
import org.apache.samza.system.StreamMetadataCache;
@@ -57,7 +57,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {

private JobModel newJobModel;
private String newJobModelVersion; // version published in ZK (by the leader)
private JobModelManager jobModelManager;
private JobModel jobModel;

public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
this.zkUtils = zkUtils;
@@ -201,9 +201,7 @@ private void generateNewJobModel() {
}
log.info("generate new job model: processorsIds: " + sb.toString());

jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null,
containerIds);
JobModel jobModel = jobModelManager.jobModel();
jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, containerIds);

log.info("pid=" + processorId + "Generated jobModel: " + jobModel);

@@ -218,4 +216,4 @@ private void generateNewJobModel() {
zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
}
}
}