@@ -135,6 +135,7 @@ InputStreamImpl::InputStreamImpl() :
135
135
136
136
InputStreamImpl::InputStreamImpl (shared_ptr<LocatedBlocks> lbsPtr) {
137
137
new (this )InputStreamImpl ();
138
+ lock_guard<std::recursive_mutex> lock (infoMutex);
138
139
lbs = lbsPtr;
139
140
}
140
141
@@ -223,6 +224,7 @@ void InputStreamImpl::updateBlockInfos() {
223
224
* @param need Whether getBlockLocations needs to be called.
224
225
*/
225
226
void InputStreamImpl::updateBlockInfos (bool need) {
227
+ lock_guard<std::recursive_mutex> lock (infoMutex);
226
228
int retry = maxGetBlockInfoRetry;
227
229
228
230
for (int i = 0 ; i < retry; ++i) {
@@ -232,7 +234,6 @@ void InputStreamImpl::updateBlockInfos(bool need) {
232
234
}
233
235
234
236
if (need) {
235
- lock_guard<std::recursive_mutex> lock (infoMutex);
236
237
filesystem->getBlockLocations (path, cursor, prefetchSize, *lbs);
237
238
}
238
239
@@ -714,6 +715,7 @@ int32_t InputStreamImpl::readOneBlock(char * buf, int32_t size, bool shouldUpdat
714
715
*/
715
716
int32_t InputStreamImpl::readInternal (char * buf, int32_t size) {
716
717
int updateMetadataOnFailure = conf->getMaxReadBlockRetry ();
718
+ bool isInfoMutexLock = false ;
717
719
718
720
try {
719
721
do {
@@ -729,6 +731,10 @@ int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
729
731
* Do RPC failover work in updateBlockInfos.
730
732
*/
731
733
updateBlockInfos ();
734
+ if (isInfoMutexLock) {
735
+ isInfoMutexLock = false ;
736
+ infoMutex.unlock ();
737
+ }
732
738
733
739
/*
734
740
* We already have the up-to-date block information,
@@ -768,6 +774,8 @@ int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
768
774
* We will update metadata once and try again.
769
775
*/
770
776
if (retval < 0 ) {
777
+ infoMutex.lock ();
778
+ isInfoMutexLock = true ;
771
779
lbs.reset ();
772
780
endOfCurBlock = 0 ;
773
781
--updateMetadataOnFailure;
@@ -780,13 +788,26 @@ int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
780
788
continue ;
781
789
}
782
790
791
+ if (isInfoMutexLock) {
792
+ isInfoMutexLock = false ;
793
+ infoMutex.unlock ();
794
+ }
783
795
return retval;
784
796
} while (true );
785
797
} catch (const HdfsCanceled & e) {
798
+ if (isInfoMutexLock) {
799
+ infoMutex.unlock ();
800
+ }
786
801
throw ;
787
802
} catch (const HdfsEndOfStream & e) {
803
+ if (isInfoMutexLock) {
804
+ infoMutex.unlock ();
805
+ }
788
806
throw ;
789
807
} catch (const HdfsException & e) {
808
+ if (isInfoMutexLock) {
809
+ infoMutex.unlock ();
810
+ }
790
811
/*
791
812
* wrap the underlying error and rethrow.
792
813
*/
@@ -806,8 +827,14 @@ int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
806
827
int32_t InputStreamImpl::preadInternal (char * buf, int32_t size, int64_t position) {
807
828
int64_t cursor = position;
808
829
try {
830
+ infoMutex.lock ();
831
+ if (!lbs) {
832
+ THROW (HdfsIOException, " InputStreamImpl: lbs is empty, cannot pread file: %s, from position %" PRId64 " , size: %d." ,
833
+ path.c_str (), cursor, size);
834
+ }
809
835
int64_t filelen = getFileLength ();
810
836
if ((position < 0 ) || (position >= filelen)) {
837
+ infoMutex.unlock ();
811
838
return -1 ;
812
839
}
813
840
int32_t realLen = size;
@@ -818,6 +845,7 @@ int32_t InputStreamImpl::preadInternal(char * buf, int32_t size, int64_t positio
818
845
// determine the block and byte range within the block
819
846
// corresponding to position and realLen
820
847
std::vector<shared_ptr<LocatedBlock>> blockRange = getBlockRange (position, (int64_t )realLen);
848
+ infoMutex.unlock ();
821
849
int32_t remaining = realLen;
822
850
int32_t bytesHasRead = 0 ;
823
851
for (shared_ptr<LocatedBlock> blk : blockRange) {
@@ -842,7 +870,7 @@ int32_t InputStreamImpl::preadInternal(char * buf, int32_t size, int64_t positio
842
870
* wrap the underlying error and rethrow.
843
871
*/
844
872
NESTED_THROW (HdfsIOException,
845
- " InputStreamImpl: cannot read file: %s, from position %" PRId64 " , size: %d." ,
873
+ " InputStreamImpl: cannot pread file: %s, from position %" PRId64 " , size: %d." ,
846
874
path.c_str (), cursor, size);
847
875
}
848
876
}
@@ -857,12 +885,12 @@ int32_t InputStreamImpl::preadInternal(char * buf, int32_t size, int64_t positio
857
885
* @throws IOException
858
886
*/
859
887
std::vector<shared_ptr<LocatedBlock>> InputStreamImpl::getBlockRange (int64_t offset, int64_t length) {
888
+ lock_guard<std::recursive_mutex> lock (infoMutex);
860
889
// getFileLength(): returns total file length
861
890
// locatedBlocks.getFileLength(): returns length of completed blocks
862
891
if (offset >= getFileLength ()) {
863
892
THROW (HdfsIOException, " Offset: %" PRId64 " exceeds file length: %" PRId64, offset, getFileLength ());
864
893
}
865
- lock_guard<std::recursive_mutex> lock (infoMutex);
866
894
std::vector<shared_ptr<LocatedBlock>> blocks;
867
895
int64_t lengthOfCompleteBlk = lbs->getFileLength ();
868
896
bool readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
@@ -1185,7 +1213,9 @@ void InputStreamImpl::close() {
1185
1213
prefetchSize = 0 ;
1186
1214
blockReader.reset ();
1187
1215
curBlock.reset ();
1216
+ infoMutex.lock ();
1188
1217
lbs.reset ();
1218
+ infoMutex.unlock ();
1189
1219
conf.reset ();
1190
1220
failedNodes.clear ();
1191
1221
path.clear ();
0 commit comments