@@ -323,35 +323,60 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po
323
323
324
324
this .id = COUNTER .incrementAndGet ();
325
325
326
- this .pause = conf .getLong (HConstants .HBASE_CLIENT_PAUSE ,
327
- HConstants .DEFAULT_HBASE_CLIENT_PAUSE );
328
- long configuredPauseForCQTBE = conf .getLong (HConstants .HBASE_CLIENT_PAUSE_FOR_CQTBE , pause );
329
- if (configuredPauseForCQTBE < pause ) {
330
- LOG .warn ("The " + HConstants .HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
331
- + configuredPauseForCQTBE + " is smaller than " + HConstants .HBASE_CLIENT_PAUSE
332
- + ", will use " + pause + " instead." );
333
- this .pauseForCQTBE = pause ;
334
- } else {
335
- this .pauseForCQTBE = configuredPauseForCQTBE ;
336
- }
337
- this .numTries = conf .getInt (HConstants .HBASE_CLIENT_RETRIES_NUMBER ,
338
- HConstants .DEFAULT_HBASE_CLIENT_RETRIES_NUMBER );
326
+ ConnectionConfiguration connConf =
327
+ hc .getConfiguration () == conf
328
+ ? hc .getConnectionConfiguration ()
329
+ // Slow: parse conf in ConnectionConfiguration constructor
330
+ : new ConnectionConfiguration (conf );
331
+ if (connConf == null ) {
332
+ // Slow: parse conf in ConnectionConfiguration constructor
333
+ connConf = new ConnectionConfiguration (conf );
334
+ }
335
+
336
+ this .pause = connConf .getPause ();
337
+ this .pauseForCQTBE = connConf .getPauseForCQTBE ();
338
+
339
+ this .numTries = connConf .getRetriesNumber ();
339
340
this .rpcTimeout = rpcTimeout ;
340
- this .operationTimeout = conf .getInt (HConstants .HBASE_CLIENT_OPERATION_TIMEOUT ,
341
- HConstants .DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT );
342
- this .primaryCallTimeoutMicroseconds = conf .getInt (PRIMARY_CALL_TIMEOUT_KEY , 10000 );
343
-
344
- this .maxTotalConcurrentTasks = conf .getInt (HConstants .HBASE_CLIENT_MAX_TOTAL_TASKS ,
345
- HConstants .DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS );
346
- this .maxConcurrentTasksPerServer = conf .getInt (HConstants .HBASE_CLIENT_MAX_PERSERVER_TASKS ,
347
- HConstants .DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS );
348
- this .maxConcurrentTasksPerRegion = conf .getInt (HConstants .HBASE_CLIENT_MAX_PERREGION_TASKS ,
349
- HConstants .DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS );
350
- this .maxHeapSizePerRequest = conf .getLong (HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE ,
351
- DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE );
352
- this .maxHeapSizeSubmit = conf .getLong (HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE , DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE );
341
+ this .operationTimeout = connConf .getOperationTimeout ();
342
+
343
+ // Parse config once and reuse config values of hc's AsyncProcess in AsyncProcess for put
344
+ // Can be null when constructing hc's AsyncProcess or it's not reusable
345
+ AsyncProcess globalAsyncProcess = hc .getConfiguration () == conf ? hc .getAsyncProcess () : null ;
346
+
347
+ this .primaryCallTimeoutMicroseconds =
348
+ globalAsyncProcess == null
349
+ ? conf .getInt (PRIMARY_CALL_TIMEOUT_KEY , 10000 )
350
+ : globalAsyncProcess .primaryCallTimeoutMicroseconds ;
351
+
352
+ this .maxTotalConcurrentTasks =
353
+ globalAsyncProcess == null
354
+ ? conf .getInt (HConstants .HBASE_CLIENT_MAX_TOTAL_TASKS ,
355
+ HConstants .DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS )
356
+ : globalAsyncProcess .maxTotalConcurrentTasks ;
357
+ this .maxConcurrentTasksPerServer =
358
+ globalAsyncProcess == null
359
+ ? conf .getInt (HConstants .HBASE_CLIENT_MAX_PERSERVER_TASKS ,
360
+ HConstants .DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS )
361
+ : globalAsyncProcess .maxConcurrentTasksPerServer ;
362
+ this .maxConcurrentTasksPerRegion =
363
+ globalAsyncProcess == null
364
+ ? conf .getInt (HConstants .HBASE_CLIENT_MAX_PERREGION_TASKS ,
365
+ HConstants .DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS )
366
+ : globalAsyncProcess .maxConcurrentTasksPerRegion ;
367
+ this .maxHeapSizePerRequest =
368
+ globalAsyncProcess == null
369
+ ? conf .getLong (HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE ,
370
+ DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE )
371
+ : globalAsyncProcess .maxHeapSizePerRequest ;
372
+ this .maxHeapSizeSubmit =
373
+ globalAsyncProcess == null
374
+ ? conf .getLong (HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE , DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE )
375
+ : globalAsyncProcess .maxHeapSizeSubmit ;
353
376
this .startLogErrorsCnt =
354
- conf .getInt (START_LOG_ERRORS_AFTER_COUNT_KEY , DEFAULT_START_LOG_ERRORS_AFTER_COUNT );
377
+ globalAsyncProcess == null
378
+ ? conf .getInt (START_LOG_ERRORS_AFTER_COUNT_KEY , DEFAULT_START_LOG_ERRORS_AFTER_COUNT )
379
+ : globalAsyncProcess .startLogErrorsCnt ;
355
380
356
381
if (this .maxTotalConcurrentTasks <= 0 ) {
357
382
throw new IllegalArgumentException ("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks );
@@ -387,11 +412,16 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po
387
412
388
413
this .rpcCallerFactory = rpcCaller ;
389
414
this .rpcFactory = rpcFactory ;
390
- this .logBatchErrorDetails = conf .getBoolean (LOG_DETAILS_FOR_BATCH_ERROR , false );
415
+ this .logBatchErrorDetails =
416
+ globalAsyncProcess == null
417
+ ? conf .getBoolean (LOG_DETAILS_FOR_BATCH_ERROR , false )
418
+ : globalAsyncProcess .logBatchErrorDetails ;
391
419
392
420
this .thresholdToLogUndoneTaskDetails =
393
- conf .getInt (THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS ,
394
- DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS );
421
+ globalAsyncProcess == null
422
+ ? conf .getInt (THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS ,
423
+ DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS )
424
+ : globalAsyncProcess .thresholdToLogUndoneTaskDetails ;
395
425
}
396
426
397
427
public void setRpcTimeout (int rpcTimeout ) {
0 commit comments