@@ -77,9 +77,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
7777 byte [] fileContent = getRandomBytesArray (ONE_MB );
7878 Path testFilePath = createFileWithContent (fs , fileName , fileContent );
7979 try (FSDataInputStream iStream = fs .open (testFilePath )) {
80- streamsInTest .add ((AbfsInputStream ) iStream .getWrappedStream ());
81- iStream .read ();
82- inProgressBuffers .addAll (bufferManager .getInProgressCopiedList ());
80+ streamsInTest .add (
81+ (AbfsInputStream ) iStream .getWrappedStream ());
82+ iStream .read ();
83+ inProgressBuffers .addAll (
84+ bufferManager .getInProgressCopiedList ());
8385 }
8486 executionCompletion [iteration ] = true ;
8587 return null ;
@@ -89,20 +91,37 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
8991 executorService .shutdown ();
9092 }
9193
92- while (!checkIfAllExecutionCompleted (executionCompletion )) {
93- Thread .sleep (checkExecutionWaitTime );
94- }
94+ /*
95+ * Since, the read from inputStream is happening in parallel thread, the
96+ * test has to wait for the execution to get over. If we don't wait, test
97+ * main thread will go on to do assertion where the stream execution may or
98+ * may not happen.
99+ */
100+ while (!checkIfAllExecutionCompleted (executionCompletion )) {
101+ Thread .sleep (checkExecutionWaitTime );
102+ }
95103
96- assertCompletedListContainsSubSetOfCertainSet (bufferManager .getCompletedReadListCopy (), inProgressBuffers , streamsInTest );
97- for (AbfsInputStream stream : streamsInTest ) {
98- assertListDoesnotContainBuffersForIstream (bufferManager .getReadAheadQueueCopy (), stream );
99- }
104+ /*
105+ * The close() method of AbfsInputStream would lead to purge of completedList.
106+ * Because the readBufferWorkers are running in parallel thread, due to race condition,
107+ * after close and before assert, it can happen that processing of inProgress buffer
108+ * can get completed and hence we cannot assert on completedList to be empty.
109+ * That is why completedList are checked to not have a buffer other than the
110+ * buffers in inProgressQueue just before the closure of AbfsInputStream object.
111+ */
112+ assertCompletedListContainsSubSetOfCertainSet (
113+ bufferManager .getCompletedReadListCopy (), inProgressBuffers ,
114+ streamsInTest );
115+ for (AbfsInputStream stream : streamsInTest ) {
116+ assertListDoesnotContainBuffersForIstream (
117+ bufferManager .getReadAheadQueueCopy (), stream );
118+ }
100119 }
101120
102121 private void assertCompletedListContainsSubSetOfCertainSet (final List <ReadBuffer > completedList ,
103122 Set <ReadBuffer > bufferSet , final Set <AbfsInputStream > streamsInTest ) {
104123 for (ReadBuffer buffer : completedList ) {
105- if (!streamsInTest .contains (buffer .getStream ())) {
124+ if (!streamsInTest .contains (buffer .getStream ())) {
106125 return ;
107126 }
108127 Assertions .assertThat (bufferSet )
@@ -155,19 +174,27 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
155174 iStream2 = (AbfsInputStream ) fs .open (testFilePath ).getWrappedStream ();
156175 streamsInTest .add (iStream2 );
157176 iStream2 .read ();
158- // After closing stream1, none of the buffers associated with stream1 should be present.
177+ // After closing stream1, no queued buffers of stream1 should be present.
159178 assertListDoesnotContainBuffersForIstream (bufferManager .getReadAheadQueueCopy (), iStream1 );
160179 } finally {
161180 // closing the stream later.
162181 IOUtils .closeStream (iStream2 );
163182 inProgressBufferSet .addAll (bufferManager .getInProgressCopiedList ());
164183 }
165- // After closing stream2, none of the buffers associated with stream2 should be present.
184+ // After closing stream2, no queued buffers of stream2 should be present.
166185 assertListDoesnotContainBuffersForIstream (bufferManager .getReadAheadQueueCopy (), iStream2 );
167186
168- assertCompletedListContainsSubSetOfCertainSet (bufferManager .getCompletedReadListCopy (), inProgressBufferSet ,
169- streamsInTest );
170-
187+ /*
188+ * The close() method of AbfsInputStream would lead to purge of completedList.
189+ * Because the readBufferWorkers are running in parallel thread, due to race condition,
190+ * after close and before assert, it can happen that processing of inProgress buffer
191+ * can get completed and hence we cannot assert on completedList to be empty.
192+ * That is why completedList are checked to not have a buffer other than the
193+ * buffers in inProgressQueue just before the closure of AbfsInputStream object.
194+ */
195+ assertCompletedListContainsSubSetOfCertainSet (
196+ bufferManager .getCompletedReadListCopy (), inProgressBufferSet ,
197+ streamsInTest );
171198 }
172199
173200
0 commit comments