Commit 25590c9
Author: Davies Liu

    update with Java API

Parent: 360de2d
4 files changed, 118 insertions(+), 37 deletions(-)


examples/src/main/python/status_api_demo.py (10 additions, 10 deletions)
@@ -19,7 +19,7 @@
 import threading
 import Queue
 
-from pyspark import SparkContext
+from pyspark import SparkConf, SparkContext
 
 
 def delayed(seconds):
@@ -38,26 +38,26 @@ def call_in_background(f, *args):
 
 
 def main():
-    sc = SparkContext(appName="PythonStatusAPIDemo")
+    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
+    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)
 
     def run():
-        sc.setJobGroup("demo", "demo status api")
         rdd = sc.parallelize(range(10), 10).map(delayed(2))
         reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
         return reduced.map(delayed(2)).collect()
 
     result = call_in_background(run)
-
+    status = sc.statusTracker()
     while result.empty():
-        ids = sc.getJobIdsForGroup("demo")
+        ids = status.getJobIdsForGroup()
         for id in ids:
-            job = sc.getJobInfo(id)
-            print "Job", id, "status: ", job.status()
-            for sid in job.stageIds():
-                info = sc.getStageInfo(sid)
+            job = status.getJobInfo(id)
+            print "Job", id, "status: ", job.status
+            for sid in job.stageIds:
+                info = status.getStageInfo(sid)
                 if info:
                     print "Stage %d: %d tasks total (%d active, %d complete)" % \
-                        (sid, info.numTasks(), info.numActiveTasks(), info.numCompletedTasks())
+                        (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks)
         time.sleep(1)
 
     print "Job results are:", result.get()

python/pyspark/__init__.py (8 additions, 6 deletions)
@@ -22,17 +22,17 @@
 
   - :class:`SparkContext`:
       Main entry point for Spark functionality.
-  - L{RDD}
+  - :class:`RDD`:
       A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
-  - L{Broadcast}
+  - :class:`Broadcast`:
       A broadcast variable that gets reused across tasks.
-  - L{Accumulator}
+  - :class:`Accumulator`:
       An "add-only" shared variable that tasks can only add values to.
-  - L{SparkConf}
+  - :class:`SparkConf`:
       For configuring Spark.
-  - L{SparkFiles}
+  - :class:`SparkFiles`:
       Access files shipped with jobs.
-  - L{StorageLevel}
+  - :class:`StorageLevel`:
       Finer-grained cache persistence levels.
 
 """
@@ -45,11 +45,13 @@
 from pyspark.accumulators import Accumulator, AccumulatorParam
 from pyspark.broadcast import Broadcast
 from pyspark.serializers import MarshalSerializer, PickleSerializer
+from pyspark.status import *
 
 # for back compatibility
 from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
 
 __all__ = [
     "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
     "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer",
+    "StatusTracker", "SparkJobInfo", "SparkStageInfo"
 ]
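Note: with the wildcard import and the extended `__all__`, the new status classes become importable directly from the top-level package:

    from pyspark import StatusTracker, SparkJobInfo, SparkStageInfo

    print(SparkStageInfo._fields)  # namedtuple fields defined in python/pyspark/status.py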

python/pyspark/context.py (4 additions, 21 deletions)
@@ -33,6 +33,7 @@
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
+from pyspark.status import StatusTracker
 
 from py4j.java_collections import ListConverter
 
@@ -800,29 +801,11 @@ def cancelAllJobs(self):
         """
         self._jsc.sc().cancelAllJobs()
 
-    def getJobIdsForGroup(self, jobGroup):
+    def statusTracker(self):
         """
-        Return a list of all known jobs in a particular job group.
-
-        The returned list may contain running, failed, and completed jobs, and may
-        vary across invocations of this method. This method does not guarantee the
-        order of the elements in its result.
-        """
-        return list(self._jsc.getJobIdsForGroup(jobGroup))
-
-    def getJobInfo(self, jobId):
-        """
-        Returns job information, or `None` if the job info could not be found
-        or was garbage collected.
-        """
-        return self._jsc.getJobInfo(jobId)
-
-    def getStageInfo(self, stageId):
-        """
-        Returns stage information, or `None` if the stage info could not be found or was
-        garbage collected.
+        Return :class:`StatusTracker` object
         """
-        return self._jsc.getStageInfo(stageId)
+        return StatusTracker(self._jsc.statusTracker())
 
     def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         """

python/pyspark/status.py (new file, 96 additions)
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from collections import namedtuple
+
+__all__ = ["SparkJobInfo", "SparkStageInfo", "StatusTracker"]
+
+
+class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):
+    """
+    Exposes information about Spark Jobs.
+    """
+
+
+class SparkStageInfo(namedtuple("SparkStageInfo",
+                                "stageId currentAttemptId name numTasks numActiveTasks "
+                                "numCompletedTasks numFailedTasks")):
+    """
+    Exposes information about Spark Stages.
+    """
+
+
+class StatusTracker(object):
+    """
+    Low-level status reporting APIs for monitoring job and stage progress.
+
+    These APIs intentionally provide very weak consistency semantics;
+    consumers of these APIs should be prepared to handle empty / missing
+    information. For example, a job's stage ids may be known but the status
+    API may not have any information about the details of those stages, so
+    `getStageInfo` could potentially return `None` for a valid stage id.
+
+    To limit memory usage, these APIs only provide information on recent
+    jobs / stages. These APIs will provide information for the last
+    `spark.ui.retainedStages` stages and `spark.ui.retainedJobs` jobs.
+    """
+    def __init__(self, jtracker):
+        self._jtracker = jtracker
+
+    def getJobIdsForGroup(self, jobGroup=None):
+        """
+        Return a list of all known jobs in a particular job group. If
+        `jobGroup` is None, then returns all known jobs that are not
+        associated with a job group.
+
+        The returned list may contain running, failed, and completed jobs,
+        and may vary across invocations of this method. This method does
+        not guarantee the order of the elements in its result.
+        """
+        return list(self._jtracker.getJobIdsForGroup(jobGroup))
+
+    def getActiveStageIds(self):
+        """
+        Returns an array containing the ids of all active stages.
+        """
+        return sorted(list(self._jtracker.getActiveStageIds()))
+
+    def getActiveJobsIds(self):
+        """
+        Returns an array containing the ids of all active jobs.
+        """
+        return sorted((list(self._jtracker.getActiveJobIds())))
+
+    def getJobInfo(self, jobId):
+        """
+        Returns a :class:`SparkJobInfo` object, or None if the job info
+        could not be found or was garbage collected.
+        """
+        job = self._jtracker.getJobInfo(jobId)
+        if job is not None:
+            return SparkJobInfo(jobId, job.stageIds(), str(job.status()))
+
+    def getStageInfo(self, stageId):
+        """
+        Returns a :class:`SparkStageInfo` object, or None if the stage
+        info could not be found or was garbage collected.
+        """
+        stage = self._jtracker.getStageInfo(stageId)
+        if stage is not None:
+            # TODO: fetch them in batch for better performance
+            attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]]
+            return SparkStageInfo(stageId, *attrs)
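Note: `getJobInfo` and `getStageInfo` eagerly convert the Py4J getters into plain namedtuple fields (via `getattr(stage, f)()`), so callers read attributes without further JVM round-trips. A polling sketch, assuming an active SparkContext `sc` with work in flight:

    status = sc.statusTracker()
    for sid in status.getActiveStageIds():
        info = status.getStageInfo(sid)
        if info is None:
            continue  # weak consistency: the stage may already have been evicted
        remaining = info.numTasks - info.numCompletedTasks - info.numFailedTasks
        print("Stage %d (%s), attempt %d: %d tasks remaining"
              % (info.stageId, info.name, info.currentAttemptId, remaining))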
