Skip to content

Commit

Permalink
Add MapReduceJobState adapted from Ambrose to pigvisualizer. Fix build.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidzchen committed Nov 7, 2013
1 parent 03584c5 commit 5054f88
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
dist/
azkaban-hadoopsecuritymanager-*.jar
azkaban-pigvisualizer-*.jar
24 changes: 13 additions & 11 deletions plugins/jobtype/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,24 @@
<delete>
<fileset file="${lib.dir}/azkaban-hadoopsecuritymanager-*.jar">
</fileset>
<fileset file="${lib.dir}/azkaban-pigvisualizer-*.jar">
</fileset>
</delete>
</target>

<target name="build" description="Compile main source tree java files">
<!-- copy the latest securitymanager jar -->
<delete>
<fileset file="${lib.dir}/azkaban-hadoopsecuritymanager-*.jar">
</fileset>
<fileset file="${lib.dir}/azkaban-pigvisualizer-*.jar">
</fileset>
</delete>
<!-- copy the latest securitymanager jar -->
<copy todir="${lib.dir}">
<fileset file="${hadoopsecuritymanagerjar}">
</fileset>
</copy>
<!-- copy the latest pigvisualizer jar -->
<delete>
<fileset file="${lib.dir}/azkaban-pigvisualizer-*.jar">
</fileset>
</delete>
<copy todir="${lib.dir}">
<fileset file="${pigvisualizerjar}">
</fileset>
Expand Down Expand Up @@ -101,11 +101,11 @@

<!-- Build jobtypes directory-->
<copy todir="${dist.packages.dir}" >
<fileset dir="${jobtypes.dir}" >
<fileset dir="${jobtypes.dir}" >
</fileset>
</copy>

<!-- Copy jobtype jar-->
<!-- Copy jobtype jar-->
<copy file="${azkaban-jobtype-jar}" todir="${dist.packages.dir}/java" />
<copy file="${azkaban-jobtype-jar}" todir="${dist.packages.dir}/hadoopJava" />
<copy file="${azkaban-jobtype-jar}" todir="${dist.packages.dir}/pig-0.9.2" />
Expand All @@ -122,15 +122,17 @@
<copy file="${hadoopsecuritymanagerjar}" todir="${dist.packages.dir}/pig-0.10.1" />
<copy file="${hadoopsecuritymanagerjar}" todir="${dist.packages.dir}/pig-0.11.0" />
<copy file="${hadoopsecuritymanagerjar}" todir="${dist.packages.dir}/hive-0.8.1" />

<!-- Copy pigvisualizerjar to pig-0.11.0 jar -->
<copy file="${pigvisualizerjar}" todir="${dist.packages.dir}/pig-0.11.0/lib" />

<!-- Tarball it -->
<tar destfile="${dist.packages.dir}/${name}-${version}.tar.gz" compression="gzip" longfile="gnu">
<!-- Tarball it -->
<tar destfile="${dist.packages.dir}/${name}-${version}.tar.gz" compression="gzip" longfile="gnu">
<tarfileset dir="${dist.packages.dir}" prefix="${name}-${version}" filemode="755" />
</tar>
</target>

<target name="package" depends="package-jobtype" description="Create all packages">
<target name="package" depends="package-jobtype" description="Create all packages">
</target>


</project>
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public Object toJson() {
jsonObj.put("features", Arrays.asList(features));
jsonObj.put("parents", parents);
jsonObj.put("successors", successors);
jsonObj.put("jobConfiguration", propertiesToJson(jobConfiguration));
if (jobConfiguration != null) {
jsonObj.put("jobConfiguration", propertiesToJson(jobConfiguration));
}
//jsonObj.put("mapReduceJobState", mapReduceJobState.toJson());
//jsonObj.put("jobStats", jobStats.toJson());
return jsonObj;
Expand All @@ -141,8 +143,10 @@ public static JobDagNode fromJson(Object obj) throws Exception {
(String[]) features.toArray());
node.setParents((ArrayList<String>) jsonObj.get("parents"));
node.setSuccessors((ArrayList<String>) jsonObj.get("successors"));
node.setJobConfiguration(
propertiesFromJson(jsonObj.get("jobConfiguration")));
if (jsonObj.containsKey("jobConfiguration")) {
node.setJobConfiguration(
propertiesFromJson(jsonObj.get("jobConfiguration")));
}
// XXX mapReduceJobState
// XXX jobStats
return node;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2012 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package azkaban.viewer.pigvisualizer;

import java.io.IOException;

import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapred.TaskReport;

/**
* Container that holds state of a MapReduce job
*/
public class MapReduceJobState {
private String jobId;
private String jobName;
private String trackingURL;
private boolean isComplete;
private boolean isSuccessful;
private float mapProgress;
private float reduceProgress;
private long jobStartTime;
private long jobLastUpdateTime;

private int totalMappers;
private int finishedMappersCount;

private int totalReducers;
private int finishedReducersCount;

public MapReduceJobState() {
}

@SuppressWarnings("deprecation")
public MapReduceJobState(RunningJob runningJob,
TaskReport[] mapTaskReport,
TaskReport[] reduceTaskReport) throws IOException {
jobId = runningJob.getID().toString();
jobName = runningJob.getJobName();
trackingURL = runningJob.getTrackingURL();
isComplete = runningJob.isComplete();
isSuccessful = runningJob.isSuccessful();
mapProgress = runningJob.mapProgress();
reduceProgress = runningJob.reduceProgress();

totalMappers = mapTaskReport.length;
totalReducers = reduceTaskReport.length;

for (TaskReport report : mapTaskReport) {
if (report.getStartTime() < jobStartTime || jobStartTime == 0L) {
jobStartTime = report.getStartTime();
}

TIPStatus status = report.getCurrentStatus();
if (status != TIPStatus.PENDING && status != TIPStatus.RUNNING) {
finishedMappersCount++;
}
}

for (TaskReport report : reduceTaskReport) {
if (jobLastUpdateTime < report.getFinishTime()) {
jobLastUpdateTime = report.getFinishTime();
}

TIPStatus status = report.getCurrentStatus();
if (status != TIPStatus.PENDING && status != TIPStatus.RUNNING) {
finishedReducersCount++;
}
}

// If not all the reducers are finished.
if (finishedReducersCount != reduceTaskReport.length ||
jobLastUpdateTime == 0) {
jobLastUpdateTime = System.currentTimeMillis();
}
}

public String getJobId() {
return jobId;
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public String getJobName() {
return jobName;
}

public void setJobName(String jobName) {
this.jobName = jobName;
}

public String getTrackingURL() {
return trackingURL;
}

public void setTrackingURL(String trackingURL) {
this.trackingURL = trackingURL;
}

public boolean isComplete() {
return isComplete;
}

public void setComplete(boolean complete) {
isComplete = complete;
}

public boolean isSuccessful() {
return isSuccessful;
}

public void setSuccessful(boolean successful) {
isSuccessful = successful;
}

public float getMapProgress() {
return mapProgress;
}

public void setMapProgress(float mapProgress) {
this.mapProgress = mapProgress;
}

public float getReduceProgress() {
return reduceProgress;
}

public void setReduceProgress(float reduceProgress) {
this.reduceProgress = reduceProgress;
}

public int getTotalMappers() {
return totalMappers;
}

public void setTotalMappers(int totalMappers) {
this.totalMappers = totalMappers;
}

public int getTotalReducers() {
return totalReducers;
}

public void setTotalReducers(int totalReducers) {
this.totalReducers = totalReducers;
}

public int getFinishedMappersCount() {
return finishedMappersCount;
}

public void setFinishedMappersCount(int finishedMappersCount) {
this.finishedMappersCount = finishedMappersCount;
}

public int getFinishedReducersCount() {
return finishedReducersCount;
}

public void setFinishedReducersCount(int finishedReducersCount) {
this.finishedReducersCount = finishedReducersCount;
}

public long getJobStartTime() {
return jobStartTime;
}

public void setJobStartTime(long jobStartTime) {
this.jobStartTime = jobStartTime;
}

public long getJobLastUpdateTime() {
return jobLastUpdateTime;
}

public void setJobLastUpdateTime(long jobLastUpdateTime) {
this.jobLastUpdateTime = jobLastUpdateTime;
}
}

0 comments on commit 5054f88

Please sign in to comment.