Skip to content

Commit b1dc422

Browse files
Robert (Bobby) Evansptgoetz
Robert (Bobby) Evans
authored andcommitted
STORM-765: Thrift serialization for local state.
1 parent 24e3b98 commit b1dc422

16 files changed

+4530
-104
lines changed

storm-core/src/clj/backtype/storm/daemon/common.clj

-13
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,6 @@
5858
(defprotocol DaemonCommon
5959
(waiting? [this]))
6060

61-
(def LS-WORKER-HEARTBEAT "worker-heartbeat")
62-
63-
;; LocalState constants
64-
(def LS-ID "supervisor-id")
65-
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
66-
(def LS-APPROVED-WORKERS "approved-workers")
67-
68-
(defn mk-local-worker-heartbeat [time-secs storm-id executors port]
69-
{:time-secs time-secs
70-
:storm-id storm-id
71-
:executors executors
72-
:port port})
73-
7461
(defrecord ExecutorStats [^long processed
7562
^long acked
7663
^long emitted

storm-core/src/clj/backtype/storm/daemon/supervisor.clj

+15-19
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
[java.net URI]
2525
[org.apache.commons.io FileUtils]
2626
[java.io File])
27-
(:use [backtype.storm config util log timer])
27+
(:use [backtype.storm config util log timer local-state])
2828
(:use [backtype.storm.daemon common])
2929
(:require [backtype.storm.daemon [worker :as worker]]
3030
[backtype.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
@@ -38,9 +38,6 @@
3838
(defmulti download-storm-code cluster-mode)
3939
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
4040

41-
;; used as part of a map from port to this
42-
(defrecord LocalAssignment [storm-id executors])
43-
4441
(defprotocol SupervisorDaemon
4542
(get-id [this])
4643
(get-conf [this])
@@ -76,7 +73,7 @@
7673
(into {} (for [[port executors] port-executors]
7774
;; need to cast to int b/c it might be a long (due to how yaml parses things)
7875
;; doall is to avoid serialization/deserialization problems with lazy seqs
79-
[(Integer. port) (LocalAssignment. storm-id (doall executors))]
76+
[(Integer. port) (mk-local-assignment storm-id (doall executors))]
8077
))))
8178

8279
(defn- read-assignments
@@ -104,8 +101,8 @@
104101
(defn read-worker-heartbeat [conf id]
105102
(let [local-state (worker-state conf id)]
106103
(try
107-
(.get local-state LS-WORKER-HEARTBEAT)
108-
(catch IOException e
104+
(ls-worker-heartbeat local-state)
105+
(catch Exception e
109106
(log-warn e "Failed to read local heartbeat for workerId : " id ",Ignoring exception.")
110107
nil))))
111108

@@ -148,7 +145,7 @@
148145
(let [conf (:conf supervisor)
149146
^LocalState local-state (:local-state supervisor)
150147
id->heartbeat (read-worker-heartbeats conf)
151-
approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
148+
approved-ids (set (keys (ls-approved-workers local-state)))]
152149
(into
153150
{}
154151
(dofor [[id hb] id->heartbeat]
@@ -174,7 +171,7 @@
174171
(defn- wait-for-worker-launch [conf id start-time]
175172
(let [state (worker-state conf id)]
176173
(loop []
177-
(let [hb (.get state LS-WORKER-HEARTBEAT)]
174+
(let [hb (ls-worker-heartbeat state)]
178175
(when (and
179176
(not hb)
180177
(<
@@ -185,7 +182,7 @@
185182
(Time/sleep 500)
186183
(recur)
187184
)))
188-
(when-not (.get state LS-WORKER-HEARTBEAT)
185+
(when-not (ls-worker-heartbeat state)
189186
(log-message "Worker " id " failed to start")
190187
)))
191188

@@ -320,7 +317,7 @@
320317
download-lock (:download-lock supervisor)
321318
^LocalState local-state (:local-state supervisor)
322319
storm-cluster-state (:storm-cluster-state supervisor)
323-
assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
320+
assigned-executors (defaulted (ls-local-assignments local-state) {})
324321
now (current-time-secs)
325322
allocated (read-allocated-workers supervisor assigned-executors now)
326323
keepers (filter-val
@@ -358,9 +355,9 @@
358355
(doseq [id (vals new-worker-ids)]
359356
(local-mkdirs (worker-pids-root conf id))
360357
(local-mkdirs (worker-heartbeats-root conf id)))
361-
(.put local-state LS-APPROVED-WORKERS
358+
(ls-approved-workers! local-state
362359
(merge
363-
(select-keys (.get local-state LS-APPROVED-WORKERS)
360+
(select-keys (ls-approved-workers local-state)
364361
(keys keepers))
365362
(zipmap (vals new-worker-ids) (keys new-worker-ids))
366363
))
@@ -416,7 +413,7 @@
416413
(defn shutdown-disallowed-workers [supervisor]
417414
(let [conf (:conf supervisor)
418415
^LocalState local-state (:local-state supervisor)
419-
assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
416+
assigned-executors (defaulted (ls-local-assignments local-state) {})
420417
now (current-time-secs)
421418
allocated (read-allocated-workers supervisor assigned-executors now)
422419
disallowed (keys (filter-val
@@ -442,7 +439,7 @@
442439
assignment-versions)
443440
storm-code-map (read-storm-code-locations assignments-snapshot)
444441
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
445-
existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
442+
existing-assignment (ls-local-assignments local-state)
446443
all-assignment (read-assignments assignments-snapshot
447444
(:assignment-id supervisor)
448445
existing-assignment
@@ -472,8 +469,7 @@
472469
(set (keys new-assignment)))]
473470
(.killedWorker isupervisor (int p)))
474471
(.assigned isupervisor (keys new-assignment))
475-
(.put local-state
476-
LS-LOCAL-ASSIGNMENTS
472+
(ls-local-assignments! local-state
477473
new-assignment)
478474
(reset! (:assignment-versions supervisor) versions)
479475
(reset! (:curr-assignment supervisor) new-assignment)
@@ -780,10 +776,10 @@
780776
(prepare [this conf local-dir]
781777
(reset! conf-atom conf)
782778
(let [state (LocalState. local-dir)
783-
curr-id (if-let [id (.get state LS-ID)]
779+
curr-id (if-let [id (ls-supervisor-id state)]
784780
id
785781
(generate-supervisor-id))]
786-
(.put state LS-ID curr-id)
782+
(ls-supervisor-id! state curr-id)
787783
(reset! id-atom curr-id))
788784
)
789785
(confirmAssigned [this port]

storm-core/src/clj/backtype/storm/daemon/worker.clj

+2-12
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
;; limitations under the License.
1616
(ns backtype.storm.daemon.worker
1717
(:use [backtype.storm.daemon common])
18-
(:use [backtype.storm config log util timer])
18+
(:use [backtype.storm config log util timer local-state])
1919
(:require [backtype.storm.daemon [executor :as executor]])
2020
(:require [backtype.storm [disruptor :as disruptor] [cluster :as cluster]])
2121
(:require [clojure.set :as set])
@@ -68,19 +68,9 @@
6868

6969
(defn do-heartbeat [worker]
7070
(let [conf (:conf worker)
71-
hb (mk-local-worker-heartbeat
72-
(current-time-secs)
73-
(:storm-id worker)
74-
(:executors worker)
75-
(:port worker))
7671
state (worker-state conf (:worker-id worker))]
77-
(log-debug "Doing heartbeat " (pr-str hb))
7872
;; do the local-file-system heartbeat.
79-
(.put state
80-
LS-WORKER-HEARTBEAT
81-
hb
82-
false
83-
)
73+
(ls-worker-heartbeat! state (current-time-secs) (:storm-id worker) (:executors worker) (:port worker))
8474
(.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
8575
; it shouldn't take supervisor 120 seconds between listing dir and reading it
8676

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
;; Licensed to the Apache Software Foundation (ASF) under one
2+
;; or more contributor license agreements. See the NOTICE file
3+
;; distributed with this work for additional information
4+
;; regarding copyright ownership. The ASF licenses this file
5+
;; to you under the Apache License, Version 2.0 (the
6+
;; "License"); you may not use this file except in compliance
7+
;; with the License. You may obtain a copy of the License at
8+
;;
9+
;; http://www.apache.org/licenses/LICENSE-2.0
10+
;;
11+
;; Unless required by applicable law or agreed to in writing, software
12+
;; distributed under the License is distributed on an "AS IS" BASIS,
13+
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
;; See the License for the specific language governing permissions and
15+
;; limitations under the License.
16+
(ns backtype.storm.local-state
17+
(:use [backtype.storm log util])
18+
(:import [backtype.storm.generated StormTopology
19+
InvalidTopologyException GlobalStreamId
20+
LSSupervisorId LSApprovedWorkers
21+
LSSupervisorAssignments LocalAssignment
22+
ExecutorInfo LSWorkerHeartbeat])
23+
(:import [backtype.storm.utils LocalState]))
24+
25+
(def LS-WORKER-HEARTBEAT "worker-heartbeat")
26+
(def LS-ID "supervisor-id")
27+
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
28+
(def LS-APPROVED-WORKERS "approved-workers")
29+
30+
(defn ls-supervisor-id!
31+
[^LocalState local-state ^String id]
32+
(.put local-state LS-ID (LSSupervisorId. id)))
33+
34+
(defn ls-supervisor-id
35+
[^LocalState local-state]
36+
(if-let [super-id (.get local-state LS-ID)]
37+
(.get_supervisor_id super-id)))
38+
39+
(defn ls-approved-workers!
40+
[^LocalState local-state workers]
41+
(.put local-state LS-APPROVED-WORKERS (LSApprovedWorkers. workers)))
42+
43+
(defn ls-approved-workers
44+
[^LocalState local-state]
45+
(if-let [tmp (.get local-state LS-APPROVED-WORKERS)]
46+
(into {} (.get_approved_workers tmp))))
47+
48+
(defn ->ExecutorInfo
49+
[[low high]] (ExecutorInfo. low high))
50+
51+
(defn ->ExecutorInfo-list
52+
[executors]
53+
(map ->ExecutorInfo executors))
54+
55+
(defn ->executor-list
56+
[executors]
57+
(into []
58+
(for [exec-info executors]
59+
[(.get_task_start exec-info) (.get_task_end exec-info)])))
60+
61+
(defn ->LocalAssignment
62+
[{storm-id :storm-id executors :executors}]
63+
(LocalAssignment. storm-id (->ExecutorInfo-list executors)))
64+
65+
(defn mk-local-assignment
66+
[storm-id executors]
67+
{:storm-id storm-id :executors executors})
68+
69+
(defn ->local-assignment
70+
[^LocalAssignment thrift-local-assignment]
71+
(mk-local-assignment
72+
(.get_topology_id thrift-local-assignment)
73+
(->executor-list (.get_executors thrift-local-assignment))))
74+
75+
(defn ls-local-assignments!
76+
[^LocalState local-state assignments]
77+
(let [local-assignment-map (map-val ->LocalAssignment assignments)]
78+
(.put local-state LS-LOCAL-ASSIGNMENTS
79+
(LSSupervisorAssignments. local-assignment-map))))
80+
81+
(defn ls-local-assignments
82+
[^LocalState local-state]
83+
(if-let [thrift-local-assignments (.get local-state LS-LOCAL-ASSIGNMENTS)]
84+
(map-val
85+
->local-assignment
86+
(.get_assignments thrift-local-assignments))))
87+
88+
(defn ls-worker-heartbeat!
89+
[^LocalState local-state time-secs storm-id executors port]
90+
(.put local-state LS-WORKER-HEARTBEAT (LSWorkerHeartbeat. time-secs storm-id (->ExecutorInfo-list executors) port) false))
91+
92+
(defn ls-worker-heartbeat
93+
[^LocalState local-state]
94+
(if-let [worker-hb (.get local-state LS-WORKER-HEARTBEAT)]
95+
{:time-secs (.get_time_secs worker-hb)
96+
:storm-id (.get_topology_id worker-hb)
97+
:executors (->executor-list (.get_executors worker-hb))
98+
:port (.get_port worker-hb)}))
99+

storm-core/src/clj/backtype/storm/testing.clj

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
(:require [backtype.storm [zookeeper :as zk]])
4343
(:require [backtype.storm.messaging.loader :as msg-loader])
4444
(:require [backtype.storm.daemon.acker :as acker])
45-
(:use [backtype.storm cluster util thrift config log]))
45+
(:use [backtype.storm cluster util thrift config log local-state]))
4646

4747
(defn feeder-spout
4848
[fields]
@@ -302,13 +302,13 @@
302302
(defn find-worker-id
303303
[supervisor-conf port]
304304
(let [supervisor-state (supervisor-state supervisor-conf)
305-
worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)]
305+
worker->port (ls-approved-workers supervisor-state)]
306306
(first ((reverse-map worker->port) port))))
307307

308308
(defn find-worker-port
309309
[supervisor-conf worker-id]
310310
(let [supervisor-state (supervisor-state supervisor-conf)
311-
worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)]
311+
worker->port (ls-approved-workers supervisor-state)]
312312
(worker->port worker-id)))
313313

314314
(defn mk-capture-shutdown-fn

0 commit comments

Comments
 (0)