@@ -146,6 +146,12 @@ object BlockFetcherIterator {
146
146
}
147
147
148
148
protected def splitLocalRemoteBlocks (): ArrayBuffer [FetchRequest ] = {
149
+ // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
150
+ // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
151
+ // nodes, rather than blocking on reading output from one node.
152
+ val maxRequestSize = math.max(maxBytesInFlight / 5 , 1L )
153
+ logInfo(" maxBytesInFlight: " + maxBytesInFlight + " , maxRequestSize: " + maxRequestSize)
154
+
149
155
// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
150
156
// at most maxBytesInFlight in order to limit the amount of data in flight.
151
157
val remoteRequests = new ArrayBuffer [FetchRequest ]
@@ -157,11 +163,6 @@ object BlockFetcherIterator {
157
163
_numBlocksToFetch += localBlocksToFetch.size
158
164
} else {
159
165
numRemote += blockInfos.size
160
- // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
161
- // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
162
- // nodes, rather than blocking on reading output from one node.
163
- val minRequestSize = math.max(maxBytesInFlight / 5 , 1L )
164
- logInfo(" maxBytesInFlight: " + maxBytesInFlight + " , minRequest: " + minRequestSize)
165
166
val iterator = blockInfos.iterator
166
167
var curRequestSize = 0L
167
168
var curBlocks = new ArrayBuffer [(BlockId , Long )]
@@ -176,11 +177,12 @@ object BlockFetcherIterator {
176
177
} else if (size < 0 ) {
177
178
throw new BlockException (blockId, " Negative block size " + size)
178
179
}
179
- if (curRequestSize >= minRequestSize ) {
180
+ if (curRequestSize >= maxRequestSize ) {
180
181
// Add this FetchRequest
181
182
remoteRequests += new FetchRequest (address, curBlocks)
182
183
curRequestSize = 0
183
184
curBlocks = new ArrayBuffer [(BlockId , Long )]
185
+ logDebug(s " Creating fetch request of $curRequestSize at $address" )
184
186
}
185
187
}
186
188
// Add in the final request
@@ -189,7 +191,7 @@ object BlockFetcherIterator {
189
191
}
190
192
}
191
193
}
192
- logInfo(" Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
194
+ logInfo(" Getting " + _numBlocksToFetch + " non-empty blocks out of " +
193
195
totalBlocks + " blocks" )
194
196
remoteRequests
195
197
}
@@ -224,8 +226,8 @@ object BlockFetcherIterator {
224
226
sendRequest(fetchRequests.dequeue())
225
227
}
226
228
227
- val numGets = remoteRequests.size - fetchRequests.size
228
- logInfo(" Started " + numGets + " remote gets in " + Utils .getUsedTimeMs(startTime))
229
+ val numFetches = remoteRequests.size - fetchRequests.size
230
+ logInfo(" Started " + numFetches + " remote fetches in" + Utils .getUsedTimeMs(startTime))
229
231
230
232
// Get Local Blocks
231
233
startTime = System .currentTimeMillis
@@ -325,7 +327,7 @@ object BlockFetcherIterator {
325
327
}
326
328
327
329
copiers = startCopiers(conf.getInt(" spark.shuffle.copier.threads" , 6 ))
328
- logInfo(" Started " + fetchRequestsSync.size + " remote gets in " +
330
+ logInfo(" Started " + fetchRequestsSync.size + " remote fetches in " +
329
331
Utils .getUsedTimeMs(startTime))
330
332
331
333
// Get Local Blocks
0 commit comments