Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement query based timeouts #9

Merged

Conversation

chitralverma
Copy link
Collaborator

Fixes #8 .

Addes the following new features,

  1. Allows timout specification on query level using Spark Conf
--conf spark.sql.streaming.stateStore.stateExpirySecs.queryName1=5
--conf spark.sql.streaming.stateStore.stateExpirySecs.queryName2=10
  1. Allows the use of helper method to change the state store provider,
import ru.chermenin.spark.sql.execution.streaming.state.implicits._

val spark = SparkSession.builder().master(...).useRocksDBStateStore().getOrCreate()
  1. Allows the use of helper method to set state timeout,
streamingDF.writeStream
     .format(...)
     .outputMode(...)
     .trigger(Trigger.ProcessingTime(1000L))
     .stateTimeout(spark.conf, queryName="myQuery1", expirySecs = 5, checkpointLocation ="chkpntloc")
     .start()

Added test cases and updated README/ code documentation for the same.

@chitralverma
Copy link
Collaborator Author

@chermenin Can you please look at this ?

@chitralverma
Copy link
Collaborator Author

@chermenin I've incorporated the review comments.

@chitralverma
Copy link
Collaborator Author

@chermenin can you please merge this. thanks. I was about to accidentally close it

@chermenin chermenin merged commit 0aace74 into chermenin:master Nov 14, 2019
chermenin pushed a commit that referenced this pull request Apr 24, 2020
…-backups to release/RE19C

* commit '0dec75730883af3edac2c9253c0a1644c0c1168c': (28 commits)
  bump version
  avoid flush and creating shared files when opening db
  improve cleanup of earlier files of the same version
  fix FileNotFoundException in cleanupRemoteBackupSharedFiles
  bump version
  fix lazy loading of shared files
  fix linux file separator parsing
  update default scala and spark version
  improve logging
  bump version
  check if removeExpiredRowsInMaintenanceColName is present in schema, otherwise disable removeExpiredRowsInMaintenance
  fix getting values from null-rows
  implement "loadRemoteBackupSelective" and dont "cleanupRemoteBackups"
  fix unit test "StateStore.get" to use RocksDbStateStoreProvider instead of HDFSStateStoreProvider
  use existing config strings for SQLConf
  improve logging
  fix open db before schema evolution
  avoid NullPointerExceptions on forced shutdown
  improve rocksDb size logging implement embedded maintenance
  cleanup rotating backup key migration code pauseBackgroundWork on closeDb
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow State timeouts on Query Level
2 participants