[SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger #20309

Status: Closed · wants to merge 3 commits
23 changes: 19 additions & 4 deletions python/pyspark/sql/streaming.py
@@ -786,7 +786,7 @@ def queryName(self, queryName):
 
     @keyword_only
     @since(2.0)
-    def trigger(self, processingTime=None, once=None):
+    def trigger(self, processingTime=None, once=None, continuous=None):
         """Set the trigger for the stream query. If this is not set it will run the query as fast
         as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
 
@@ -802,23 +802,38 @@ def trigger(self, processingTime=None, once=None):
         >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
         >>> # trigger the query for just once batch of data
         >>> writer = sdf.writeStream.trigger(once=True)
+        >>> # trigger the query for execution every 5 seconds
+        >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
         """
+        params = [processingTime, once, continuous]
+
+        if params.count(None) == 3:
+            raise ValueError('No trigger provided')
+        elif params.count(None) < 2:
+            raise ValueError('Multiple triggers not allowed.')
+
         jTrigger = None
         if processingTime is not None:
-            if once is not None:
-                raise ValueError('Multiple triggers not allowed.')
             if type(processingTime) != str or len(processingTime.strip()) == 0:
                 raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
                                  processingTime)
             interval = processingTime.strip()
             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
                 interval)
 
         elif once is not None:
             if once is not True:
                 raise ValueError('Value for once must be True. Got: %s' % once)
             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
 
         else:
-            raise ValueError('No trigger provided')
+            if type(continuous) != str or len(continuous.strip()) == 0:
+                raise ValueError('Value for continuous must be a non empty string. Got: %s' %
+                                 continuous)
+            interval = continuous.strip()
+            jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(
+                interval)
 
         self._jwrite = self._jwrite.trigger(jTrigger)
         return self

Review comments on the new docstring example line (>>> # trigger the query for execution every 5 seconds):

Contributor: nit: not quite the right comment
Contributor: LGTM otherwise
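
Back on the patch itself: the mutual-exclusion check at the top of the new trigger() body works by counting None entries. With three keyword parameters, a valid call must leave exactly two of them unset. A quick illustration of the counting logic (the parameter values below are arbitrary examples, not taken from the patch):

    params = ['5 seconds', None, None]   # exactly one trigger set
    params.count(None) == 3              # False: at least one trigger was provided
    params.count(None) < 2               # False: no more than one trigger was provided
                                         # -> falls through to the processingTime branch

    params = ['5 seconds', True, None]   # two triggers set
    params.count(None) < 2               # True -> ValueError('Multiple triggers not allowed.')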

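For context, a minimal end-to-end sketch of how the new keyword is used (assumptions: Spark 2.3+, where the rate source and console sink are supported in continuous processing mode; the 1-second checkpoint interval and the app name are arbitrary):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("continuous-trigger-demo").getOrCreate()

    # The built-in rate source emits rows continuously.
    sdf = spark.readStream.format("rate").load()

    # Continuous processing mode with a 1-second checkpoint interval,
    # via the keyword this PR adds.
    query = (sdf.writeStream
                .format("console")
                .trigger(continuous="1 second")
                .start())

    query.awaitTermination()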
6 changes: 6 additions & 0 deletions python/pyspark/sql/tests.py
@@ -1518,6 +1518,12 @@ def test_stream_trigger(self):
         except ValueError:
             pass
 
+        # Should not take multiple args
+        try:
+            df.writeStream.trigger(processingTime='5 seconds', continuous='1 second')
+        except ValueError:
+            pass
+
         # Should take only keyword args
         try:
             df.writeStream.trigger('5 seconds')

Review comment on the added block:

Member: @tdas, could we maybe test this by assertRaises if it checks raising an exception?
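
For reference, a hypothetical version of these checks in the assertRaises style the reviewer suggests (the df setup line is an assumption modeled on how the surrounding suite builds a streaming DataFrame, and the test method name is invented):

    def test_stream_trigger_errors(self):
        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')

        # No trigger provided
        with self.assertRaises(ValueError):
            df.writeStream.trigger()

        # Multiple triggers not allowed
        with self.assertRaises(ValueError):
            df.writeStream.trigger(processingTime='5 seconds', continuous='1 second')

        # Value for continuous must be a non-empty string
        with self.assertRaises(ValueError):
            df.writeStream.trigger(continuous='')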