Avoid initializing all versions' state in initStateStore method #5

Merged
merged 1 commit into from
Jun 28, 2019

LiShuMing
Contributor

The code below calls initStateStore for every version, which wastes a lot of time.

val stateStore = versions.sorted(Ordering.Long.reverse)
  .map(version => Try(loadDb(version)).map(initStateStore))
  .find(_.isSuccess).map(_.get)

Only the most recent version is needed.

@chermenin
Owner

@LiShuMing If the last version of the snapshot is corrupted, we can always try to restore state from earlier versions. With your changes we would lose this opportunity. There are test cases for this, and they fail in this PR.

The current implementation loads only the last version and doesn't iterate over all versions when the last version is fine. Also, you can take a look at the find method implementation.

@LiShuMing
Contributor Author

LiShuMing commented Jun 25, 2019

Thanks for your reply.

> If the last version of the snapshot is corrupted, we can always try to restore state from earlier versions. With your changes we would lose this opportunity. There are test cases for this, and they fail in this PR.

My change does alter the original policy: it can no longer restore state from earlier versions. I will update my code later.

> Also, you can take a look at the find method implementation.

import scala.util.Try

object FindCase {
  def parseInt(value: String): Try[Int] = Try(value.toInt)
  def main(args: Array[String]): Unit = {

    val a = Array("a", "1", "2", "3")

    val xx = a.map(x => {
      println(s"x=${x}")
      parseInt(x)
    }).find(_.isSuccess).getOrElse("0")

    println(xx)
  }
}

--result
x=a
x=1
x=2
x=3
Success(1)

This example shows that the find method iterates over all the code in map. So in the initStateStore method, loadDb will be called for all versions, which wastes a lot of time.

@chermenin
Owner

Oh, many thanks for the example. It looks a bit strange, but we can fix it just by calling toStream before the map function, can't we?

@LiShuMing
Contributor Author

Yep, you are right. I tried the code below:

import scala.util.Try

object FindCase {
  def parseInt(value: String): Try[Int] = Try(value.toInt)
  def main(args: Array[String]): Unit = {

    val a = Array("a", "1", "2", "3")

    val xx = a.toStream.map(x => {
      println(s"x=${x}")
      parseInt(x)
    }).find(_.isSuccess).getOrElse("0")

    println(xx)
  }
}

--result
x=a
x=1
Success(1)

I will update my PR. Could you review it again?
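
For reference, the same short-circuiting can be sketched with iterator, which is also lazy and works in both Scala 2.12 and 2.13 (where Stream is deprecated in favour of LazyList). The LazyFindSketch object, its helper methods and the call counter are illustrative names only, not part of this PR:

```scala
import scala.util.Try

object LazyFindSketch {
  // Counts how many elements the mapping function actually touched.
  var calls = 0

  def parseInt(value: String): Try[Int] = {
    calls += 1
    Try(value.toInt)
  }

  // Strict: Array.map parses every element before find runs.
  def strictFirst(values: Array[String]): Option[Try[Int]] =
    values.map(parseInt).find(_.isSuccess)

  // Lazy: Iterator.map defers parsing, so find stops at the first success.
  def lazyFirst(values: Array[String]): Option[Try[Int]] =
    values.iterator.map(parseInt).find(_.isSuccess)

  def main(args: Array[String]): Unit = {
    val a = Array("a", "1", "2", "3")

    calls = 0
    println(s"strict: ${strictFirst(a)}, parse calls = $calls")

    calls = 0
    println(s"lazy:   ${lazyFirst(a)}, parse calls = $calls")
  }
}
```

With the same input as above, the strict variant parses all four elements while the lazy one stops after two, which matches the two result listings in this thread.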

@chitralverma
Collaborator

chitralverma commented Jun 25, 2019

@LiShuMing, @chermenin
Here are my thoughts.

> This example shows that the find method iterates over all the code in map

This statement is incorrect, because the print statements are inside the map, not the find. Check this out:

object Test {
  def main(args: Array[String]): Unit = {

    val arr = Array(1, 2, 3, 4)

    val r = arr.find(x => {
      println(s"x = $x")

      x % 2 == 0
    })

    println(r)
  }
}

This prints the following, which shows that find stops at the first match of the predicate:

x = 1
x = 2
Some(2)

Now, coming to the point of loading all snapshots: yes, the current implementation is suboptimal. I'd like to suggest a simpler and clearer approach.

For the current code,

val stateStore = versions.sorted(Ordering.Long.reverse)
  .map(version => Try(loadDb(version)).map(initStateStore))
  .find(_.isSuccess).map(_.get)

we can change it to something like,

  val stateStore = versions.sorted(Ordering.Long.reverse)
    .find(version => Try(loadDb(version)).isSuccess)
    .map(version => initStateStore(loadDb(version)))
    .getOrElse(initStateStore(getTempDir(getTempPrefix(), s".$version")))

Supporting arguments for this implementation:

  • Limits the number of actual initStateStore calls to 1.
  • Finds only the most recent non-corrupted snapshot.
  • No need to make the collection lazy with iterator, view or toStream.
  • Gets rid of the nasty .map(_.get).

Note that loadDb may be called multiple times (length of versions + 1 in the worst case), but that is still okay since it is mostly path construction. (Although I must add that the loadDb implementation should also be revisited, since it does some file movement, which may be dangerous.)
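
To make the call counts concrete, here is a minimal sketch that compares the two approaches against a stubbed loader. The loadDb and initStateStore below are hypothetical stand-ins (the real functions touch the file system), the lazy variant uses iterator rather than toStream, and the set of corrupted versions is invented for illustration:

```scala
import scala.util.Try

object LoadCountSketch {
  // Hypothetical stand-ins; not the project's real loadDb / initStateStore.
  var loadCalls = 0
  var initCalls = 0

  def loadDb(corrupted: Set[Long])(version: Long): String = {
    loadCalls += 1
    if (corrupted(version)) throw new IllegalStateException(s"corrupted snapshot $version")
    s"db-$version"
  }

  def initStateStore(path: String): String = {
    initCalls += 1
    s"store($path)"
  }

  // Lazy map + find: loadDb runs once per version tried;
  // initStateStore runs only for the version that loads.
  def viaLazyMap(versions: Seq[Long], corrupted: Set[Long]): Option[String] =
    versions.sorted(Ordering.Long.reverse).iterator
      .map(v => Try(loadDb(corrupted)(v)).map(initStateStore))
      .find(_.isSuccess).map(_.get)

  // find first, then map: loadDb runs once more for the winning version.
  def viaFindThenMap(versions: Seq[Long], corrupted: Set[Long]): Option[String] =
    versions.sorted(Ordering.Long.reverse)
      .find(v => Try(loadDb(corrupted)(v)).isSuccess)
      .map(v => initStateStore(loadDb(corrupted)(v)))

  def main(args: Array[String]): Unit = {
    val versions = Seq(1L, 2L, 3L)
    val corrupted = Set(3L, 2L) // worst case: only the oldest version loads

    loadCalls = 0; initCalls = 0
    println(s"${viaLazyMap(versions, corrupted)} loadDb=$loadCalls init=$initCalls")

    loadCalls = 0; initCalls = 0
    println(s"${viaFindThenMap(versions, corrupted)} loadDb=$loadCalls init=$initCalls")
  }
}
```

In this worst case, with only the oldest of three versions loadable, the lazy variant calls the stubbed loadDb three times and the find-then-map variant four times (length of versions + 1). Both call initStateStore once.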

What are your thoughts?

@chermenin
Owner

@chitralverma I think using toStream is the better fit for the current implementation of the loadDb function, but your point makes sense as well. So how about the following: merge this PR as is for now, and open an issue for the points you described and for refactoring the methods?

@chitralverma
Collaborator

@chermenin I believe we can merge this pull request and also raise an issue to revisit the loadDb implementation.

I'll create an issue right away.

@LiShuMing LiShuMing changed the title from "Only load state store of the version needed once to avoid initializing all versions's state" to "Avoid initializing all versions' state in initStateStore method" Jun 27, 2019
@LiShuMing
Contributor Author

LiShuMing commented Jun 28, 2019

So can this patch be merged into master?

@chermenin chermenin merged commit fecba31 into chermenin:master Jun 28, 2019
@chermenin
Owner

@LiShuMing yep, it's merged 😃

zzeekk pushed a commit to zzeekk/spark-states that referenced this pull request Sep 4, 2019
…nspection to release/RE19A

* commit '2e329865360204bf428d216c1f1ac418b7310d04':
  change rocksdb log level to ERROR, bump version
  bump version
  add function for external state inspection, log index file content for debugging consistency problems with S3
  fix handling for directory entries on decompress

3 participants