@@ -715,7 +715,6 @@ int32_t InputStreamImpl::readOneBlock(char * buf, int32_t size, bool shouldUpdat
715
715
*/
716
716
int32_t InputStreamImpl::readInternal (char * buf, int32_t size) {
717
717
int updateMetadataOnFailure = conf->getMaxReadBlockRetry ();
718
- bool isInfoMutexLock = false ;
719
718
720
719
try {
721
720
do {
@@ -731,20 +730,16 @@ int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
731
730
* Do RPC failover work in updateBlockInfos.
732
731
*/
733
732
updateBlockInfos ();
734
- if (isInfoMutexLock) {
735
- isInfoMutexLock = false ;
736
- infoMutex.unlock ();
737
- }
733
+ }
738
734
739
- /*
740
- * We already have the up-to-date block information,
741
- * Check if we reach the end of file.
742
- */
743
- if (cursor >= getFileLength ()) {
744
- THROW_NO_STACK (HdfsEndOfStream,
745
- " InputStreamImpl: read over EOF, current position: %" PRId64 " , read size: %d, from file: %s" ,
746
- cursor, size, path.c_str ());
747
- }
735
+ /*
736
+ * We already have the up-to-date block information,
737
+ * Check if we reach the end of file.
738
+ */
739
+ if (cursor >= getFileLength ()) {
740
+ THROW_NO_STACK (HdfsEndOfStream,
741
+ " InputStreamImpl: read over EOF, current position: %" PRId64 " , read size: %d, from file: %s" ,
742
+ cursor, size, path.c_str ());
748
743
}
749
744
750
745
/*
@@ -774,9 +769,15 @@ int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
774
769
* We will update metadata once and try again.
775
770
*/
776
771
if (retval < 0 ) {
777
- infoMutex.lock ();
778
- isInfoMutexLock = true ;
779
- lbs.reset ();
772
+ {
773
+ lock_guard<std::recursive_mutex> lock (infoMutex);
774
+ lbs.reset ();
775
+ /*
776
+ * update block infos right now after lbs is reset
777
+ * to ensure pread can read non-empty lbs.
778
+ */
779
+ updateBlockInfos ();
780
+ }
780
781
endOfCurBlock = 0 ;
781
782
--updateMetadataOnFailure;
782
783
@@ -788,26 +789,13 @@ int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
788
789
continue ;
789
790
}
790
791
791
- if (isInfoMutexLock) {
792
- isInfoMutexLock = false ;
793
- infoMutex.unlock ();
794
- }
795
792
return retval;
796
793
} while (true );
797
794
} catch (const HdfsCanceled & e) {
798
- if (isInfoMutexLock) {
799
- infoMutex.unlock ();
800
- }
801
795
throw ;
802
796
} catch (const HdfsEndOfStream & e) {
803
- if (isInfoMutexLock) {
804
- infoMutex.unlock ();
805
- }
806
797
throw ;
807
798
} catch (const HdfsException & e) {
808
- if (isInfoMutexLock) {
809
- infoMutex.unlock ();
810
- }
811
799
/*
812
800
* wrap the underlying error and rethrow.
813
801
*/
@@ -827,25 +815,22 @@ int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
827
815
int32_t InputStreamImpl::preadInternal (char * buf, int32_t size, int64_t position) {
828
816
int64_t cursor = position;
829
817
try {
830
- infoMutex.lock ();
831
- if (!lbs) {
832
- THROW (HdfsIOException, " InputStreamImpl: lbs is empty, cannot pread file: %s, from position %" PRId64 " , size: %d." ,
833
- path.c_str (), cursor, size);
834
- }
835
- int64_t filelen = getFileLength ();
836
- if ((position < 0 ) || (position >= filelen)) {
837
- infoMutex.unlock ();
838
- return -1 ;
839
- }
840
818
int32_t realLen = size;
841
- if ((position + size) > filelen) {
842
- realLen = (int32_t )(filelen - position);
843
- }
819
+ std::vector<shared_ptr<LocatedBlock>> blockRange;
820
+ {
821
+ lock_guard<std::recursive_mutex> lock (infoMutex);
822
+ int64_t filelen = getFileLength ();
823
+ if ((position < 0 ) || (position >= filelen)) {
824
+ return -1 ;
825
+ }
826
+ if ((position + size) > filelen) {
827
+ realLen = (int32_t ) (filelen - position);
828
+ }
844
829
845
- // determine the block and byte range within the block
846
- // corresponding to position and realLen
847
- std::vector<shared_ptr<LocatedBlock>> blockRange = getBlockRange (position, (int64_t )realLen);
848
- infoMutex. unlock ();
830
+ // determine the block and byte range within the block
831
+ // corresponding to position and realLen
832
+ blockRange = getBlockRange (position, (int64_t ) realLen);
833
+ }
849
834
int32_t remaining = realLen;
850
835
int32_t bytesHasRead = 0 ;
851
836
for (shared_ptr<LocatedBlock> blk : blockRange) {
@@ -1213,9 +1198,10 @@ void InputStreamImpl::close() {
1213
1198
prefetchSize = 0 ;
1214
1199
blockReader.reset ();
1215
1200
curBlock.reset ();
1216
- infoMutex.lock ();
1217
- lbs.reset ();
1218
- infoMutex.unlock ();
1201
+ {
1202
+ lock_guard<std::recursive_mutex> lock (infoMutex);
1203
+ lbs.reset ();
1204
+ }
1219
1205
conf.reset ();
1220
1206
failedNodes.clear ();
1221
1207
path.clear ();
0 commit comments