 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;

-import com.google.common.base.Preconditions;
-
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.htrace.TraceScope;

 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -143,6 +143,7 @@ public class DFSOutputStream extends FSOutputSummer

   private final DFSClient dfsClient;
   private final long dfsclientSlowLogThresholdMs;
+  private final ByteArrayManager byteArrayManager;
   private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
@@ -181,14 +182,41 @@ public class DFSOutputStream extends FSOutputSummer
   private static final BlockStoragePolicySuite blockStoragePolicySuite =
       BlockStoragePolicySuite.createDefaultSuite();

+  /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
+  private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+      long seqno) throws InterruptedIOException {
+    final byte[] buf;
+    final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
+
+    try {
+      buf = byteArrayManager.newByteArray(bufferSize);
+    } catch (InterruptedException ie) {
+      final InterruptedIOException iioe = new InterruptedIOException(
+          "seqno=" + seqno);
+      iioe.initCause(ie);
+      throw iioe;
+    }
+
+    return new Packet(buf, chunksPerPkt, offsetInBlock, seqno, getChecksumSize());
+  }
+
+  /**
+   * For heartbeat packets, create buffer directly by new byte[]
+   * since heartbeats should not be blocked.
+   */
+  private Packet createHeartbeatPacket() throws InterruptedIOException {
+    final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
+    return new Packet(buf, 0, 0, Packet.HEART_BEAT_SEQNO, getChecksumSize());
+  }
+
   private static class Packet {
     private static final long HEART_BEAT_SEQNO = -1L;
     final long seqno; // sequencenumber of buffer in block
     final long offsetInBlock; // offset in block
     boolean syncBlock; // this packet forces the current block to disk
     int numChunks; // number of chunks currently in packet
     final int maxChunks; // max chunks in packet
-    final byte[] buf;
+    private byte[] buf;
     private boolean lastPacketInBlock; // is this the last packet in block?

     /**
@@ -210,13 +238,6 @@ private static class Packet {
     final int dataStart;
     int dataPos;

-    /**
-     * Create a heartbeat packet.
-     */
-    Packet(int checksumSize) {
-      this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
-    }
-
     /**
      * Create a new packet.
      *
@@ -225,15 +246,15 @@ private static class Packet {
      * @param chunksPerPkt maximum number of chunks per packet.
      * @param offsetInBlock offset in bytes into the HDFS block.
      */
-    Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
-        long seqno, int checksumSize) {
+    private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+        int checksumSize) {
       this.lastPacketInBlock = false;
       this.numChunks = 0;
       this.offsetInBlock = offsetInBlock;
       this.seqno = seqno;
-
-      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
-
+
+      this.buf = buf;
+
       checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
       checksumPos = checksumStart;
       dataStart = checksumStart + (chunksPerPkt * checksumSize);
@@ -304,6 +325,11 @@ void writeTo(DataOutputStream stm) throws IOException {
         buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
       }
     }
+
+    private void releaseBuffer(ByteArrayManager bam) {
+      bam.release(buf);
+      buf = null;
+    }

     // get the packet's last byte's offset in the block
     long getLastByteOffsetBlock() {
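
Taken together, the new createPacket()/createHeartbeatPacket() helpers and Packet.releaseBuffer() above define a buffer lifecycle: data packets draw their byte[] from the client's shared ByteArrayManager, which can block or be interrupted once too many buffers are outstanding; heartbeat packets allocate a small header-only array directly so they are never held up; and each managed buffer is handed back when the packet is done. Below is a minimal, self-contained sketch of that pattern, not the Hadoop implementation: BoundedAllocator, its limit, and the PKT_MAX_HEADER_LEN value are hypothetical stand-ins, and only the newByteArray()/release() call shapes mirror the diff.

    import java.io.InterruptedIOException;
    import java.util.concurrent.Semaphore;

    public class PacketBufferSketch {
      /** Hypothetical stand-in for org.apache.hadoop.hdfs.util.ByteArrayManager. */
      static class BoundedAllocator {
        private final Semaphore permits;

        BoundedAllocator(int maxOutstandingBuffers) {
          permits = new Semaphore(maxOutstandingBuffers);
        }

        /** May block (or be interrupted) when too many buffers are outstanding. */
        byte[] newByteArray(int size) throws InterruptedException {
          permits.acquire();
          return new byte[size];
        }

        /** Hands a buffer's permit back so a blocked allocation can proceed. */
        void release(byte[] buf) {
          permits.release();
        }
      }

      static final int PKT_MAX_HEADER_LEN = 33; // illustrative value only

      public static void main(String[] args) throws Exception {
        BoundedAllocator manager = new BoundedAllocator(2);

        // Data-packet path, as in createPacket(): allocate through the manager
        // and translate an interrupt into an InterruptedIOException.
        byte[] dataBuf;
        try {
          dataBuf = manager.newByteArray(PKT_MAX_HEADER_LEN + 64 * 1024);
        } catch (InterruptedException ie) {
          InterruptedIOException iioe = new InterruptedIOException("seqno=0");
          iioe.initCause(ie);
          throw iioe;
        }

        // Heartbeat path, as in createHeartbeatPacket(): a plain new byte[] of
        // header size only, so heartbeats never wait on the allocation budget.
        byte[] heartbeatBuf = new byte[PKT_MAX_HEADER_LEN];

        // As in Packet.releaseBuffer(): return the managed buffer when done.
        manager.release(dataBuf);
        System.out.println("released data buf; heartbeat buf is "
            + heartbeatBuf.length + " bytes");
      }
    }

The design point is that only payload-carrying packets count against the budget; the streamer's keep-alive traffic stays off it.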
@@ -547,7 +573,7 @@ public void run() {
             }
             // get packet to be sent.
             if (dataQueue.isEmpty()) {
-              one = new Packet(getChecksumSize()); // heartbeat packet
+              one = createHeartbeatPacket();
             } else {
               one = dataQueue.getFirst(); // regular data packet
             }
@@ -907,6 +933,8 @@ public void run() {
               lastAckedSeqno = seqno;
               ackQueue.removeFirst();
               dataQueue.notifyAll();
+
+              one.releaseBuffer(byteArrayManager);
             }
           } catch (Exception e) {
             if (!responderClosed) {
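
The hunk above is where a managed buffer is finally returned: releaseBuffer() runs only after the packet's ack has been processed and it leaves ackQueue, so the manager's budget effectively bounds packets still in flight to the pipeline rather than packets merely created. A small hedged demo of that ordering, with a plain Semaphore standing in for the manager's accounting (all names here are illustrative, not Hadoop APIs):

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.Semaphore;

    public class ReleaseAfterAckSketch {
      public static void main(String[] args) throws InterruptedException {
        Semaphore budget = new Semaphore(2);      // stand-in for the manager's limit
        Queue<byte[]> ackQueue = new ArrayDeque<>();

        // Writer side: each queued packet holds one permit until acknowledged.
        budget.acquire();
        ackQueue.add(new byte[64 * 1024]);

        // Responder side: the permit is returned only once the ack is consumed,
        // analogous to one.releaseBuffer(byteArrayManager) above.
        byte[] acked = ackQueue.remove();
        budget.release();

        System.out.println("released " + acked.length + " bytes; budget now "
            + budget.availablePermits() + "/2");
      }
    }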
@@ -1657,6 +1685,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,

     this.dfsclientSlowLogThresholdMs =
         dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
+    this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
   }

   /** Construct a new output stream for creating a file. */
@@ -1836,8 +1865,8 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
     }

     if (currentPacket == null) {
-      currentPacket = new Packet(packetSize, chunksPerPacket,
-          bytesCurBlock, currentSeqno++, getChecksumSize());
+      currentPacket = createPacket(packetSize, chunksPerPacket,
+          bytesCurBlock, currentSeqno++);
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
             currentPacket.seqno +
@@ -1884,8 +1913,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
     // indicate the end of block and reset bytesCurBlock.
     //
     if (bytesCurBlock == blockSize) {
-      currentPacket = new Packet(0, 0, bytesCurBlock,
-          currentSeqno++, getChecksumSize());
+      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
       currentPacket.lastPacketInBlock = true;
       currentPacket.syncBlock = shouldSyncBlock;
       waitAndQueueCurrentPacket();
@@ -1972,17 +2000,17 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
             // Nothing to send right now,
             // but sync was requested.
             // Send an empty packet
-            currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, getChecksumSize());
+            currentPacket = createPacket(packetSize, chunksPerPacket,
+                bytesCurBlock, currentSeqno++);
           }
         } else {
           if (isSync && bytesCurBlock > 0) {
             // Nothing to send right now,
             // and the block was partially written,
             // and sync was requested.
             // So send an empty sync packet.
-            currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, getChecksumSize());
+            currentPacket = createPacket(packetSize, chunksPerPacket,
+                bytesCurBlock, currentSeqno++);
           } else {
             // just discard the current packet since it is already been sent.
             currentPacket = null;
@@ -2186,7 +2214,7 @@ public synchronized void close() throws IOException {

       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
+        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }