Merge branch 'master' of github.com:apache/spark into improve_ts
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
	sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
	sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
Davies Liu committed Jun 13, 2015
2 parents a3171b8 + d986fb9 commit c834108
Showing 157 changed files with 1,676 additions and 1,356 deletions.
@@ -140,7 +140,8 @@ function renderDagViz(forJob) {

// Find cached RDDs and mark them as such
metadataContainer().selectAll(".cached-rdd").each(function(v) {
var nodeId = VizConstants.nodePrefix + d3.select(this).text();
var rddId = d3.select(this).text().trim();
var nodeId = VizConstants.nodePrefix + rddId;
svg.selectAll("g." + nodeId).classed("cached", true);
});

@@ -150,7 +151,7 @@ function renderDagViz(forJob) {
/* Render the RDD DAG visualization on the stage page. */
function renderDagVizForStage(svgContainer) {
var metadata = metadataContainer().select(".stage-metadata");
var dot = metadata.select(".dot-file").text();
var dot = metadata.select(".dot-file").text().trim();
var containerId = VizConstants.graphPrefix + metadata.attr("stage-id");
var container = svgContainer.append("g").attr("id", containerId);
renderDot(dot, container, false);
12 changes: 11 additions & 1 deletion docs/streaming-kafka-integration.md
@@ -118,6 +118,13 @@ Next, we discuss how to use this approach in your streaming application.
See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).

</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
</div>
</div>
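
In the Scala API the key and value decoders are supplied as type parameters rather than as decoding functions. The following is only a minimal sketch; the `ssc`, `brokers` and `topicsSet` values are placeholders for your own StreamingContext, broker list and topic set, not part of the guide's example.

	{% highlight scala %}
	import kafka.serializer.StringDecoder
	import org.apache.spark.streaming.kafka.KafkaUtils

	// Placeholders: replace with your own broker list and topic set
	val brokers = "broker1:9092,broker2:9092"
	val topicsSet = Set("topic1")
	val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

	// The two StringDecoder type parameters play the role of the Python decoder functions
	val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
	  ssc, kafkaParams, topicsSet)
	{% endhighlight %}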

@@ -147,10 +154,13 @@ Next, we discuss how to use this approach in your streaming application.
}
);
</div>
<div data-lang="python" markdown="1">
Not supported
</div>
</div>

You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
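
For reference, a minimal Scala sketch of pulling the offset ranges out of the direct stream so your own code can push them to Zookeeper; `directKafkaStream` is assumed to be the stream created above, and the Zookeeper write itself is left as a placeholder.

	{% highlight scala %}
	import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

	directKafkaStream.foreachRDD { rdd =>
	  // RDDs produced by the direct stream carry their Kafka offset ranges
	  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
	  offsetRanges.foreach { o =>
	    // Placeholder: push o.topic, o.partition and o.untilOffset to Zookeeper
	    // using whichever client your monitoring tool expects
	    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
	  }
	}
	{% endhighlight %}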

Another thing to note is that since this approach does not use Receivers, the standard receiver-related [configurations](configuration.html) (that is, configurations of the form `spark.streaming.receiver.*`) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the [configurations](configuration.html) `spark.streaming.kafka.*`. An important one is `spark.streaming.kafka.maxRatePerPartition`, which is the maximum rate at which each Kafka partition will be read by this direct API.
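
For example, a minimal sketch of setting this rate limit when building the context; the application name and the 100 records/second/partition figure are illustrative values only, not recommendations.

	{% highlight scala %}
	import org.apache.spark.SparkConf
	import org.apache.spark.streaming.{Seconds, StreamingContext}

	val conf = new SparkConf()
	  .setAppName("DirectKafkaExample")  // illustrative application name
	  // Cap reads at 100 records per second per Kafka partition (illustrative value)
	  .set("spark.streaming.kafka.maxRatePerPartition", "100")
	val ssc = new StreamingContext(conf, Seconds(2))  // illustrative batch interval
	{% endhighlight %}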

3. **Deploying:** Similar to the first approach, you can package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR and then launch the application using `spark-submit`. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation.
3. **Deploying:** This is the same as the first approach, for Scala, Java and Python.
24 changes: 15 additions & 9 deletions docs/streaming-kinesis-integration.md
@@ -32,7 +32,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the Running the Example section for instructions on how to run the example.
@@ -44,7 +45,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);

See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.
@@ -54,19 +56,23 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m

- `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream

- `[Kinesis stream name]`: The Kinesis stream that this streaming application receives from
- The application name used in the streaming context becomes the Kinesis application name
- `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis
  sequence numbers in a DynamoDB table.
- The application name must be unique for a given account and region.
- The Kinesis backend automatically associates the application name with the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization.
- Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table.
- If the table exists but has incorrect checkpoint information (for a different stream, or
  old expired sequence numbers), then there may be temporary errors.

- `[Kinesis stream name]`: The Kinesis stream that this streaming application will pull data from.

- `[endpoint URL]`: Valid Kinesis endpoint URLs can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

- `[region name]`: Valid Kinesis region names can be found [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).

- `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.

- `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details).

In other versions of the API, you can also specify the AWS access key and secret key directly.
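
As a rough sketch only (the exact overload and argument order may vary between versions), passing the credentials explicitly looks roughly like this, using the same bracketed placeholders as above:

	val kinesisStream = KinesisUtils.createStream(
	    streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
	    [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
	    [AWS access key ID], [AWS secret key])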

3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).

@@ -122,12 +128,12 @@ To run the example,
<div class="codetabs">
<div data-lang="scala" markdown="1">

bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]
bin/run-example streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]

</div>
<div data-lang="java" markdown="1">

bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]
bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]

</div>
</div>
@@ -136,7 +142,7 @@ To run the example,

- To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.

bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10

This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.

70 changes: 24 additions & 46 deletions docs/streaming-programming-guide.md
@@ -77,7 +77,7 @@ main entry point for all streaming functionality. We create a local StreamingCon
{% highlight scala %}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
@@ -109,7 +109,7 @@ each line will be split into multiple words and the stream of words is represent
`words` DStream. Next, we want to count these words.

{% highlight scala %}
import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
@@ -682,7 +682,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
### Advanced Sources
{:.no_toc}

<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.3,
<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.

This category of sources requires interfacing with external non-Spark libraries, some of them with
@@ -723,7 +723,7 @@ and it in the classpath.

Some of these advanced sources are as follows.

- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.1.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.2.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.

- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.

@@ -991,8 +991,9 @@ cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(.
</div>
</div>

In fact, you can also use [machine learning](mllib-guide.html) and
[graph computation](graphx-programming-guide.html) algorithms in the `transform` method.
Note that the supplied function gets called in every batch interval. This allows you to do
time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables,
etc. can be changed between batches.
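
For example, a minimal Scala sketch where the dataset being joined against can change from batch to batch; `getLatestSpamInfoRDD` is a hypothetical helper that returns the most recent `(word, isSpam)` RDD.

	{% highlight scala %}
	val cleanedDStream = wordCounts.transform { rdd =>
	  // transform runs once per batch, so the RDD returned by the (hypothetical)
	  // getLatestSpamInfoRDD() helper can be different for every batch
	  rdd.join(getLatestSpamInfoRDD())               // (word, (count, isSpam))
	     .filter { case (_, (_, isSpam)) => !isSpam }
	     .mapValues { case (count, _) => count }
	}
	{% endhighlight %}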

#### Window Operations
{:.no_toc}
@@ -1427,38 +1428,18 @@ You can easily use [DataFrames and SQL](sql-programming-guide.html) operations o
<div data-lang="scala" markdown="1">
{% highlight scala %}

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = null

// Instantiate SQLContext on demand
def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}

...

/** Case class for converting RDD to DataFrame */
case class Row(word: String)

...

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

// Get the singleton instance of SQLContext
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._

// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Row(w)).toDF()
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")

// Register as table
wordsDataFrame.registerTempTable("words")
@@ -1476,19 +1457,6 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
<div data-lang="java" markdown="1">
{% highlight java %}

/** Lazily instantiated singleton instance of SQLContext */
class JavaSQLContextSingleton {
static private transient SQLContext instance = null;
static public SQLContext getInstance(SparkContext sparkContext) {
if (instance == null) {
instance = new SQLContext(sparkContext);
}
return instance;
}
}

...

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
private String word;
@@ -1512,7 +1480,9 @@ words.foreachRDD(
new Function2<JavaRDD<String>, Time, Void>() {
@Override
public Void call(JavaRDD<String> rdd, Time time) {
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());

// Get the singleton instance of SQLContext
SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
@@ -2234,7 +2204,7 @@ The following table summarizes the semantics under failures:

### With Kafka Direct API
{:.no_toc}
Spark 1.3 introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.3) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).
Spark 1.3 introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark {{site.SPARK_VERSION_SHORT}}) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).

## Semantics of output operations
{:.no_toc}
@@ -2248,8 +2218,16 @@ additional effort may be necessary to achieve exactly-once semantics. There are

- *Transactional updates*: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.

- Use the batch time (available in `foreachRDD`) and the partition index of the transformed RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
- Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.
- Use the batch time (available in `foreachRDD`) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
- Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.

dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}


***************************************************************************************************
2 changes: 2 additions & 0 deletions ec2/spark_ec2.py
@@ -70,6 +70,7 @@
"1.2.1",
"1.3.0",
"1.3.1",
"1.4.0",
])

SPARK_TACHYON_MAP = {
@@ -82,6 +83,7 @@
"1.2.1": "0.5.0",
"1.3.0": "0.5.0",
"1.3.1": "0.5.0",
"1.4.0": "0.6.4",
}

DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
@@ -145,9 +145,9 @@ object LogisticRegressionExample {
val elapsedTime = (System.nanoTime() - startTime) / 1e9
println(s"Training time: $elapsedTime seconds")

val lirModel = pipelineModel.stages.last.asInstanceOf[LogisticRegressionModel]
val lorModel = pipelineModel.stages.last.asInstanceOf[LogisticRegressionModel]
// Print the weights and intercept for logistic regression.
println(s"Weights: ${lirModel.weights} Intercept: ${lirModel.intercept}")
println(s"Weights: ${lorModel.weights} Intercept: ${lorModel.intercept}")

println("Training data results:")
DecisionTreeExample.evaluateClassificationModel(pipelineModel, training, "indexedLabel")
4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -213,9 +213,9 @@ private[spark] object BLAS extends Serializable with Logging {
def scal(a: Double, x: Vector): Unit = {
x match {
case sx: SparseVector =>
f2jBLAS.dscal(sx.values.size, a, sx.values, 1)
f2jBLAS.dscal(sx.values.length, a, sx.values, 1)
case dx: DenseVector =>
f2jBLAS.dscal(dx.values.size, a, dx.values, 1)
f2jBLAS.dscal(dx.values.length, a, dx.values, 1)
case _ =>
throw new IllegalArgumentException(s"scal doesn't support vector type ${x.getClass}.")
}
26 changes: 21 additions & 5 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -270,12 +270,28 @@ object MLUtils {
* Returns a new vector with `1.0` (bias) appended to the input vector.
*/
def appendBias(vector: Vector): Vector = {
val vector1 = vector.toBreeze match {
case dv: BDV[Double] => BDV.vertcat(dv, new BDV[Double](Array(1.0)))
case sv: BSV[Double] => BSV.vertcat(sv, new BSV[Double](Array(0), Array(1.0), 1))
case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
vector match {
case dv: DenseVector =>
val inputValues = dv.values
val inputLength = inputValues.length
val outputValues = Array.ofDim[Double](inputLength + 1)
System.arraycopy(inputValues, 0, outputValues, 0, inputLength)
outputValues(inputLength) = 1.0
Vectors.dense(outputValues)
case sv: SparseVector =>
val inputValues = sv.values
val inputIndices = sv.indices
val inputValuesLength = inputValues.length
val dim = sv.size
val outputValues = Array.ofDim[Double](inputValuesLength + 1)
val outputIndices = Array.ofDim[Int](inputValuesLength + 1)
System.arraycopy(inputValues, 0, outputValues, 0, inputValuesLength)
System.arraycopy(inputIndices, 0, outputIndices, 0, inputValuesLength)
outputValues(inputValuesLength) = 1.0
outputIndices(inputValuesLength) = dim
Vectors.sparse(dim + 1, outputIndices, outputValues)
case _ => throw new IllegalArgumentException(s"Do not support vector type ${vector.getClass}")
}
Vectors.fromBreeze(vector1)
}

/**
@@ -25,10 +25,11 @@
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.types.StructType;

public abstract class BaseRow implements Row {
public abstract class BaseRow extends InternalRow {

@Override
final public int length() {
@@ -176,7 +177,7 @@ public boolean equals(Object other) {
}

@Override
public Row copy() {
public InternalRow copy() {
final int n = size();
Object[] arr = new Object[n];
for (int i = 0; i < n; i++) {
