Skip to content

Commit 4b26f16

Browse files
szetszwosymious
authored andcommitted
RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion. (apache#338)
* RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion. * Fix a bug. * Fix checkstyle * Use closeAsync() instaed of try-with-resource.
1 parent f83c66a commit 4b26f16

File tree

4 files changed

+167
-58
lines changed

4 files changed

+167
-58
lines changed

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import java.io.IOException;
33-
import java.io.RandomAccessFile;
3433
import java.nio.ByteBuffer;
3534
import java.nio.channels.SeekableByteChannel;
3635
import java.nio.file.Files;
@@ -68,10 +67,6 @@ long getCommittedSize() {
6867
"File " + getRelativePath() + " size is unknown.");
6968
}
7069

71-
void flush() throws IOException {
72-
// no-op
73-
}
74-
7570
ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
7671
throws IOException {
7772
if (readCommitted && offset + length > getCommittedSize()) {
@@ -186,8 +181,7 @@ CompletableFuture<Integer> submitCreate(
186181
+ close + ") @" + id + ":" + index;
187182
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
188183
if (out == null) {
189-
out = new FileStore.FileStoreDataChannel(new RandomAccessFile(resolver.apply(getRelativePath()).toFile(),
190-
"rw"));
184+
out = new FileStore.FileStoreDataChannel(resolver.apply(getRelativePath()));
191185
}
192186
return write(0L, data, close, sync);
193187
}, name);

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,22 +259,25 @@ public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
259259
return CompletableFuture.supplyAsync(() -> {
260260
try {
261261
final Path full = resolve(normalize(p));
262-
return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
262+
return new FileStoreDataChannel(full);
263263
} catch (IOException e) {
264264
throw new CompletionException("Failed to create " + p, e);
265265
}
266266
}, writer);
267267
}
268268

269269
static class FileStoreDataChannel implements StateMachine.DataChannel {
270+
private final Path path;
270271
private final RandomAccessFile randomAccessFile;
271272

272-
FileStoreDataChannel(RandomAccessFile file) {
273-
randomAccessFile = file;
273+
FileStoreDataChannel(Path path) throws FileNotFoundException {
274+
this.path = path;
275+
this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
274276
}
275277

276278
@Override
277279
public void force(boolean metadata) throws IOException {
280+
LOG.debug("force({}) at {}", metadata, path);
278281
randomAccessFile.getChannel().force(metadata);
279282
}
280283

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public long write(String path, long offset, boolean close, ByteBuffer buffer, bo
153153
return WriteReplyProto.parseFrom(reply).getLength();
154154
}
155155

156-
public DataStreamOutput getStreamOutput(String path, int dataSize) {
156+
public DataStreamOutput getStreamOutput(String path, long dataSize) {
157157
final StreamWriteRequestProto header = StreamWriteRequestProto.newBuilder()
158158
.setPath(ProtoUtils.toByteString(path))
159159
.setLength(dataSize)

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java

Lines changed: 159 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.ratis.examples.filestore.FileStoreClient;
2525
import org.apache.ratis.protocol.DataStreamReply;
2626
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
27-
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
2827
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
28+
import org.apache.ratis.util.JavaUtils;
2929
import org.apache.ratis.util.Preconditions;
3030

3131
import java.io.File;
@@ -34,35 +34,74 @@
3434
import java.nio.MappedByteBuffer;
3535
import java.nio.channels.FileChannel;
3636
import java.util.ArrayList;
37+
import java.util.Arrays;
3738
import java.util.Collections;
3839
import java.util.HashMap;
3940
import java.util.List;
4041
import java.util.Map;
42+
import java.util.Optional;
4143
import java.util.concurrent.CompletableFuture;
44+
import java.util.function.BiFunction;
4245

4346
/**
4447
* Subcommand to generate load in filestore data stream state machine.
4548
*/
4649
@Parameters(commandDescription = "Load Generator for FileStore DataStream")
4750
public class DataStream extends Client {
51+
enum Type {
52+
DirectByteBuffer(DirectByteBufferType::new),
53+
MappedByteBuffer(MappedByteBufferType::new),
54+
NettyFileRegion(NettyFileRegionType::new);
4855

49-
@Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true)
50-
private String dataStreamType = "NettyFileRegion";
56+
private final BiFunction<String, DataStream, TransferType> constructor;
57+
58+
Type(BiFunction<String, DataStream, TransferType> constructor) {
59+
this.constructor = constructor;
60+
}
61+
62+
BiFunction<String, DataStream, TransferType> getConstructor() {
63+
return constructor;
64+
}
65+
66+
static Type valueOfIgnoreCase(String s) {
67+
for (Type type : values()) {
68+
if (type.name().equalsIgnoreCase(s)) {
69+
return type;
70+
}
71+
}
72+
return null;
73+
}
74+
}
75+
76+
// To be used as a Java annotation attribute value
77+
private static final String DESCRIPTION = "[DirectByteBuffer, MappedByteBuffer, NettyFileRegion]";
78+
79+
{
80+
// Assert if the description is correct.
81+
final String expected = Arrays.asList(Type.values()).toString();
82+
Preconditions.assertTrue(expected.equals(DESCRIPTION),
83+
() -> "Unexpected description: " + DESCRIPTION + " does not equal to the expected string " + expected);
84+
}
85+
86+
@Parameter(names = {"--type"}, description = DESCRIPTION, required = true)
87+
private String dataStreamType = Type.NettyFileRegion.name();
5188

5289
@Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero," +
5390
"-1 means on sync", required = true)
5491
private int syncSize = -1;
5592

93+
int getSyncSize() {
94+
return syncSize;
95+
}
96+
5697
private boolean checkParam() {
5798
if (syncSize != -1 && syncSize % getBufferSizeInBytes() != 0) {
5899
System.err.println("Error: syncSize % bufferSize should be zero");
59100
return false;
60101
}
61102

62-
if (!dataStreamType.equals("DirectByteBuffer") &&
63-
!dataStreamType.equals("MappedByteBuffer") &&
64-
!dataStreamType.equals("NettyFileRegion")) {
65-
System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo");
103+
if (Type.valueOfIgnoreCase(dataStreamType) == null) {
104+
System.err.println("Error: dataStreamType should be one of " + DESCRIPTION);
66105
return false;
67106
}
68107

@@ -101,18 +140,11 @@ private Map<String, List<CompletableFuture<DataStreamReply>>> streamWrite(
101140
final long fileLength = file.length();
102141
Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
103142
+ getFileSizeInBytes() + " but actual size is " + fileLength);
104-
FileInputStream fis = new FileInputStream(file);
105-
final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());
106-
107-
if (dataStreamType.equals("DirectByteBuffer")) {
108-
fileMap.put(path, writeByDirectByteBuffer(dataStreamOutput, fis.getChannel()));
109-
} else if (dataStreamType.equals("MappedByteBuffer")) {
110-
fileMap.put(path, writeByMappedByteBuffer(dataStreamOutput, fis.getChannel()));
111-
} else if (dataStreamType.equals("NettyFileRegion")) {
112-
fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file));
113-
}
114143

115-
dataStreamOutput.closeAsync();
144+
final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
145+
.orElseThrow(IllegalStateException::new);
146+
final TransferType writer = type.getConstructor().apply(path, this);
147+
fileMap.put(path, writer.transfer(fileStoreClient));
116148
}
117149
return fileMap;
118150
}
@@ -134,46 +166,126 @@ private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply
134166
return totalBytes;
135167
}
136168

137-
private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
138-
FileChannel fileChannel) throws IOException {
139-
final int fileSize = getFileSizeInBytes();
140-
final int bufferSize = getBufferSizeInBytes();
141-
if (fileSize <= 0) {
142-
return Collections.emptyList();
169+
abstract static class TransferType {
170+
private final String path;
171+
private final File file;
172+
private final long fileSize;
173+
private final int bufferSize;
174+
private final long syncSize;
175+
private long syncPosition = 0;
176+
177+
TransferType(String path, DataStream cli) {
178+
this.path = path;
179+
this.file = new File(path);
180+
this.fileSize = cli.getFileSizeInBytes();
181+
this.bufferSize = cli.getBufferSizeInBytes();
182+
this.syncSize = cli.getSyncSize();
183+
184+
final long actualSize = file.length();
185+
Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is "
186+
+ fileSize + " but actual size is " + actualSize + ", path=" + path);
187+
}
188+
189+
File getFile() {
190+
return file;
191+
}
192+
193+
int getBufferSize() {
194+
return bufferSize;
195+
}
196+
197+
long getPacketSize(long offset) {
198+
return Math.min(bufferSize, fileSize - offset);
143199
}
144-
List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
145-
final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
146200

147-
for(long offset = 0L; offset < fileSize;) {
148-
final ByteBuf buf = alloc.directBuffer(bufferSize);
149-
final int bytesRead = buf.writeBytes(fileChannel, bufferSize);
201+
boolean isSync(long position) {
202+
if (syncSize > 0) {
203+
if (position >= fileSize || position - syncPosition >= syncSize) {
204+
syncPosition = position;
205+
return true;
206+
}
207+
}
208+
return false;
209+
}
210+
211+
List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient client) throws IOException {
212+
if (fileSize <= 0) {
213+
return Collections.emptyList();
214+
}
215+
216+
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
217+
final DataStreamOutput out = client.getStreamOutput(path, fileSize);
218+
try (FileInputStream fis = new FileInputStream(file)) {
219+
final FileChannel in = fis.getChannel();
220+
for (long offset = 0L; offset < fileSize; ) {
221+
offset += write(in, out, offset, futures);
222+
}
223+
} catch (Throwable e) {
224+
throw new IOException("Failed to transfer " + path);
225+
} finally {
226+
futures.add(out.closeAsync());
227+
}
228+
return futures;
229+
}
230+
231+
abstract long write(FileChannel in, DataStreamOutput out, long offset,
232+
List<CompletableFuture<DataStreamReply>> futures) throws IOException;
233+
234+
@Override
235+
public String toString() {
236+
return JavaUtils.getClassSimpleName(getClass()) + "{" + path + ", size=" + fileSize + "}";
237+
}
238+
}
239+
240+
static class DirectByteBufferType extends TransferType {
241+
DirectByteBufferType(String path, DataStream cli) {
242+
super(path, cli);
243+
}
244+
245+
@Override
246+
long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
247+
throws IOException {
248+
final int bufferSize = getBufferSize();
249+
final ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize);
250+
final int bytesRead = buf.writeBytes(in, bufferSize);
150251
if (bytesRead < 0) {
151-
throw new IllegalStateException("Failed to read " + fileSize
152-
+ " byte(s). The channel has reached end-of-stream at " + offset);
252+
throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
253+
+ ". The channel has reached end-of-stream at " + offset);
153254
} else if (bytesRead > 0) {
154-
offset += bytesRead;
155-
final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer(),
156-
syncSize > 0 && (offset == fileSize || offset % syncSize == 0));
255+
final CompletableFuture<DataStreamReply> f = out.writeAsync(buf.nioBuffer(), isSync(offset + bytesRead));
157256
f.thenRun(buf::release);
158257
futures.add(f);
159258
}
259+
return bytesRead;
160260
}
161-
162-
return futures;
163261
}
164262

165-
private List<CompletableFuture<DataStreamReply>> writeByMappedByteBuffer(DataStreamOutput dataStreamOutput,
166-
FileChannel fileChannel) throws IOException {
167-
List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
168-
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, getFileSizeInBytes());
169-
futures.add(dataStreamOutput.writeAsync(mappedByteBuffer));
170-
return futures;
263+
static class MappedByteBufferType extends TransferType {
264+
MappedByteBufferType(String path, DataStream cli) {
265+
super(path, cli);
266+
}
267+
268+
@Override
269+
long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
270+
throws IOException {
271+
final long packetSize = getPacketSize(offset);
272+
final MappedByteBuffer mappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, offset, packetSize);
273+
final int remaining = mappedByteBuffer.remaining();
274+
futures.add(out.writeAsync(mappedByteBuffer, isSync(offset + remaining)));
275+
return remaining;
276+
}
171277
}
172278

173-
private List<CompletableFuture<DataStreamReply>> writeByNettyFileRegion(
174-
DataStreamOutput dataStreamOutput, File file) {
175-
List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
176-
futures.add(dataStreamOutput.writeAsync(file));
177-
return futures;
279+
static class NettyFileRegionType extends TransferType {
280+
NettyFileRegionType(String path, DataStream cli) {
281+
super(path, cli);
282+
}
283+
284+
@Override
285+
long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures) {
286+
final long packetSize = getPacketSize(offset);
287+
futures.add(out.writeAsync(getFile(), offset, packetSize, isSync(offset + packetSize)));
288+
return packetSize;
289+
}
178290
}
179291
}

0 commit comments

Comments
 (0)