Skip to content

Commit

Permalink
greatly lesson likelihood of race condition between supervisor and wo…
Browse files Browse the repository at this point in the history
…rker
  • Loading branch information
nathanmarz committed Mar 7, 2013
1 parent 87d133f commit f6804ad
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
12 changes: 9 additions & 3 deletions src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@
(current-time-secs)
(:storm-id worker)
(:executors worker)
(:port worker))]
(:port worker))
state (worker-state conf (:worker-id worker))]
(log-debug "Doing heartbeat " (pr-str hb))
;; do the local-file-system heartbeat.
(.put (worker-state conf (:worker-id worker))
(.put state
LS-WORKER-HEARTBEAT
hb)
hb
false
)
(.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
; it shouldn't take supervisor 120 seconds between listing dir and reading it

))

(defn worker-outbound-tasks
Expand Down
20 changes: 16 additions & 4 deletions src/jvm/backtype/storm/utils/LocalState.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,34 @@ public Object get(Object key) throws IOException {
}

public synchronized void put(Object key, Object val) throws IOException {
put(key, val, true);
}

public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
Map<Object, Object> curr = snapshot();
curr.put(key, val);
persist(curr);
persist(curr, cleanup);
}

public synchronized void remove(Object key) throws IOException {
remove(key, true);
}

public synchronized void remove(Object key, boolean cleanup) throws IOException {
Map<Object, Object> curr = snapshot();
curr.remove(key);
persist(curr);
persist(curr, cleanup);
}

public synchronized void cleanup(int keepVersions) throws IOException {
_vs.cleanup(keepVersions);
}

private void persist(Map<Object, Object> val) throws IOException {
private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
byte[] toWrite = Utils.serialize(val);
String newPath = _vs.createVersion();
FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
_vs.succeedVersion(newPath);
_vs.cleanup(4);
if(cleanup) _vs.cleanup(4);
}
}

0 comments on commit f6804ad

Please sign in to comment.