diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ee508f7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +/target +/lib +/classes +/checkouts +pom.xml +*.jar +*.class +.lein-deps-sum +.lein-failures +.lein-plugins diff --git a/README.md b/README.md index 14ae50d..2624e59 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,148 @@ clj-spark ========= -A Clojure api for the the Spark project (a fast, open source cluster computing system). \ No newline at end of file +A Clojure api for the Spark Project. It is useable, but not complete. I +provide it as a starting point. It should be simple enough to add the +additional wrappers as you need them. This is the result of about two weeks of +work. + +It handles many of the initial problems like serializing anonymous functions, +converting back and forth between Scala Tuples and Clojure seqs, and +converting RDDs to PairRDDs. + +### Example usage + +There is a complete sample program in src/clj_spark/examples/query.clj. To run +it, clone this repo and cd into it. You will need Leiningen 2 installed +(assuming this is available on your PATH as lein2): + +$ git clone ... +$ cd clj-spark +$ lein2 deps +$ lein2 compile +$ lein2 run + Compiling clj-spark.api + 2013-01-02 13:18:41.477 java[65466:1903] Unable to load realm mapping info from + SCDynamicStore + ============== + Premium Per State + NY 600.0 + + ============== + TOP100 + #{1 2} + + ============== + CTE Per State + NY 70.0 + + ============== + TOP100PERSTATE + {NY #{1 2}} + + ============== + Standalone CTE Per State + NY 70.0 + ============== + +The following are subsections copied from query.clj: + +Here is a sample of creating an RDD: + + (-> (.textFile sc testfile) + (k/map k/csv-split) + ; _,policy-id,field-id,_,_,element-id,element-value + (k/map (k/feach identity as-integer as-long identity identity as-integer as-double)) + (k/map (juxt (k/fchoose 1 2) (k/fchoose 5 6))) ; [ [policy-id field-id] [element-id element-value] ] + k/cache) + +And a sample query on that data: + + (-> input-rdd + (k/map second) ; [element-id element-value] + (k/reduce-by-key +) ; [element-id total] + (k/map (k/fchoose 1 0)) ; [total element-id] + (k/sort-by-key false) ; desc + (k/map second) ; element-id + (k/take 2) ; TODO n=100 + set) + +Running queries from the REPL +========== + +You can also start a repl and play around: + +# assuming you already did deps and compile above. +lein2 repl + +; deleted results to be more concise +user=> (use 'serializable.fn 'clj-spark.util) +user=> (require '[clj-spark.api :as k]) +user=> (def sc (k/spark-context :master "local" :job-name "Simple Job")) +user=> (def r1 (k/parallelize sc [10 20 25 30 35])) +user=> (def r2 (k/text-file sc "test/resources/input.csv")) + +user=> (k/count r2) +5 +user=> (def result (-> r1 (k/map inc) (k/map (fn [t] [(even? t) t])) (k/reduce-by-key +))) +#'clj-spark.examples.query/result +user=> (k/collect result) +# +; or, all in one step: +user=> (-> r1 (k/map inc) (k/map (fn [t] [(even? t) t])) (k/reduce-by-key +) k/collect) +# + +Other clojure apis +========== + +After working on this, I found another Clojure API for Spark project: https://github.com/markhamstra/spark/tree/master/cljspark + +It's a bit more complete, but no examples. You might find good ideas in both projects. + + +What is Spark +========== + +From http://spark-project.org/docs/latest/index.html + +Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter. 
It provides clean, language-integrated APIs in Scala and Java, with a rich array of parallel operators. Spark can run on top of the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or without an independent resource manager (“standalone mode”).
+
+Known Issues
+==========
+
+## Function serialization
+
+You must create your anonymous functions using serializable.fn/fn, as in:
+
+    (ns ...
+      (:require [serializable.fn :as sfn]))
+
+    (sfn/fn my-inc [x] (+ x 1))
+
+Do not use clojure.core/fn or #(). This is necessary because the anonymous function must be serialized so it can be passed around to distributed tasks.
+
+## AOT compilation
+
+Generally speaking, any functions that are used in the Spark calls will need to be part of AOT-compiled namespaces, i.e. they need to be compiled or the distributed Spark tasks will not be able to find them. In some cases, compiling on the fly might also work:
+
+    (compile 'your-namespace)
+
+But you need to do this somewhere where it will be executed for each task.
+
+NOTE: This should be avoidable by using serializable.fn as described above, but I did not get that to work in my initial attempts.
+
+## None of the Double* methods are implemented
+
+The Spark Java API provides versions of some methods that accept or return Doubles. E.g. (copied from the Spark docs, using Scala syntax):
+
+    def map[R](f: DoubleFunction[T]): JavaDoubleRDD
+
+So DoubleFunction has the function type T => Double.
+
+Compare this to the standard:
+
+    def map[R](f: Function[T, R]): JavaRDD[R]
+
+where Function has the type T => R.
+
+I didn't wrap any of these. To be honest, I don't see why they are needed. Maybe I'm missing something, or maybe it just doesn't matter when called from a dynamically typed language like Clojure. Instead of DoubleFunction[T], just use a Function whose return type is Double. I don't see why this wouldn't work, but I'm interested to know if there is a case where this fails or is sub-optimal.
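+
+To illustrate that workaround, here is a minimal, untested sketch. It assumes a
+local master and the wrappers described above; the job name, the numbers, and the
+result shown are only illustrative, and the anonymous functions use
+serializable.fn as required by the note on function serialization:
+
+    (require '[clj-spark.api :as k]
+             '[serializable.fn :as sfn])
+
+    (def sc (k/spark-context :master "local" :job-name "Double Example"))
+
+    ; A plain Function whose return value happens to be a Double,
+    ; instead of a DoubleFunction:
+    (-> (k/parallelize sc [1.0 2.5 3.5])
+        (k/map (sfn/fn [x] (* x 2)))
+        (k/reduce (sfn/fn [a b] (+ a b))))
+    ; should yield 14.0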
diff --git a/log4j.properties b/log4j.properties
new file mode 100644
index 0000000..1ab1b16
--- /dev/null
+++ b/log4j.properties
@@ -0,0 +1,7 @@
+# log4j config for clojure development
+log4j.rootLogger=warn, stdout
+
+# Console appender
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %5p %c{1}:%L - %m%n
diff --git a/project.clj b/project.clj
new file mode 100644
index 0000000..71db961
--- /dev/null
+++ b/project.clj
@@ -0,0 +1,21 @@
+(defproject clj-spark/clj-spark "0.1.0-SNAPSHOT"
+
+  :min-lein-version "2.0.0"
+
+  :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"}
+  :description "Clojure API wrapper on the Spark project (http://spark-project.org/)"
+  :url "https://github.com/TheClimateCorporation/clj-spark"
+
+  :jvm-opts ["-Dlog4j.configuration=file:log4j.properties"]
+
+  :dependencies [[org.clojure/clojure "1.4.0"]
+                 [org.clojure/tools.logging "0.2.3"]
+                 [org.clojure/tools.cli "0.2.1"]
+                 [org.clojars.mlimotte/serializable-fn "0.0.3"]
+                 [org.spark-project/spark-core_2.9.2 "0.6.1"]]
+
+  :aot [clj-spark.spark.functions
+        clj-spark.api
+        clj-spark.examples.query]
+
+  :main clj-spark.examples.query)
diff --git a/src/clj_spark/api.clj b/src/clj_spark/api.clj
new file mode 100644
index 0000000..219688a
--- /dev/null
+++ b/src/clj_spark/api.clj
@@ -0,0 +1,168 @@
+(ns clj-spark.api
+  (:use clj-spark.spark.functions)
+  (:refer-clojure :exclude [map reduce count filter first take distinct])
+  (:require
+    [serializable.fn :as sfn]
+    [clojure.string :as s]
+    [clj-spark.util :as util])
+  (:import
+    java.util.Comparator
+    spark.api.java.JavaSparkContext))
+
+; Helpers
+
+(defn spark-context
+  [& {:keys [master job-name]}]
+  ;JavaSparkContext(master: String, jobName: String, sparkHome: String, jars: Array[String], environment: Map[String, String])
+  (JavaSparkContext. master job-name))
+
+(defn- untuple
+  [t]
+  [(._1 t) (._2 t)])
+
+(defn- double-untuple
+  "Convert (k, (v, w)) to [k [v w]]."
+  [t]
+  (let [[x t2] (untuple t)]
+    (vector x (untuple t2))))
+
+(def csv-split util/csv-split)
+
+(defn ftopn
+  "Return a fn that takes (key, values), sorts the values in DESC order,
+  and takes the top N values. Returns (k, top-values)."
+  [n]
+  (fn [[k values]]
+    (vector k (->> values (sort util/rcompare) (clojure.core/take n)))))
+
+(defn fchoose
+  [& indices]
+  (fn [coll]
+    (util/choose coll indices)))
+
+(defn ftruthy?
+  [f]
+  (sfn/fn [x] (util/truthy? (f x))))
+
+(defn feach
+  "Mostly useful for parsing a seq of Strings to their respective types. Example
+  (k/map (k/feach as-integer as-long identity identity as-integer as-double))
+  Implies that each entry in the RDD is a sequence of 6 things. The first element should be
+  parsed as an Integer, the second as a Long, etc. The actual functions supplied here can be
+  any arbitrary transformation (e.g. identity)."
+  [& fs]
+  (fn [coll]
+    (clojure.core/map (fn [f x] (f x)) fs coll)))
+
+; RDD construction
+
+(defn text-file
+  [spark-context filename]
+  (.textFile spark-context filename))
+
+(defn parallelize
+  [spark-context lst]
+  (.parallelize spark-context lst))
+
+; Transformations
+
+(defn echo-types
+  ; TODO make this recursive
+  [c]
+  (if (coll? c)
+    (println "TYPES" (clojure.core/map type c))
+    (println "TYPES" (type c)))
+  c)
+
+(defn trace
+  [msg]
+  (fn [x]
+    (prn "TRACE" msg x)
+    x))
+
+(defn map
+  [rdd f]
+  (.map rdd (function f)))
+
+(defn reduce
+  [rdd f]
+  (.reduce rdd (function2 f)))
+
+(defn flat-map
+  [rdd f]
+  ; Use .flatMap: JavaRDD.map expects a Function, not a FlatMapFunction.
+  (.flatMap rdd (flat-map-function f)))
+
+(defn filter
+  [rdd f]
+  (.filter rdd (function (ftruthy? f))))
+
+(defn foreach
+  [rdd f]
+  (.foreach rdd (void-function f)))
+
+(defn aggregate
+  [rdd zero-value seq-op comb-op]
+  (.aggregate rdd zero-value (function2 seq-op) (function2 comb-op)))
+
+(defn fold
+  [rdd zero-value f]
+  (.fold rdd zero-value (function2 f)))
+
+(defn reduce-by-key
+  [rdd f]
+  (-> rdd
+      (.map (pair-function identity))
+      (.reduceByKey (function2 f))
+      (.map (function untuple))))
+
+(defn group-by-key
+  [rdd]
+  (-> rdd
+      (.map (pair-function identity))
+      .groupByKey
+      (.map (function untuple))))
+
+(defn sort-by-key
+  ([rdd]
+    (sort-by-key rdd compare true))
+  ([rdd x]
+    ; Note: RDD has a .sortByKey signature with just a Boolean arg, but it doesn't
+    ; seem to work when I try it; the bool is ignored.
+    (if (instance? Boolean x)
+      (sort-by-key rdd compare x)
+      (sort-by-key rdd x true)))
+  ([rdd compare-fn asc?]
+    (-> rdd
+        (.map (pair-function identity))
+        (.sortByKey
+          (if (instance? Comparator compare-fn)
+            compare-fn
+            (comparator compare-fn))
+          (util/truthy? asc?))
+        (.map (function untuple)))))
+
+(defn join
+  [rdd other]
+  (-> rdd
+      (.map (pair-function identity))
+      (.join (.map other (pair-function identity)))
+      (.map (function double-untuple))))
+
+; Actions
+
+(def first (memfn first))
+
+(def count (memfn count))
+
+(def glom (memfn glom))
+
+(def cache (memfn cache))
+
+(def collect (memfn collect))
+
+; take defined with memfn fails with an ArityException, so doing this instead:
+(defn take
+  [rdd cnt]
+  (.take rdd cnt))
+
+(def distinct (memfn distinct))
diff --git a/src/clj_spark/examples/extra.clj b/src/clj_spark/examples/extra.clj
new file mode 100644
index 0000000..d05cf0a
--- /dev/null
+++ b/src/clj_spark/examples/extra.clj
@@ -0,0 +1,12 @@
+(ns clj-spark.examples.extra
+  "This namespace simulates getting a dataset from a separate source. This could be
+  a SQL query, for example.")
+
+(defn get-data
+  "Returns a result set as a seq of maps. Similar to a result set acquired by clojure.java.jdbc."
+ [] + (map (partial zipmap [:policy_id :field_id :state :policy_premium :acres]) + [[(int 1) 10 "NY" 100.0 2] + [(int 1) 20 "NY" 200.0 2] + [(int 2) 10 "CT" 300.0 2] + [(int 2) 11 "CT" 400.0 2]])) diff --git a/src/clj_spark/examples/query.clj b/src/clj_spark/examples/query.clj new file mode 100644 index 0000000..d1d0505 --- /dev/null +++ b/src/clj_spark/examples/query.clj @@ -0,0 +1,138 @@ +(ns clj-spark.examples.query + (:refer-clojure :exclude [fn]) + (:use + serializable.fn + clj-spark.util) + (:require + [clj-spark.api :as k] + [clj-spark.examples.extra :as extra])) + +; TODO get this from an ENVIRONEMNT variable +(def spark-home "/usr/local/spark-0.6.1") + +(def testfile "test/resources/input.csv") + +(defn -main + [& args] + (let [sc + (k/spark-context :master "local" :job-name "Simple Job") + + extra-rdd + (->> (extra/get-data) + list* + (k/parallelize sc) + k/cache) + + input-rdd + (-> (.textFile sc testfile) + (k/map k/csv-split) + ; _,policy-id,field-id,_,_,element-id,element-value + (k/map (k/feach identity as-integer as-long identity identity as-integer as-double)) + (k/map (juxt (k/fchoose 1 2) (k/fchoose 5 6))) ; [ [policy-id field-id] [element-id element-value] ] + k/cache) + + premium-per-state + (-> input-rdd + (k/map first) ; [policy-id field-id] + k/distinct + + ; Need the 1, b/c there must be some data values to join + (k/map #(vector % 1)) ; [[policy-id field-id] 1] + + (k/join (-> extra-rdd + (k/map (fn [{:keys [policy_id field_id state policy_premium acres]}] + [[policy_id field_id] [state (* policy_premium acres)]])))) + ; [[policy-id field-id] [1 [state field-premium]]] + + (k/map (comp second second)) ; [state field-premium] + (k/reduce-by-key +)) + + top100-element-ids-overall + (-> input-rdd + (k/map second) ; [element-id element-value] + (k/reduce-by-key +) ; [element-id total] + (k/map (k/fchoose 1 0)) ; [total element-id] + (k/sort-by-key false) ; desc + (k/map second) ; element-id + (k/take 2) ; TODO n=100 + set) + + element-id-state-value-rdd + (-> input-rdd + (k/join + (-> extra-rdd + (k/map (fn [{:keys [policy_id field_id state]}] [[policy_id field_id] state])))) + ; [ [policy-id field-id] [ [element-id element-value] state ]] + (k/map + (fn [[_ [[element-id element-value] state]]] [element-id [state element-value]])) + ; [element-id [state element-value]]* + k/cache) + + ;CTE per state (based on common Top 100) + cte-per-state + (-> element-id-state-value-rdd + (k/filter (fn [[element-id _]] (top100-element-ids-overall element-id))) + (k/map second) ; [state element-value] + (k/group-by-key) ; [state element-values] + (k/map (fn [[state element-values]] [state (avg element-values)]))) ; [state cte99] + + ; Standalone CTE per state (based on each state's Top 100) + top100-element-ids-per-state + (-> element-id-state-value-rdd + (k/map (fn [[element-id [state element-value]]] + [[state element-id] element-value])) ; [[state element-id] element-value] + (k/reduce-by-key +) + (k/map (fn [[[state element-id] element-value]] + [state [element-value element-id]])) ; [ state [element-value element-id] ] + k/group-by-key ; [ state [element-value element-id]* ] + ; TODO n=100 + (k/map (k/ftopn 2)) ; [ state [element-value element-id]* ] for the TOP values + (k/map (fn [[state elements]] + (vector state (set (map second elements))))) ; [ state top-element-ids ] + k/collect + (->> (into {}))) + + standalone-cte-per-state + (-> element-id-state-value-rdd ; [element-id [state element-value]]* + (k/map (fn [[element-id [state element-value]]] + [[state element-id] 
element-value])) + k/group-by-key ; [ [state element-id] element-values ] + (k/filter (fn [[[state element-id] element-values]] + ((get top100-element-ids-per-state state) element-id))) + (k/map (fn [[[state _] element-values]] + [state element-values])) ; [state element-values]* + k/group-by-key ; [state element-values-seq-of-seqs] + + (k/map (fn [[state element-values-seq-of-seqs]] + ; NOTE clojure.core/flatten does not work on scala SeqWrapper + [state (avg (reduce concat element-values-seq-of-seqs))]))) ; [state cte99] + ] + + (println "==============") + (println "Premium Per State") + (doseq [[state premium] (k/collect premium-per-state)] + (println state premium)) + + (println) + (println "==============") + (println "TOP100") + (println top100-element-ids-overall) + + (println) + (println "==============") + (println "CTE Per State") + (doseq [[state cte] (k/collect cte-per-state)] + (println state cte)) + + (println) + (println "==============") + (println "TOP100PERSTATE") + (println top100-element-ids-per-state) + + (println) + (println "==============") + (println "Standalone CTE Per State") + (doseq [[state cte] (k/collect standalone-cte-per-state)] + (println state cte)) + + (println "=============="))) diff --git a/src/clj_spark/spark/functions.clj b/src/clj_spark/spark/functions.clj new file mode 100644 index 0000000..8643318 --- /dev/null +++ b/src/clj_spark/spark/functions.clj @@ -0,0 +1,73 @@ +(ns clj-spark.spark.functions + (:require + [serializable.fn :as sfn]) + (:import + scala.Tuple2)) + +;;; Helpers + +(defn- serfn? + [f] + (= (type f) :serializable.fn/serializable-fn)) + +(def serialize-fn sfn/serialize) + +(def deserialize-fn (memoize sfn/deserialize)) + +(def array-of-bytes-type (Class/forName "[B")) + +;;; Generic + +(defn -init + "Save the function f in state" + [f] + [[] f]) + +(defn -call + [this & xs] + ; A little ugly that I have to do the deser here, but I tried in the -init fn and it failed. Maybe it would work in a :post-init? + (let [fn-or-serfn (.state this) + f (if (instance? array-of-bytes-type fn-or-serfn) + (deserialize-fn fn-or-serfn) + fn-or-serfn)] + (apply f xs))) + +;;; Functions + +(defn mk-sym + [fmt sym-name] + (symbol (format fmt sym-name))) + +(defmacro gen-function + [clazz wrapper-name] + (let [new-class-sym (mk-sym "clj_spark.spark.functions.%s" clazz) + prefix-sym (mk-sym "%s-" clazz) + ] + `(do + (def ~(mk-sym "%s-init" clazz) -init) + (def ~(mk-sym "%s-call" clazz) -call) + (gen-class + :name ~new-class-sym + :extends ~(mk-sym "spark.api.java.function.%s" clazz) + :prefix ~prefix-sym + :init ~'init + :state ~'state + :constructors {[Object] []}) + (defn ~wrapper-name [f#] + (new ~new-class-sym + (if (serfn? f#) (serialize-fn f#) f#)))))) + +(gen-function Function function) + +(gen-function VoidFunction void-function) + +(gen-function Function2 function2) + +(gen-function FlatMapFunction flat-map-function) + +(gen-function PairFunction pair-function) + +; Replaces the PairFunction-call defined by the gen-function macro. +(defn PairFunction-call + [this x] + (let [[a b] (-call this x)] (Tuple2. a b))) diff --git a/src/clj_spark/util.clj b/src/clj_spark/util.clj new file mode 100644 index 0000000..97d4045 --- /dev/null +++ b/src/clj_spark/util.clj @@ -0,0 +1,38 @@ +(ns clj-spark.util + (:require + [clojure.string :as s])) + +(defn truthy? + [x] + (if x (Boolean. true) (Boolean. false))) + +(defn as-integer + [s] + (Integer. s)) + +(defn as-long + [s] + (Long. s)) + +(defn as-double + [s] + (Double. 
s)) + +(defn csv-split + [s] + (s/split s #",")) + +(defn rcompare + "Comparator. The reverse of clojure.core/compare, i.e. + compare in DESCending order." + [x y] (- (clojure.lang.Util/compare x y))) + +(defn choose + [coll indices] + (reduce #(conj %1 (nth coll %2)) [] indices)) + +(defn avg + [values] + (if (empty? values) + 0.0 + (/ (reduce + values) (count values)))) diff --git a/test/resources/input.csv b/test/resources/input.csv new file mode 100644 index 0000000..ab5b4b3 --- /dev/null +++ b/test/resources/input.csv @@ -0,0 +1,5 @@ +108,1,10,junk,,2,100 +108,1,10,junk,,0,1 +108,1,10,junk,,1,10 +108,1,10,junk,,1,100 +108,1,20,junk,,0,1
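
For reference, here is a small REPL sketch (not part of the patch) showing how the
helpers in clj-spark.util and clj-spark.api parse one row of test/resources/input.csv
into the [[policy-id field-id] [element-id element-value]] shape used by the example
query. It assumes the namespaces above are on the classpath; the names row and parsed
are only illustrative, and the values shown are what these helpers should return:

    (use 'clj-spark.util)
    (require '[clj-spark.api :as k])

    (def row "108,1,10,junk,,2,100")

    (k/csv-split row)
    ;=> ["108" "1" "10" "junk" "" "2" "100"]

    ; Parse each column to its type: _, policy-id, field-id, _, _, element-id, element-value
    (def parsed
      ((k/feach identity as-integer as-long identity identity as-integer as-double)
       (k/csv-split row)))
    ;=> ("108" 1 10 "junk" "" 2 100.0)

    ; Select [[policy-id field-id] [element-id element-value]]
    ((juxt (k/fchoose 1 2) (k/fchoose 5 6)) parsed)
    ;=> [[1 10] [2 100.0]]

    (avg [1.0 10.0 100.0])      ;=> 37.0
    (sort rcompare [1 10 100])  ;=> (100 10 1)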