28
28
import java .io .IOException ;
29
29
import java .util .HashMap ;
30
30
import java .util .Map ;
31
+ import java .util .concurrent .ConcurrentHashMap ;
31
32
import java .util .stream .IntStream ;
32
33
33
34
import static org .apache .flink .util .Preconditions .checkArgument ;
@@ -216,13 +217,13 @@ public void close() {
216
217
asyncThread .interrupt ();
217
218
}
218
219
219
- public void iniEnv (int numberOfChannels ){
220
- bufferStatista = new HashMap <>(numberOfChannels );
220
+ public void iniEnv (int numberOfChannels ) {
221
+ bufferStatista = new ConcurrentHashMap <>(numberOfChannels );
221
222
asyncThread = new Thread (() -> {
222
223
try {
223
224
Long checkStartTime = System .currentTimeMillis ();
224
225
while (iniAsyncBackground ) {
225
- if (System .currentTimeMillis () - checkStartTime > FLUSH_TIME ){
226
+ if (System .currentTimeMillis () - checkStartTime > FLUSH_TIME ) {
226
227
bufferStatista .clear ();
227
228
checkStartTime = System .currentTimeMillis ();
228
229
}
@@ -231,7 +232,7 @@ public void iniEnv(int numberOfChannels){
231
232
bufferStatista .put (e ,
232
233
bufferStatista .getOrDefault (e , 0 )
233
234
+ Math .max (subpartitions [e ].getBuffersInBacklog (), 1 )));
234
- if (getBackPressuredTimeMsPerSecond ().getValue () > ACTIVE_FLUSH_TASK_BACK_PRESSURED_TIME ){
235
+ if (getBackPressuredTimeMsPerSecond ().getValue () > ACTIVE_FLUSH_TASK_BACK_PRESSURED_TIME ) {
235
236
flushStatista = true ;
236
237
}
237
238
Thread .sleep (ACTIVE_FLUSH_TASK_BACK_PRESSURED_TIME );
@@ -245,7 +246,7 @@ public void iniEnv(int numberOfChannels){
245
246
246
247
public Map <Integer , Integer > getBufferStatista () {
247
248
if (flushStatista ) {
248
- return bufferStatista ;
249
+ return new HashMap <>( bufferStatista ) ;
249
250
}
250
251
return null ;
251
252
}
0 commit comments