Here we ran the reduce operation on an Array (firstWords), which is not an RDD. reduceByKey() can only be called on an RDD, whereas reduce() can be called even if the object is not an RDD.

### Scala Underscore
Now, let us come back to the reduce operation and see how we can re-write it using the underscore ( _ ). Here reduce takes two arguments, a and b, and does a summation.

{% highlight scala %}
lines.map(line => (line.length)).reduce((a,b) => a + b)
{% endhighlight %}

Using the Scala underscore, this can also be written as

{% highlight scala %}
lines.map(line => (line.length)).reduce(_ + _)
{% endhighlight %}

((a,b) => a+b) can be re-written as (_ + _): here it is implicitly understood that the function takes two parameters and applies "+" to them.

(line => (line.length)) can be re-written as _.length: map takes a single parameter - line - as input and calls .length on it, so the _ here becomes that only parameter and _.length finds the length of the line.

### Word count using flatMap and reduceByKey
One of the differences between flatMap() and map() is that map should always return a result, whereas flatMap need not. Have a look at the examples below.

{% highlight scala %}
val lines = sc.parallelize(List("this is line number one","line number two","line number three"))
lines.flatMap(_.split(" ").filter(word => word.contains("this")).map(word => (word,1))).collect()
res83: Array[(String, Int)] = Array((this,1))
lines.map(_.split(" ").filter(word => word.contains("this")).map(word => (word,1))).collect()
res85: Array[Array[(String, Int)]] = Array(Array((this,1)), Array(), Array())
{% endhighlight %}

We have three strings and we are filtering based on the content. The result we got from flatMap after filtering is `Array((this,1))`, whereas the map operation returned `Array(Array((this,1)), Array(), Array())` - including two empty arrays.
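To round out the heading above, here is a minimal word-count sketch using flatMap and reduceByKey. This is not the post's original snippet; the variable names are illustrative.

{% highlight scala %}
val lines = sc.parallelize(List("this is line number one","line number two","line number three"))
// split each line into words, pair each word with 1, then sum the counts per word
val wordCounts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
wordCounts.collect()
{% endhighlight %}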
The return type of flatMap and map is an RDD, not an array; the above results with arrays were obtained after calling collect() on the RDDs returned by the map operations.

Another difference between flatMap and map is that flatMap flattens out the result, i.e., if you get an Array of Array of String from map, you will get an Array of String from flatMap. See the example below.

{% highlight scala %}
val lines = sc.parallelize(List("line number one","line number two"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at
{% endhighlight %}
Note: The artifact slf4j-log4j12 has to be excluded from the storm-core and kafka_2.9.1 dependencies. Otherwise you might get a 'multiple SLF4J bindings' exception during execution.
Note: We have to package the jar with all the dependencies except storm-core. It is better to use the maven-shade-plugin rather than the maven-assembly-plugin because the packaging done by the assembly plugin may throw an exception while submitting the jar to Storm.

#### Structure of the project
Note: We are not creating the client object in the constructor because, when a topology is submitted, the bolt object is serialized, and the class HttpSolrClient is non-serializable. If we initialized HttpSolrClient in the constructor, we would receive a java.io.NotSerializableException. The prepare method, on the other hand, is called only after the object is deserialized.

{% highlight java %}
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    this.solrClient = new HttpSolrClient(solrAddress);
}
{% endhighlight %}

2. getSolrInputDocumentForInput

This method converts a tuple into a SolrInputDocument, which is required for indexing the document into Solr.
{% highlight java %}
public SolrInputDocument getSolrInputDocumentForInput(Tuple input) {
    String content = (String) input.getValueByField("content");
    String[] parts = content.trim().split(" ");
    System.out.println("Received in SOLR bolt "+content);
    SolrInputDocument document = new SolrInputDocument();
    try {
        for(String part : parts) {
            // each part is expected to be of the form fieldName:value
            String[] subParts = part.split(":");
            String fieldName = subParts[0];
            String value = subParts[1];
            document.addField(fieldName, value);
        }
    } catch(Exception e) {
        // malformed parts are silently skipped
    }
    return document;
}
{% endhighlight %}

3. execute

The execute method converts the input Tuple into a SolrInputDocument and sends it to the Solr server by calling commit().
Note: Ideally, we should not commit each document individually; instead we should buffer the documents and commit only once the buffer reaches a certain threshold.

{% highlight java %}
public void execute(Tuple input) {
    SolrInputDocument document = getSolrInputDocumentForInput(input);
    try {
        solrClient.add(document);
        solrClient.commit();
        collector.ack(input);
    } catch(Exception e) {
    }
}
{% endhighlight %}


#### MongoDB Bolt
MongodbBolt.java is similar to SolrBolt. It creates an instance of MongoClient using the hostname and port, and then creates an instance of MongoDatabase using this MongoClient and the database name. The input tuple is converted into an `org.bson.Document` by the method `getMongoDocForInput` and is inserted into the collection by

{% highlight java %}
mongoDB.getCollection(collection).insertOne(mongoDoc)
{% endhighlight %}

{% highlight java %}
public void execute(Tuple input) {
    Document mongoDoc = getMongoDocForInput(input);
    try {
        mongoDB.getCollection(collection).insertOne(mongoDoc);
        collector.ack(input);
    } catch(Exception e) {
        e.printStackTrace();
        collector.fail(input);
    }
}

public Document getMongoDocForInput(Tuple input) {
    Document doc = new Document();
    String content = (String) input.getValueByField("content");
    String[] parts = content.trim().split(" ");
    System.out.println("Received in MongoDB bolt "+content);
    try {
        for(String part : parts) {
            // each part is expected to be of the form fieldName:value
            String[] subParts = part.split(":");
            String fieldName = subParts[0];
            String value = subParts[1];
            doc.append(fieldName, value);
        }
    } catch(Exception e) {
        // malformed parts are silently skipped
    }
    return doc;
}
{% endhighlight %}
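The prepare method of this bolt is not shown in the excerpt above. A minimal sketch of what it might look like, assuming fields named host, port and db and a MongoDB Java driver 3.x style client (these names are assumptions, not the project's exact code):

{% highlight java %}
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    // build the client from the configured hostname and port,
    // then look up the database by name
    this.mongoClient = new MongoClient(host, port);
    this.mongoDB = mongoClient.getDatabase(db);
}
{% endhighlight %}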
Note: shuffleGrouping is one of the eight stream grouping methods available in Storm; it distributes the tuples to the bolt's tasks at random. Another type of grouping is fieldsGrouping - in fields grouping, the tuples are grouped based on a specified field, and tuples having the same value for that field are always sent to the same task. We can also implement a custom grouping by implementing the interface CustomStreamGrouping. A small wiring sketch is included at the end of this section.

Finally, the topology can be submitted by
{% highlight java %}
Config conf = new Config();
conf.put("solr.zookeeper.hosts",configs.getProperty(Keys.SOLR_ZOOKEEPER_HOSTS));
String topologyName = configs.getProperty(Keys.TOPOLOGY_NAME);
//Defines how many worker processes have to be created for the topology in the cluster.
conf.setNumWorkers(1);
StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
{% endhighlight %}


#### **Execution**
For execution, we need to start the servers below:

1. Hadoop servers
2. Solr server
3. Kafka broker
4. Mongod server
5. Storm nimbus
6. Storm supervisor
7. Storm UI (optional)

Build the jar using the command `mvn clean install`. The command will create your topology jar with all the dependencies - `stormkafka-0.0.1-SNAPSHOT.jar`.
Run the jar using the command
{% highlight sh %}
$storm jar stormkafka-0.0.1-SNAPSHOT.jar com.vishnu.storm.Topology
...
768 [main] INFO b.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /Users/vishnu/apache-storm-0.10.0/storm-local/nimbus/inbox/stormjar-be5f5f13-c6d6-456d-b45e-2e7bbf6ba4c8.jar
768 [main] INFO b.s.StormSubmitter - Submitting topology storm-kafka-topology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8123923076974561721:-8924677632676109956","topology.workers":1,"solr.zookeeper.hosts":"localhost:2181"}
861 [main] INFO b.s.StormSubmitter - Finished submitting topology: storm-kafka-topology
{% endhighlight %}
where `com.vishnu.storm` is the package name and `Topology` is the class containing the main method.
Open your Storm UI at http://localhost:8080/ and verify that the job has been deployed correctly. The Storm UI provides a very good visualization of the topology; you can view it by clicking `your-topology-name > Show Visualization`.
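For reference, here is the wiring sketch mentioned in the grouping note above, showing how shuffleGrouping and fieldsGrouping could be declared on a TopologyBuilder. The component names and the word-count style bolts are illustrative, not the exact classes used in this project.

{% highlight java %}
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", sentenceSpout, 1);
// shuffleGrouping: tuples from the spout are distributed to the bolt's tasks at random
builder.setBolt("split-bolt", splitBolt, 2).shuffleGrouping("sentence-spout");
// fieldsGrouping: tuples with the same value for the "word" field always go to the same task
builder.setBolt("count-bolt", countBolt, 2).fieldsGrouping("split-bolt", new Fields("word"));
{% endhighlight %}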
PassengerId | Survived
---|---
892 | 0
893 | 1
894 | 0
895 | 0
896 | 1
897 | 0
898 | 1
899 | 0
900 | 1
901 | 0
Note that we did not call the methods fit() or transform() here; that will be taken care of by the Pipeline. The Pipeline executes each stage and passes the result of the current stage to the next. If a stage is a Transformer, the Pipeline calls transform() on it; if it is an Estimator, the Pipeline first calls fit() and then transform(). But if the Estimator is the last stage in the pipeline, transform() won't be called.

#### **Binning / Bucketing**

During binning/bucketing, a column with continuous values is converted into buckets. We define the start and end value of each bucket while creating the Bucketizer - *which is a Transformer*. We are going to bucketize the column 'Fare'.

{% highlight scala %}
//define the buckets/splits
val fareSplits = Array(0.0,10.0,20.0,30.0,40.0,Double.PositiveInfinity)
val fareBucketize = new Bucketizer().setInputCol("Fare").setOutputCol("FareBucketed").setSplits(fareSplits)
fareBucketize.transform(train_data).select("Fare","FareBucketed").show(10)
+-------+------------+
|   Fare|FareBucketed|
+-------+------------+
|   7.25|         0.0|
|71.2833|         4.0|
|  7.925|         0.0|
|   53.1|         4.0|
|   8.05|         0.0|
| 8.4583|         0.0|
|51.8625|         4.0|
| 21.075|         2.0|
|11.1333|         1.0|
|30.0708|         3.0|
+-------+------------+
only showing top 10 rows
{% endhighlight %}

#### **Vector Assembler**
VectorAssembler is used for assembling features into a vector. We will pass all the columns that we are going to use for the prediction to the VectorAssembler, and it will create a new vector column.
{% highlight scala %}
val assembler = new VectorAssembler().setInputCols(Array("SexIndex", "Age", "TitleIndex", "Pclass", "Family","FareBucketed")).setOutputCol("features_temp")
{% endhighlight %}

#### **Normalizer**

Next, we will normalize the data using the transformer `Normalizer`. The Normalizer will take the column created by the VectorAssembler, normalize it, and produce a new column.
{% highlight scala %}
val normalizer = new Normalizer().setInputCol("features_temp").setOutputCol("features")
{% endhighlight %}

### **Building and Evaluating Model**
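The model-building code itself is not part of this excerpt. The following is only a sketch of how the stages above could be combined and fit, assuming a LogisticRegression estimator, the usual spark.ml imports, and that the indexed columns (SexIndex, TitleIndex) and the Family column were already added by earlier steps not shown here; the exact stage list in the original post may differ.

{% highlight scala %}
// the label needs to be numeric (Double); cast it if the csv loader inferred an integer type
val prepared = train_data.withColumn("Survived", train_data("Survived").cast("double"))

// assemble the stages discussed above into a single Pipeline, ending with the estimator
val lr = new LogisticRegression().setLabelCol("Survived").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(Array(fareBucketize, assembler, normalizer, lr))

// fit on a training split and predict on the held-out split
val Array(train, test) = prepared.randomSplit(Array(0.8, 0.2))
var model = pipeline.fit(train)
var result = model.transform(test)
{% endhighlight %}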
Note that the model object here is an instance of PipelineModel, not LogisticRegression. This is because LogisticRegression is only a component of our PipelineModel. Whenever a prediction is done for a data set, the data set has to go through all the transformations done by the other components in the Pipeline before it can be used by the LogisticRegression component for prediction.

To evaluate how well the model did, select the columns 'prediction' and 'Survived' from `result`, create an RDD of [(Double, Double)] and pass it on to BinaryClassificationMetrics.
{% highlight scala %}
result = result.select("prediction","Survived")
val predictionAndLabels = result.map { row =>
  (row.get(0).asInstanceOf[Double],row.get(1).asInstanceOf[Double])
}
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under ROC = " + metrics.areaUnderROC())
Area under ROC = 0.7757266300078556
{% endhighlight %}

That is not bad; check this [link](http://gim.unmc.edu/dxtests/roc3.htm) to read more about how to evaluate a model based on the value of the area under the ROC curve.

The prediction we did just now was on our input data, where we already knew the actual classification. The reason why we split the data into a train and a test set is that we needed to compare the actual result with the predicted result in order to evaluate the model. Now we will use the entire input data to train the model again.
{% highlight scala %}
model = pipeline.fit(train_data)
{% endhighlight %}

### **Doing the Prediction**
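The prediction code itself is not included in this excerpt. A hedged sketch of the idea follows; the file path, the spark-csv read options, and the column handling are assumptions, not the post's exact code.

{% highlight scala %}
// load Kaggle's test.csv the same way the training file was loaded, then
// run it through the fitted PipelineModel and keep the submission columns
val test_data = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true").option("inferSchema", "true")
  .load("titanic/test.csv")
val predictions = model.transform(test_data)
predictions.select("PassengerId", "prediction").show(10)
{% endhighlight %}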
Note: I received a score of 0.77512 on submitting this to Kaggle.

This concludes the post, and I hope it was helpful. Thanks for reading.
RDDs can also be thought of as a set of instructions that have to be executed, the first instruction being the load instruction.

### **Caching**
You can cache an RDD in memory by calling `rdd.cache()`. When you cache an RDD, its partitions are loaded into the memory of the nodes that hold it.
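As a quick illustration (a sketch, not from the original post; the paths are made up), caching and persisting with an explicit storage level look like this:

{% highlight scala %}
import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("/data/logs")      // path is illustrative
logs.cache()                              // same as persist(StorageLevel.MEMORY_ONLY)
logs.count()                              // the first action materializes the cached partitions

val events = sc.textFile("/data/events")  // a second RDD, persisted with an explicit level
events.persist(StorageLevel.MEMORY_AND_DISK)
{% endhighlight %}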
With the storage levels above, the RDDs are not serialized before being stored in memory. There are two other StorageLevels - MEMORY_ONLY_SER and MEMORY_AND_DISK_SER - which store the RDDs as serialized Java objects.

There are a few more StorageLevels which I did not mention here; you can find more details about them [here](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence).

### **Broadcast variables**
A broadcast variable is a type of shared variable used for broadcasting data across the cluster. Hadoop MapReduce users can relate this to the distributed cache. Let us first understand why we need a broadcast variable. Take a look at the example below, where *names* is joined with *addresses*.
{% highlight scala %}
val names = sc.textFile("/names").map(line => (line.split(",")(3),line))
val addresses = sc.textFile("/address").map(line=>(line.split(",")(0),line))
names.join(addresses)
{% endhighlight %}
Here, both names and addresses will be shuffled over the network to perform the join, which is not efficient since any data transfer over the network reduces the execution speed.
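If one of the data sets is small enough to fit in memory, it can instead be broadcast to every node and the join done map-side, avoiding the shuffle. A minimal sketch, assuming addresses is the small data set (this is an illustration, not the post's code):

{% highlight scala %}
// collect the small data set to the driver and broadcast it as a map
val addressMap = sc.textFile("/address")
  .map(line => (line.split(",")(0), line))
  .collectAsMap()
val addressBroadcast = sc.broadcast(addressMap)

// join map-side by looking up the broadcast value; the large data set is not shuffled
val joined = sc.textFile("/names")
  .map(line => (line.split(",")(3), line))
  .map { case (key, name) => (name, addressBroadcast.value.get(key)) }
{% endhighlight %}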
Broadcast variables are read-only; broadcast.value is an immutable object. Spark uses a BitTorrent-like protocol for sending the broadcast variable across the cluster, i.e., for each variable that has to be broadcast, initially the driver acts as the only source. The data is split into blocks at the driver, and each leecher *(receiver)* starts fetching the blocks to its local directory. Once a block is completely received, that leecher also acts as a source of this block for the rest of the leechers *(this reduces the load on the machine running the driver)*. This is continued for the rest of the blocks. So initially only the driver is the source, and later on the number of sources increases - because of this, the rate at which the blocks are fetched by a node increases over time.
Note 1: I am assuming that we are receiving the same word in the stream. This is done to keep the explanation simple. Since there is a keyBy(0) after map, each word will belong to a separate logical window, grouped by the word.
Note 2: The sliding window used in this example is based on processing time. Processing time is the time at which an event is processed in the system, as opposed to event time, which is the time at which the event was created. I will be explaining these concepts in the upcoming blogs.

**Update**: Read about the concepts of ProcessingTime and EventTime in this [blog](flink_eventtime.html).
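For reference, a processing-time, sliding-window word count of the kind these notes describe might look like the following sketch (the variable names are illustrative, not the post's exact code):

{% highlight scala %}
val text = senv.socketTextStream("localhost", 9999)
val counts = text
  .map { (w: String) => (w, 1) }
  .keyBy(0)                                        // group the logical windows by the word
  .timeWindow(Time.seconds(10), Time.seconds(5))   // 10 second window, sliding every 5 seconds
  .sum(1)
counts.print
senv.execute("sliding window word count")
{% endhighlight %}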
Note: All the commands used in this blog post can be found here.

{% highlight scala %}
spark.read. //pressed tab here
csv   format   jdbc   json   load   option   options   orc   parquet   schema   stream   table   text
//Load some json file
val df = spark.read.json("/spark_learning/pandainfo.json")
df.show
+--------------------+-----------+---------------+
|               knows|lovesPandas|           name|
+--------------------+-----------+---------------+
|                null|       true|Sparky The Bear|
|                null|       null|         Holden|
|[WrappedArray(hol...|       true|Sparky The Bear|
+--------------------+-----------+---------------+
{% endhighlight %}
Note: I am using the dataset from the learning-spark github repository.

Let us now register this Dataframe as a temp table.
{% highlight scala %}
df.registerTempTable("pandas")
warning: there was one deprecation warning; re-run with -deprecation for details
{% endhighlight %}
It looks like the `registerTempTable` method is deprecated. Let's check [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2692) to figure out which alternate method to use.
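In Spark 2.x the non-deprecated replacement is createOrReplaceTempView. A quick sketch of using it (the query itself is just an illustration):

{% highlight scala %}
// expose the DataFrame to SQL without the deprecated registerTempTable
df.createOrReplaceTempView("pandas")
spark.sql("select name from pandas where lovesPandas = true").show
{% endhighlight %}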
It is important to understand that the messages should contain the information on when they were generated. Flink or any other system is not a magic box that can somehow figure this out by itself. Later we will see that Event Time processing extracts this timestamp information to handle late messages.

{% highlight scala %}
val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
counts.print
senv.execute("ProcessingTime processing example")
{% endhighlight %}

#### **Case 1: Messages arrive without delay**
Suppose the source generated three messages of the type **a** at the 13th second, 13th second and 16th second respectively. (Hours and minutes are not important here since the window size is only 10 seconds.)
Note that in window 2, the delayed message was still placed at the 19th second, not at the 13th second (its event time). This depiction in the figure was intentional, to indicate that the messages within a window are not sorted according to their event time. (This might change in the future.)

## **Watermarks**
Watermarks is a very important and interesting idea, and I will try to give you a brief overview of it. If you are interested in learning more, you can watch this awesome [talk](https://www.youtube.com/watch?v=3UfZN59Nsk8) from Google and also read this [blog](http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/) from dataArtisans. A watermark is essentially a timestamp. When an operator in Flink receives a watermark, it understands (assumes) that it is not going to see any message older than that timestamp. Hence a watermark can also be thought of as a way of telling Flink how far along it is in "EventTime".

For the purpose of this example, think of it as a way of telling Flink how much delayed a message can be. In the last attempt, we set the watermark as the current system time. It was, therefore, not expecting any delayed messages. We will now set the watermark as **current time - 5 seconds**, which tells Flink to expect messages to be delayed by at most 5 seconds - this is because each window will be evaluated only when the watermark passes through it. Since our watermark is current time - 5 seconds, the first window [5s-15s] will be evaluated only at the 20th second. Similarly, the window [10s-20s] will be evaluated at the 25th second and so on.

{% highlight scala %}
override def getCurrentWatermark(): Watermark = {
      new Watermark(System.currentTimeMillis - 5000)
  }
{% endhighlight %}
Here we are assuming that the event time is 5 seconds older than the current system time, but that is not always the case. In many cases it will be better to hold the maximum timestamp received so far (which is extracted from the message) and subtract the expected delay from it.

The result of running the code after making the above changes is:
Note that all messages in Kafka should have a key and a value. If we do not pass a key during ingestion through the KafkaConsoleProducer, it will be null.

### **branch**
Branch creates multiple branches from a single stream. It takes in varargs of Predicates and produces a KStream for each Predicate. Each element in the source KStream is applied against each Predicate, and the element is assigned to the KStream corresponding to the first Predicate that it matches. In our example, we will create two predicates, one for highHumidity and the other for lowTemp.

{% highlight scala %}
//define the predicates to split the stream into branches
val highHumidty = new Predicate[String, ClimateLog] {
  override def test(t: String, c: ClimateLog): Boolean = c.humidity > 50
}
val lowTemp = new Predicate[String, ClimateLog] {
  override def test(t: String, c: ClimateLog): Boolean = c.temperature < 0
}
//array of streams, one for each predicate
val branches = climateLogStream.branch(highHumidty, lowTemp)
{% endhighlight %}


### **through**
Through persists the messages from a KStream to the given topic and creates a new KStream from that topic. This can be used if you want the intermediate result from the application to be made available to other applications, but at the same time use the stream further downstream in the current application. We will persist the lowTemp stream and the highHumidity stream to two new topics - low_temp and high_humidity.

{% highlight scala %}
val highHumidityStream = branches(0).through(new Serdes.StringSerde, new ClimateLogSerDe, "high_humidity")
val lowTempStream = branches(1).through(new Serdes.StringSerde, new ClimateLogSerDe, "low_temp")
{% endhighlight %}

Note that the value serializer is a custom Kryo-based serializer for ClimateLog, which we will be creating next.

### **kryo serializer**
The serializer needs to implement `org.apache.kafka.common.serialization.Serde`. *Serde* has mainly two methods - serializer() and deserializer() - which return instances of Serializer and Deserializer. Kafka expects this class to have an empty constructor. So, we will create a class ClimateLogSerDe which extends the ClimatelogWrappedSerde class, which takes the Serializer and Deserializer as arguments in its constructor. We also create ClimateLogSerializer and ClimateLogDeserializer, which use ClimateLogKryoSerDe as the default serializer. The implementation is a bit lengthy; please check the [github page](https://github.com/soniclavier/hadoop_datascience/blob/master/KafkaStreams/src/main/scala-2.11/com/vishnuviswanath/kafka/streams/ClimateLogStream.scala#L124-L194) for the complete code.

### **selectKey**
The streams we have till now do not have a key (assuming you are using the KafkaConsoleProducer and are not passing a key). *selectKey* selects a key using the KeyValueMapper provided and creates a new stream from the existing stream. We create two streams from highHumidityStream and lowTempStream by choosing *value.country* as the key.
{% highlight scala %}
val keyedHighHumStream: KStream[String, ClimateLog] = highHumidityStream.selectKey(new KeyValueMapper[String, ClimateLog, String] {
  override def apply(key: String, value: ClimateLog): String = value.country
})

val keyedLowTempStream: KStream[String, ClimateLog] = lowTempStream.selectKey(new KeyValueMapper[String, ClimateLog, String] {
  override def apply(key: String, value: ClimateLog): String = value.country
})
{% endhighlight %}

### **join**
Next, we join the highHumidity stream and the lowTemperature stream to create a new stream called warnings. The two streams will be joined based on the key - which in this case is the country. We should also define a join window.
{% highlight scala %}
//create a join window. This window joins all the elements of the same key if the difference between their timestamps is within 60 seconds
val joinWindow = JoinWindows.of(60 * 1000)
{% endhighlight %}
Now join the streams using a ValueJoiner. A ValueJoiner defines what should be done when we find two values for the same key. In this example, we simply merge the two values by taking the temperature from the low temp stream and the humidity from the high humidity stream.
{% highlight scala %}
val warningsStream: KStream[String, String] = keyedHighHumStream.join[ClimateLog, String](
  keyedLowTempStream,
  new ValueJoiner[ClimateLog, ClimateLog, String] {
    override def apply(value1: ClimateLog, value2: ClimateLog): String = value2.copy(humidity = value1.humidity).toString
  },
  joinWindow)
{% endhighlight %}

Finally, we store the warningsStream to another topic called "warnings", and then start the stream.

{% highlight scala %}
warningsStream.to(new Serdes.StringSerde, new Serdes.StringSerde, "warnings")

val streams = new KafkaStreams(kstreamBuilder, settings)
streams.start
{% endhighlight %}

We have already seen how to submit the job, how to create the topics (climate_events, high_humidity, low_temp, warnings), and how to send messages to these topics in the previous [blog post](hello-kafka-streams#hello-kafka-streams), so I am not going to bore you with the same details :)

To summarize, we saw how to use various Kafka Streams APIs such as branch, through, selectKey, and join. We also created a custom serializer using Kryo. Hope this was useful, and thanks for reading!