forked from TheClimateCorporation/clj-spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
618 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
/target | ||
/lib | ||
/classes | ||
/checkouts | ||
pom.xml | ||
*.jar | ||
*.class | ||
.lein-deps-sum | ||
.lein-failures | ||
.lein-plugins |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,148 @@ | ||
clj-spark | ||
========= | ||
|
||
A Clojure api for the the Spark project (a fast, open source cluster computing system). | ||
A Clojure api for the Spark Project. It is useable, but not complete. I | ||
provide it as a starting point. It should be simple enough to add the | ||
additional wrappers as you need them. This is the result of about two weeks of | ||
work. | ||
|
||
It handles many of the initial problems like serializing anonymous functions, | ||
converting back and forth between Scala Tuples and Clojure seqs, and | ||
converting RDDs to PairRDDs. | ||
|
||
### Example usage | ||
|
||
There is a complete sample program in src/clj_spark/examples/query.clj. To run | ||
it, clone this repo and cd into it. You will need Leiningen 2 installed | ||
(assuming this is available on your PATH as lein2): | ||
|
||
$ git clone ... | ||
$ cd clj-spark | ||
$ lein2 deps | ||
$ lein2 compile | ||
$ lein2 run | ||
Compiling clj-spark.api | ||
2013-01-02 13:18:41.477 java[65466:1903] Unable to load realm mapping info from | ||
SCDynamicStore | ||
============== | ||
Premium Per State | ||
NY 600.0 | ||
|
||
============== | ||
TOP100 | ||
#{1 2} | ||
|
||
============== | ||
CTE Per State | ||
NY 70.0 | ||
|
||
============== | ||
TOP100PERSTATE | ||
{NY #{1 2}} | ||
|
||
============== | ||
Standalone CTE Per State | ||
NY 70.0 | ||
============== | ||
|
||
The following are subsections copied from query.clj: | ||
|
||
Here is a sample of creating an RDD: | ||
|
||
(-> (.textFile sc testfile) | ||
(k/map k/csv-split) | ||
; _,policy-id,field-id,_,_,element-id,element-value | ||
(k/map (k/feach identity as-integer as-long identity identity as-integer as-double)) | ||
(k/map (juxt (k/fchoose 1 2) (k/fchoose 5 6))) ; [ [policy-id field-id] [element-id element-value] ] | ||
k/cache) | ||
|
||
And a sample query on that data: | ||
|
||
(-> input-rdd | ||
(k/map second) ; [element-id element-value] | ||
(k/reduce-by-key +) ; [element-id total] | ||
(k/map (k/fchoose 1 0)) ; [total element-id] | ||
(k/sort-by-key false) ; desc | ||
(k/map second) ; element-id | ||
(k/take 2) ; TODO n=100 | ||
set) | ||
|
||
Running queries from the REPL | ||
========== | ||
|
||
You can also start a repl and play around: | ||
|
||
# assuming you already did deps and compile above. | ||
lein2 repl | ||
|
||
; deleted results to be more concise | ||
user=> (use 'serializable.fn 'clj-spark.util) | ||
user=> (require '[clj-spark.api :as k]) | ||
user=> (def sc (k/spark-context :master "local" :job-name "Simple Job")) | ||
user=> (def r1 (k/parallelize sc [10 20 25 30 35])) | ||
user=> (def r2 (k/text-file sc "test/resources/input.csv")) | ||
|
||
user=> (k/count r2) | ||
5 | ||
user=> (def result (-> r1 (k/map inc) (k/map (fn [t] [(even? t) t])) (k/reduce-by-key +))) | ||
#'clj-spark.examples.query/result | ||
user=> (k/collect result) | ||
#<ArrayList [[false 63], [true 62]]> | ||
; or, all in one step: | ||
user=> (-> r1 (k/map inc) (k/map (fn [t] [(even? t) t])) (k/reduce-by-key +) k/collect) | ||
#<ArrayList [[false 63], [true 62]]> | ||
|
||
Other clojure apis | ||
========== | ||
|
||
After working on this, I found another Clojure API for Spark project: https://github.com/markhamstra/spark/tree/master/cljspark | ||
|
||
It's a bit more complete, but no examples. You might find good ideas in both projects. | ||
|
||
|
||
What is Spark | ||
========== | ||
|
||
From http://spark-project.org/docs/latest/index.html | ||
|
||
Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter. It provides clean, language-integrated APIs in Scala and Java, with a rich array of parallel operators. Spark can run on top of the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or without an independent resource manager (“standalone mode”). | ||
|
||
Known Issues | ||
========== | ||
|
||
## Function serialization | ||
|
||
You must create your anonymous functions using serializable.fn/fn, as in: | ||
|
||
(ns ... | ||
(:require \[serializable.fn :as sfn\])) | ||
|
||
(sfn/fn my-inc \[x\] (+ x 1)) | ||
|
||
Do not use clojure.core/fn or #(). This is necessary because the anonymous function must be serialized so it can be passed around to distributed tasks. | ||
|
||
## AOT compilation | ||
|
||
Generally speaking, any functions that are used in the Spark calls will need to be part of AOT compiled namespaces. I.e. they need to be compiled or the distributed Spark tasks will not be able to find them. In some cases, compiling on the fly might work also: | ||
|
||
(compile 'your-namespace) | ||
|
||
But you need to do this somewhere where it will be executed for each task. | ||
|
||
NOTE: This should be avoidable using the serializable.fn as above, but I did not get that to work in my initial attempts. | ||
|
||
## None of the Double* method are implemented | ||
|
||
The Spark Java API provides versions of some methods that accept or return Doubles. E.g. (copied from Spark docs, using Scala syntax): | ||
|
||
def map[R](f: DoubleFunction[T]): JavaDoubleRDD | ||
|
||
So class DoubleFunction<T> has a function type of T => Double | ||
|
||
Compare this to the standard: | ||
|
||
def map[R](f: Function[T, R]): JavaRDD[R] | ||
|
||
Where Function<T, R> has type T => R | ||
|
||
I didn't wrap any of these. To be honest, I don't see why they are needed. Maybe I'm missing something or maybe it just doesn't matter when called from a dynamically typed language like Clojure. Instead of DoubleFunction[T], just use Function<T, Double> which has type T => Double. I don't see why this wouldn't work, but interested to know if there is a case where this fails or is sub-optimal. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# log4j config for clojure development | ||
log4j.rootLogger=warn, stdout | ||
|
||
# Console appender | ||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender | ||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %5p %c{1}:%L - %m%n |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
(defproject clj-spark/clj-spark "0.1.0-SNAPSHOT" | ||
|
||
:min-lein-version "2.0.0" | ||
|
||
:license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} | ||
:description "Clojure API wrapper on the Spark project (http://spark-project.org/)" | ||
:url "https://github.com/TheClimateCorporation/clj-spark" | ||
|
||
:jvm-opts ["-Dlog4j.configuration=file:log4j.properties"] | ||
|
||
:dependencies [[org.clojure/clojure "1.4.0"] | ||
[org.clojure/tools.logging "0.2.3"] | ||
[org.clojure/tools.cli "0.2.1"] | ||
[org.clojars.mlimotte/serializable-fn "0.0.3"] | ||
[org.spark-project/spark-core_2.9.2 "0.6.1"]] | ||
|
||
:aot [clj-spark.spark.functions | ||
clj-spark.api | ||
weatherbill.query | ||
clj-spark.examples.query] | ||
|
||
:main clj-spark.examples.query) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
(ns clj-spark.api | ||
(:use clj-spark.spark.functions) | ||
(:refer-clojure :exclude [map reduce count filter first take distinct]) | ||
(:require | ||
[serializable.fn :as sfn] | ||
[clojure.string :as s] | ||
[clj-spark.util :as util]) | ||
(:import | ||
java.util.Comparator | ||
spark.api.java.JavaSparkContext)) | ||
|
||
; Helpers | ||
|
||
(defn spark-context | ||
[& {:keys [master job-name]}] | ||
;JavaSparkContext(master: String, jobName: String, sparkHome: String, jars: Array[String], environment: Map[String, String]) | ||
(JavaSparkContext. master job-name)) | ||
|
||
(defn- untuple | ||
[t] | ||
[(._1 t) (._2 t)]) | ||
|
||
(defn- double-untuple | ||
"Convert (k, (v, w)) to [k [v w]]." | ||
[t] | ||
(let [[x t2] (untuple t)] | ||
(vector x (untuple t2)))) | ||
|
||
(def csv-split util/csv-split) | ||
|
||
(defn ftopn | ||
"Return a fn that takes (key, values), sorts the values in DESC order, | ||
and takes the top N values. Returns (k, top-values)." | ||
[n] | ||
(fn [[k values]] | ||
(vector k (->> values (sort util/rcompare) (clojure.core/take n))))) | ||
|
||
(defn fchoose | ||
[& indices] | ||
(fn [coll] | ||
(util/choose coll indices))) | ||
|
||
(defn ftruthy? | ||
[f] | ||
(sfn/fn [x] (util/truthy? (f x)))) | ||
|
||
(defn feach | ||
"Mostly useful for parsing a seq of Strings to their respective types. Example | ||
(k/map (k/feach as-integer as-long identity identity as-integer as-double)) | ||
Implies that each entry in the RDD is a sequence of 6 things. The first element should be | ||
parsed as an Integer, the second as a Long, etc. The actual functions supplied here can be | ||
any arbitray transformation (e.g. identity)." | ||
[& fs] | ||
(fn [coll] | ||
(clojure.core/map (fn [f x] (f x)) fs coll))) | ||
|
||
; RDD construction | ||
|
||
(defn text-file | ||
[spark-context filename] | ||
(.textFile spark-context filename)) | ||
|
||
(defn parallelize | ||
[spark-context lst] | ||
(.parallelize spark-context lst)) | ||
|
||
; Transformations | ||
|
||
(defn echo-types | ||
; TODO make this recursive | ||
[c] | ||
(if (coll? c) | ||
(println "TYPES" (clojure.core/map type c)) | ||
(println "TYPES" (type c))) | ||
c) | ||
|
||
(defn trace | ||
[msg] | ||
(fn [x] | ||
(prn "TRACE" msg x) | ||
x)) | ||
|
||
(defn map | ||
[rdd f] | ||
(.map rdd (function f))) | ||
|
||
(defn reduce | ||
[rdd f] | ||
(.reduce rdd (function2 f))) | ||
|
||
(defn flat-map | ||
[rdd f] | ||
(.map rdd (flat-map-function f))) | ||
|
||
(defn filter | ||
[rdd f] | ||
(.filter rdd (function (ftruthy? f)))) | ||
|
||
(defn foreach | ||
[rdd f] | ||
(.foreach rdd (void-function f))) | ||
|
||
(defn aggregate | ||
[rdd zero-value seq-op comb-op] | ||
(.aggregate rdd zero-value (function2 seq-op) (function2 comb-op))) | ||
|
||
(defn fold | ||
[rdd zero-value f] | ||
(.fold rdd zero-value (function2 f))) | ||
|
||
(defn reduce-by-key | ||
[rdd f] | ||
(-> rdd | ||
(.map (pair-function identity)) | ||
(.reduceByKey (function2 f)) | ||
(.map (function untuple)))) | ||
|
||
(defn group-by-key | ||
[rdd] | ||
(-> rdd | ||
(.map (pair-function identity)) | ||
.groupByKey | ||
(.map (function untuple)))) | ||
|
||
(defn sort-by-key | ||
([rdd] | ||
(sort-by-key rdd compare true)) | ||
([rdd x] | ||
; Note: RDD has a .sortByKey signature with just a Boolean arg, but it doesn't | ||
; seem to work when I try it, bool is ignored. | ||
(if (instance? Boolean x) | ||
(sort-by-key rdd compare x) | ||
(sort-by-key rdd x true))) | ||
([rdd compare-fn asc?] | ||
(-> rdd | ||
(.map (pair-function identity)) | ||
(.sortByKey | ||
(if (instance? Comparator compare-fn) | ||
compare-fn | ||
(comparator compare-fn)) | ||
(util/truthy? asc?)) | ||
(.map (function untuple))))) | ||
|
||
(defn join | ||
[rdd other] | ||
(-> rdd | ||
(.map (pair-function identity)) | ||
(.join (.map other (pair-function identity))) | ||
(.map (function double-untuple)))) | ||
|
||
; Actions | ||
|
||
(def first (memfn first)) | ||
|
||
(def count (memfn count)) | ||
|
||
(def glom (memfn glom)) | ||
|
||
(def cache (memfn cache)) | ||
|
||
(def collect (memfn collect)) | ||
|
||
; take defined with memfn fails with an ArityException, so doing this instead: | ||
(defn take | ||
[rdd cnt] | ||
(.take rdd cnt)) | ||
|
||
(def distinct (memfn distinct)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
(ns clj-spark.examples.extra | ||
"This namespace simulates getting a dataset from a distinct source. This could be | ||
a SQL query, for example.") | ||
|
||
(defn get-data | ||
"Returns a result set as a seq of Maps. Similar to a result set acquired by clojure.data.jdbc." | ||
[] | ||
(map (partial zipmap [:policy_id :field_id :state :policy_premium :acres]) | ||
[[(int 1) 10 "NY" 100.0 2] | ||
[(int 1) 20 "NY" 200.0 2] | ||
[(int 2) 10 "CT" 300.0 2] | ||
[(int 2) 11 "CT" 400.0 2]])) |
Oops, something went wrong.