
[SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and fix reflection issue #1508


Closed
wants to merge 2 commits into from

Conversation

jerryshao
Contributor

This PR updates the previous Manifest for KafkaInputDStream's Decoder to ClassTag, and also fixes the problem addressed in SPARK-2103.

The previous Java interface could not actually capture the type of the Decoder, so using this Manifest to reconstruct the decoder object through reflection raises an exception.

Also, for the other two Java interfaces, ClassTag[String] is unnecessary, because calling the Scala API picks up the right implicit ClassTag.

The current Kafka unit tests cannot actually verify these interfaces. I've tested them in my local and distributed settings.

@SparkQA

SparkQA commented Jul 21, 2014

QA tests have started for PR 1508. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16900/consoleFull

@srowen
Member

srowen commented Jul 21, 2014

Nice one Jerry! This actually enables using Kafka with non-String data in Java.

@SparkQA

SparkQA commented Jul 21, 2014

QA results for PR 1508:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16900/consoleFull

@srowen
Member

srowen commented Jul 21, 2014

It's the MIMA test that fails, since the method signature is changed. It's possible to keep and deprecate the existing method of course. Should we just do that, or OK to remove the method on the grounds that the API doesn't quite work?

@jerryshao
Contributor Author

Hi @srowen, thanks for your advice. I've also noticed that MIMA does not allow me to change the method signature. But I think the change is meaningful and will not break users' code; they only need to recompile.

@tdas, would you mind giving me some suggestions? Thanks a lot.

@tdas
Contributor

tdas commented Jul 30, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 30, 2014

QA tests have started for PR 1508. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17484/consoleFull

@tdas
Contributor

tdas commented Jul 30, 2014

LGTM. I am okay with the binary compatibility change, since it is really wrong that we were using manifest instead of classtags.

But I am not sure I really understand the reason behind the failure of
`val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])`
Why could the manifest not get the runtimeClass correctly?

@tdas
Contributor

tdas commented Jul 30, 2014

Oh, please add the appropriate exclude in the Mima exclusions
https://github.com/apache/spark/blob/master/project/MimaExcludes.scala

See the error in the Jenkins console output for the exact exclusion string that you need to add.

@SparkQA

SparkQA commented Jul 30, 2014

QA results for PR 1508:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17484/consoleFull

@tdas
Contributor

tdas commented Jul 30, 2014

This failure is unrelated to this patch. It's a currently known issue in Jenkins; I will run the tests again once this error gets cleared.

@jerryshao
Contributor Author

Hi TD, thanks for your review. I think the previous issue is this line:

`implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]`

This line cannot actually capture the correct runtime class; the runtime class of keyCmd is Object, as SPARK-2103 mentioned.

Because we only got Object, creating the decoder through reflection throws an exception on this line:

`val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])`

The right way is like this:

`implicit val keyCmd: Manifest[U] = Manifest.classType(keyDecodeClass)`

Manifest can do the same thing if we correctly feed it the class info, but I think it is better to change Manifest to ClassTag to keep aligned with the rest of the code, so I changed it to use ClassTag.

I will add this to the Mima excludes.

Thanks a lot.
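The failure mode described above can be sketched in plain Java (a minimal illustration: `VerifiableProperties` here is a stand-in for the real `kafka.utils.VerifiableProperties`, and `MyDecoder` is a hypothetical decoder class, so the example is self-contained):

```java
import java.lang.reflect.Constructor;

// Stand-in for kafka.utils.VerifiableProperties, just to keep this sketch
// self-contained (not the real Kafka class).
class VerifiableProperties {}

// A hypothetical decoder such as kafka.serializer.StringDecoder: it exposes
// the single-argument constructor that KafkaReceiver looks up via reflection.
class MyDecoder {
    public MyDecoder(VerifiableProperties props) {}
}

class ManifestErasureDemo {
    // Mirrors the lookup in KafkaInputDStream:
    //   manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    static boolean hasDecoderConstructor(Class<?> runtimeClass) {
        try {
            Constructor<?> c = runtimeClass.getConstructor(VerifiableProperties.class);
            return c != null;
        } catch (NoSuchMethodException e) {
            // Object has no such constructor, so the receiver's reflection
            // call fails exactly like this in SPARK-2103.
            return false;
        }
    }

    public static void main(String[] args) {
        // Before the patch, Manifest[U] was forged from Manifest[AnyRef],
        // so the runtime class seen by the receiver was Object:
        System.out.println(hasDecoderConstructor(Object.class));    // false
        // With a real ClassTag (or a Manifest built from the actual decoder
        // class), the constructor lookup succeeds:
        System.out.println(hasDecoderConstructor(MyDecoder.class)); // true
    }
}
```

This is why the fix works: the ClassTag carries the concrete decoder class through the Java API, so the reflective constructor lookup has something real to find.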

@tdas
Contributor

tdas commented Jul 31, 2014

I get it now. Let me run Jenkins once again to figure out what needs to be added to Mima.

@tdas
Contributor

tdas commented Jul 31, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 31, 2014

QA tests have started for PR 1508. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17533/consoleFull

@SparkQA

SparkQA commented Jul 31, 2014

QA results for PR 1508:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17533/consoleFull

@tdas
Contributor

tdas commented Jul 31, 2014

Again, this is an unrelated failure. Running it again.

@tdas
Contributor

tdas commented Jul 31, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 31, 2014

QA tests have started for PR 1508. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17584/consoleFull

@SparkQA

SparkQA commented Jul 31, 2014

QA results for PR 1508:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17584/consoleFull

@tdas
Contributor

tdas commented Jul 31, 2014

Can you add these to the Mima excludes?

```scala
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.kafka.KafkaUtils.createStream")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.kafka.KafkaReceiver.this")
```

The last one should not be necessary, as it is not a public class. Let me look into this. No harm in adding the exclusion, though.

@SparkQA

SparkQA commented Aug 1, 2014

QA tests have started for PR 1508. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17627/consoleFull

@tdas
Contributor

tdas commented Aug 1, 2014

@jerryshao Oops, not sure how that is possible given what this patch touches. Can you merge with master nonetheless?

@jerryshao
Contributor Author

Ok, I will update the code.

@SparkQA

SparkQA commented Aug 1, 2014

QA tests have started for PR 1508. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17629/consoleFull

@SparkQA

SparkQA commented Aug 1, 2014

QA results for PR 1508:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17629/consoleFull

@SparkQA

SparkQA commented Aug 1, 2014

QA results for PR 1508:
- This patch PASSES unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17627/consoleFull

@tdas
Contributor

tdas commented Aug 1, 2014

OK, the sequence of these two results is very confusing. Let me run the tests again.

@tdas
Contributor

tdas commented Aug 1, 2014

Jenkins, test this again.

@tdas
Contributor

tdas commented Aug 1, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Aug 1, 2014

QA tests have started for PR 1508. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17670/consoleFull

@srowen
Member

srowen commented Aug 1, 2014

A bunch of tests have been failing spuriously with "java.net.BindException: Address already in use". It's not the PR. I wonder what recent change could have made this happen? Is something being stricter about assigning a fixed port? Did a config change let multiple tests run on the same virtual machine?

@tdas
Contributor

tdas commented Aug 1, 2014

Also, a number of them have been failing due to spurious Python MLlib issues. At least that was the case yesterday.

@SparkQA

SparkQA commented Aug 1, 2014

QA results for PR 1508:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17670/consoleFull

@tdas
Contributor

tdas commented Aug 1, 2014

Heyyyaa, it finally passed. I am merging this. Thanks @jerryshao

@asfgit asfgit closed this in a32f0fb Aug 1, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
…fix reflection issue

(Commit message repeats the PR description above.)

Author: jerryshao <saisai.shao@intel.com>

Closes apache#1508 from jerryshao/SPARK-2103 and squashes the following commits:

e90c37b [jerryshao] Add Mima excludes
7529810 [jerryshao] Change Manifest to ClassTag for KafkaInputDStream's Decoder and fix Decoder construct issue when using Java API
@salex89

salex89 commented Feb 16, 2015

I'm not sure if this is the exact same issue, but I am experiencing this with Spark 1.2.1 and Scala 2.10.4 as well.

```
ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
	at java.lang.Class.getConstructor0(Class.java:2971)
	at java.lang.Class.getConstructor(Class.java:1812)
	at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:288)
	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:280)
	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

@jerryshao
Contributor Author

Hi @salex89, your exception seems a little different from this PR; I need to take a careful look at it. Thanks for reporting it.

@salex89

salex89 commented Feb 20, 2015

@jerryshao
OK, sorry for necroposting then. Should I open an issue on JIRA with the snippet?

@jerryshao
Contributor Author

Yeah, please do it and describe it in detail so we can easily reproduce it.

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…locks.enabled` by default (apache#1508)

### What changes were proposed in this pull request?

rdar://99068608 (SPARK-40198 Enable `spark.storage.decommission.(rdd|shuffle)Blocks.enabled` by default)

This PR aims to enable the following two configurations by default. Note that these storage configurations are still disabled by default via the parent configuration, `spark.storage.decommission.enabled`. In addition, all decommission features, including this storage decommissioning, are disabled by default via `spark.decommission.enabled`.
- `spark.storage.decommission.rddBlocks.enabled`
- `spark.storage.decommission.shuffleBlocks.enabled`

### Why are the changes needed?

This will help users use the storage decommissioning feature more easily.

### Does this PR introduce _any_ user-facing change?

Yes, this will reduce the number of required configurations for decommission users.
However, these two configurations are still gated by the two parent configurations.

### How was this patch tested?

Pass the CIs.
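Put together, the resulting defaults can be sketched as a `spark-defaults.conf` fragment (illustrative only; the flag names are those listed above, and the two parent switches still gate the feature):

```
# Master switches: decommissioning overall remains disabled by default.
spark.decommission.enabled                        false
spark.storage.decommission.enabled                false

# Enabled by default after this change, but inert until the parents are on.
spark.storage.decommission.rddBlocks.enabled      true
spark.storage.decommission.shuffleBlocks.enabled  true
```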