-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-11706][Streaming]Fix the bug that Streaming Python tests cannot report failures #9669
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bbfea87
1f3764b
1ea7a52
08d0f60
36ae6ba
a5699e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -611,12 +611,16 @@ class CheckpointTests(unittest.TestCase): | |
@staticmethod | ||
def tearDownClass(): | ||
# Clean up in the JVM just in case there has been some issues in Python API | ||
jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive() | ||
if jStreamingContextOption.nonEmpty(): | ||
jStreamingContextOption.get().stop() | ||
jSparkContextOption = SparkContext._jvm.SparkContext.get() | ||
if jSparkContextOption.nonEmpty(): | ||
jSparkContextOption.get().stop() | ||
if SparkContext._jvm is not None: | ||
jStreamingContextOption = \ | ||
SparkContext._jvm.org.apache.spark.streaming.StreamingContext.getActive() | ||
if jStreamingContextOption.nonEmpty(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
jStreamingContextOption.get().stop() | ||
|
||
def setUp(self): | ||
self.ssc = None | ||
self.sc = None | ||
self.cpd = None | ||
|
||
def tearDown(self): | ||
if self.ssc is not None: | ||
|
@@ -626,6 +630,7 @@ def tearDown(self): | |
if self.cpd is not None: | ||
shutil.rmtree(self.cpd) | ||
|
||
@unittest.skip("Enable it when we fix the checkpoint bug") | ||
def test_get_or_create_and_get_active_or_create(self): | ||
inputd = tempfile.mkdtemp() | ||
outputd = tempfile.mkdtemp() + "/" | ||
|
@@ -648,7 +653,7 @@ def setup(): | |
self.cpd = tempfile.mkdtemp("test_streaming_cps") | ||
self.setupCalled = False | ||
self.ssc = StreamingContext.getOrCreate(self.cpd, setup) | ||
self.assertFalse(self.setupCalled) | ||
self.assertTrue(self.setupCalled) | ||
|
||
self.ssc.start() | ||
|
||
|
@@ -1322,11 +1327,16 @@ def search_kinesis_asl_assembly_jar(): | |
"or 'build/mvn -Pkinesis-asl package' before running this test.") | ||
|
||
sys.stderr.write("Running tests: %s \n" % (str(testcases))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
failed = False | ||
for testcase in testcases: | ||
sys.stderr.write("[Running %s]\n" % (testcase)) | ||
tests = unittest.TestLoader().loadTestsFromTestCase(testcase) | ||
if xmlrunner: | ||
unittest.main(tests, verbosity=3, | ||
testRunner=xmlrunner.XMLTestRunner(output='target/test-reports')) | ||
result = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=3).run(tests) | ||
if not result.wasSuccessful(): | ||
failed = True | ||
else: | ||
unittest.TextTestRunner(verbosity=3).run(tests) | ||
result = unittest.TextTestRunner(verbosity=3).run(tests) | ||
if not result.wasSuccessful(): | ||
failed = True | ||
sys.exit(failed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamingContext doesn't have a static
_jvm
. It's an instance field.