1+ /**
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ */
18+ package org .apache .hadoop .hdfs .procedure ;
19+
20+ import com .google .common .annotations .VisibleForTesting ;
21+ import org .apache .commons .lang3 .builder .EqualsBuilder ;
22+ import org .apache .commons .lang3 .builder .HashCodeBuilder ;
23+ import org .apache .hadoop .io .Text ;
24+ import org .apache .hadoop .io .Writable ;
25+ import org .apache .hadoop .util .ReflectionUtils ;
26+ import org .slf4j .Logger ;
27+ import org .slf4j .LoggerFactory ;
28+
29+ import java .io .DataInput ;
30+ import java .io .DataOutput ;
31+ import java .io .IOException ;
32+ import java .util .Map ;
33+ import java .util .HashMap ;
34+ import java .util .Set ;
35+ import java .util .HashSet ;
36+ import java .util .List ;
37+ import java .util .ArrayList ;
38+
/**
 * A Job is a state machine consisting of many procedures. The procedures are
 * executed as a chain. Each procedure needs to specify the next procedure. If
 * there is no next procedure then the job is finished.
 */
44+ public final class BalanceJob <T extends BalanceProcedure > implements Writable {
45+ private String id ;
46+ private BalanceProcedureScheduler scheduler ;
47+ private volatile boolean jobDone = false ;
48+ private Exception error ;
49+ public static final Logger LOG = LoggerFactory .getLogger (BalanceJob .class );
50+ private Map <String , T > procedureTable = new HashMap <>();
51+ private T firstProcedure ;
52+ private T curProcedure ;
53+ private T lastProcedure ;
54+ private boolean removeAfterDone ;
55+
56+ static final String NEXT_PROCEDURE_NONE = "NONE" ;
57+ private static Set <String > reservedNames = new HashSet <>();
58+
59+ static {
60+ reservedNames .add (NEXT_PROCEDURE_NONE );
61+ }
62+
63+ public static class Builder <T extends BalanceProcedure > {
64+
65+ private List <T > procedures = new ArrayList <>();
66+ private boolean removeAfterDone = false ;
67+
68+ /**
69+ * Append a procedure to the tail.
70+ */
71+ public Builder nextProcedure (T procedure ) {
72+ int size = procedures .size ();
73+ if (size > 0 ) {
74+ procedures .get (size - 1 ).setNextProcedure (procedure .name ());
75+ }
76+ procedure .setNextProcedure (NEXT_PROCEDURE_NONE );
77+ procedures .add (procedure );
78+ return this ;
79+ }
80+
81+ /**
82+ * Automatically remove this job from the scheduler cache when the job is
83+ * done.
84+ */
85+ public Builder removeAfterDone (boolean remove ) {
86+ removeAfterDone = remove ;
87+ return this ;
88+ }
89+
90+ public BalanceJob build () throws IOException {
91+ BalanceJob job = new BalanceJob (procedures , removeAfterDone );
92+ for (BalanceProcedure <T > p : procedures ) {
93+ p .setJob (job );
94+ }
95+ return job ;
96+ }
97+ }
98+
99+ private BalanceJob (Iterable <T > procedures , boolean remove )
100+ throws IOException {
101+ for (T p : procedures ) {
102+ String taskName = p .name ();
103+ if (reservedNames .contains (taskName )) {
104+ throw new IOException (taskName + " is reserved." );
105+ }
106+ procedureTable .put (p .name (), p );
107+ if (firstProcedure == null ) {
108+ firstProcedure = p ;
109+ }
110+ }
111+ removeAfterDone = remove ;
112+ lastProcedure = null ;
113+ curProcedure = firstProcedure ;
114+ }
115+
116+ /**
117+ * Run the state machine.
118+ */
119+ public void execute () {
120+ boolean quit = false ;
121+ try {
122+ while (!jobDone && !quit && scheduler .isRunning ()) {
123+ if (curProcedure == null ) { // Job done.
124+ finish (null );
125+ quit = true ;
126+ } else {
127+ if (curProcedure == firstProcedure || lastProcedure != curProcedure ) {
128+ LOG .info ("Start procedure {}, last procedure is {}" ,
129+ curProcedure .name (),
130+ lastProcedure == null ? null : lastProcedure .name ());
131+ }
132+ if (curProcedure .execute ()) {
133+ lastProcedure = curProcedure ;
134+ curProcedure = next ();
135+ }
136+ if (!scheduler .writeJournal (this )) {
137+ quit = true ; // Write journal failed. Simply quit because this job
138+ // has already been added to the recoverQueue.
139+ LOG .debug ("Write journal failed. Quit and wait for recovery." );
140+ }
141+ }
142+ }
143+ } catch (BalanceProcedure .RetryException tre ) {
144+ scheduler .delay (this , curProcedure .delayMillisBeforeRetry ());
145+ } catch (Exception e ) {
146+ finish (e );
147+ } catch (Throwable t ) {
148+ IOException err = new IOException ("Got throwable error." , t );
149+ finish (err );
150+ }
151+ }
152+
153+ private T next () {
154+ if (curProcedure == null ) {
155+ return firstProcedure ;
156+ } else {
157+ return procedureTable .get (curProcedure .nextProcedure ());
158+ }
159+ }
160+
161+ /**
162+ * Job finishes. It could be either success or failure.
163+ * @param exception the exception that causes the job to fail. null indicates
164+ * the job is successful.
165+ */
166+ private synchronized void finish (Exception exception ) {
167+ assert !jobDone ;
168+ if (scheduler .jobDone (this )) {
169+ jobDone = true ;
170+ error = exception ;
171+ notifyAll ();
172+ }
173+ }
174+
175+ void setScheduler (BalanceProcedureScheduler scheduler ) {
176+ this .scheduler = scheduler ;
177+ }
178+
179+ void setId (String id ) {
180+ this .id = id ;
181+ }
182+
183+ /**
184+ * Get the uid of the job.
185+ */
186+ public String getId () {
187+ return this .id ;
188+ }
189+
190+ /**
191+ * Whether this job should be removed after it's done.
192+ */
193+ @ VisibleForTesting
194+ public boolean shouldRemoveAfterDone () {
195+ return removeAfterDone ;
196+ }
197+
198+ @ VisibleForTesting
199+ void setLastProcedure (T lastProcedure ) {
200+ this .lastProcedure = lastProcedure ;
201+ }
202+
203+ @ VisibleForTesting
204+ void setCurrentProcedure (T currentProcedure ) {
205+ this .curProcedure = currentProcedure ;
206+ }
207+
208+ /**
209+ * Return true if the job has finished.
210+ */
211+ public boolean isJobDone () {
212+ return jobDone ;
213+ }
214+
215+ /**
216+ * Wait until the job is done.
217+ */
218+ public synchronized void waitJobDone () throws InterruptedException {
219+ while (!jobDone ) {
220+ wait ();
221+ }
222+ }
223+
224+ /**
225+ * Return the error exception during the job execution. This should be called
226+ * after the job finishes.
227+ */
228+ public Exception getError () {
229+ return error ;
230+ }
231+
232+ @ Override
233+ public void write (DataOutput out ) throws IOException {
234+ if (id == null ) {
235+ throw new IOException ("BalanceJob with id=null can not be serialized." );
236+ }
237+ Text .writeString (out , id );
238+ int taskTableSize = procedureTable .size ();
239+ out .writeInt (taskTableSize );
240+ for (T p : procedureTable .values ()) {
241+ Text .writeString (out , p .getClass ().getName ());
242+ p .write (out );
243+ }
244+ if (firstProcedure != null ) {
245+ Text .writeString (out , firstProcedure .name ());
246+ } else {
247+ Text .writeString (out , NEXT_PROCEDURE_NONE );
248+ }
249+ if (curProcedure != null ) {
250+ Text .writeString (out , curProcedure .name ());
251+ } else {
252+ Text .writeString (out , NEXT_PROCEDURE_NONE );
253+ }
254+ if (lastProcedure != null ) {
255+ Text .writeString (out , lastProcedure .name ());
256+ } else {
257+ Text .writeString (out , NEXT_PROCEDURE_NONE );
258+ }
259+ }
260+
261+ @ Override
262+ public void readFields (DataInput in ) throws IOException {
263+ this .id = Text .readString (in );
264+ procedureTable = new HashMap <>();
265+ int taskTableSize = in .readInt ();
266+ for (int i = 0 ; i < taskTableSize ; i ++) {
267+ String className = Text .readString (in );
268+ try {
269+ T p = (T ) ReflectionUtils .newInstance (Class .forName (className ), null );
270+ p .readFields (in );
271+ procedureTable .put (p .name (), p );
272+ } catch (Exception e ) {
273+ LOG .error ("Failed reading Procedure." , e );
274+ throw new IOException (e );
275+ }
276+ }
277+ String firstProcedureName = Text .readString (in );
278+ if (firstProcedureName .equals (NEXT_PROCEDURE_NONE )) {
279+ firstProcedure = null ;
280+ } else {
281+ firstProcedure = procedureTable .get (firstProcedureName );
282+ }
283+ String currentProcedureName = Text .readString (in );
284+ if (currentProcedureName .equals (NEXT_PROCEDURE_NONE )) {
285+ curProcedure = null ;
286+ } else {
287+ curProcedure = procedureTable .get (currentProcedureName );
288+ }
289+ String lastProcedureName = Text .readString (in );
290+ if (lastProcedureName .equals (NEXT_PROCEDURE_NONE )) {
291+ lastProcedure = null ;
292+ } else {
293+ lastProcedure = procedureTable .get (lastProcedureName );
294+ }
295+ }
296+
297+ @ Override
298+ public boolean equals (Object obj ) {
299+ if (obj == null ) {
300+ return false ;
301+ }
302+ if (obj == this ) {
303+ return true ;
304+ }
305+ if (obj .getClass () != getClass ()) {
306+ return false ;
307+ }
308+ BalanceJob bj = (BalanceJob ) obj ;
309+ return new EqualsBuilder ()
310+ .append (id , bj .id )
311+ .append (procedureTable , bj .procedureTable )
312+ .append (firstProcedure , bj .firstProcedure )
313+ .isEquals ();
314+ }
315+
316+ @ Override
317+ public int hashCode () {
318+ return new HashCodeBuilder (17 , 37 )
319+ .append (id )
320+ .append (procedureTable )
321+ .toHashCode ();
322+ }
323+
324+ @ Override
325+ public String toString () {
326+ return "{jobId=" + id + "}" ;
327+ }
328+
329+ /**
330+ * Get the detail description of this job.
331+ */
332+ public String getDetailMessage () {
333+ StringBuilder builder = new StringBuilder ();
334+ builder .append ("id=" ).append (id );
335+ if (firstProcedure != null ) {
336+ builder .append (",firstProcedure=" ).append (firstProcedure );
337+ }
338+ if (curProcedure != null ) {
339+ builder .append (",currentProcedure=" ).append (curProcedure );
340+ }
341+ builder .append (",jobDone=" ).append (jobDone );
342+ if (error != null ) {
343+ builder .append (",error=" ).append (error .getMessage ());
344+ }
345+ return builder .toString ();
346+ }
347+
348+ boolean isSchedulerShutdown () {
349+ return !scheduler .isRunning ();
350+ }
351+
352+ @ VisibleForTesting
353+ Map <String , T > getProcedureTable () {
354+ return procedureTable ;
355+ }
356+
357+ @ VisibleForTesting
358+ T getCurProcedure () {
359+ return curProcedure ;
360+ }
361+ }
0 commit comments