HADOOP-1381. The distance between sync blocks in SequenceFiles should be configurable #147

Closed · wants to merge 1 commit
org/apache/hadoop/io/SequenceFile.java
@@ -24,6 +24,7 @@
import java.rmi.server.UID;
import java.security.MessageDigest;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
@@ -146,7 +147,7 @@
* </ul>
* </li>
* <li>
* A sync-marker every few <code>100</code> bytes or so.
* A sync-marker every few <code>100</code> kilobytes or so.
* </li>
* </ul>
*
@@ -165,7 +166,7 @@
* </ul>
* </li>
* <li>
* A sync-marker every few <code>100</code> bytes or so.
* A sync-marker every few <code>100</code> kilobytes or so.
* </li>
* </ul>
*
@@ -217,8 +218,11 @@ private SequenceFile() {} // no public ctor
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

/** The number of bytes between sync points.*/
public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
/**
* The number of bytes between sync points. 100 KB, default.
* Computed as 5 KB * 20 = 100 KB
*/
public static final int SYNC_INTERVAL = 5 * 1024 * SYNC_SIZE; // 5KB*(16+4)

/**
* The compression type used to compress key/value pairs in the
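For reference (not part of the patch), the arithmetic behind the new default, using the constants above:

    SYNC_SIZE     = 4 + 16        = 20 bytes   (escape + MD5 hash)
    old default   = 100 * 20      = 2,000 bytes
    new default   = 5 * 1024 * 20 = 102,400 bytes ≈ 100 KB

which is why the class Javadoc now says "kilobytes" rather than "bytes".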
@@ -856,6 +860,9 @@ public static class Writer implements java.io.Closeable, Syncable {
// starts and ends by scanning for this value.
long lastSyncPos; // position of last sync
byte[] sync; // 16 random bytes
@VisibleForTesting
int syncInterval;

{
try {
MessageDigest digester = MessageDigest.getInstance("MD5");
@@ -987,7 +994,16 @@ public static Option file(Path value) {
private static Option filesystem(FileSystem fs) {
return new SequenceFile.Writer.FileSystemOption(fs);
}


private static class SyncIntervalOption extends Options.IntegerOption
implements Option {
SyncIntervalOption(int val) {
// If a negative sync interval is provided,
// fall back to the default sync interval.
super(val < 0 ? SYNC_INTERVAL : val);
}
}

public static Option bufferSize(int value) {
return new BufferSizeOption(value);
}
@@ -1032,11 +1048,15 @@ public static Option compression(CompressionType value,
CompressionCodec codec) {
return new CompressionOption(value, codec);
}


public static Option syncInterval(int value) {
return new SyncIntervalOption(value);
}

/**
* Construct a uncompressed writer from a set of options.
* @param conf the configuration to use
* @param options the options used when creating the writer
* @param opts the options used when creating the writer
* @throws IOException if it fails
*/
Writer(Configuration conf,
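For context (also not part of the diff), a minimal sketch of how an external caller could use the new knob, assuming the existing public SequenceFile.createWriter(Configuration, Writer.Option...) factory; the path, key/value types, and the 1 MB value are placeholders:

    // Sketch: create a writer whose sync markers are ~1 MB apart instead of the
    // 100 KB default. A negative value falls back to SYNC_INTERVAL (see
    // SyncIntervalOption above). Assumes a method that throws IOException.
    Configuration conf = new Configuration();
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("/tmp/sync-interval-demo.seq")),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE),
        SequenceFile.Writer.syncInterval(1024 * 1024));
    try {
      writer.append(new IntWritable(1), new Text("first record"));
    } finally {
      writer.close();
    }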
@@ -1062,6 +1082,8 @@ public static Option compression(CompressionType value,
Options.getOption(MetadataOption.class, opts);
CompressionOption compressionTypeOption =
Options.getOption(CompressionOption.class, opts);
SyncIntervalOption syncIntervalOption =
Options.getOption(SyncIntervalOption.class, opts);
// check consistency of options
if ((fileOption == null) == (streamOption == null)) {
throw new IllegalArgumentException("file or stream must be specified");
@@ -1163,7 +1185,12 @@ public static Option compression(CompressionType value,
"GzipCodec without native-hadoop " +
"code!");
}
init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
this.syncInterval = (syncIntervalOption == null) ?
SYNC_INTERVAL :
syncIntervalOption.getValue();
init(
conf, out, ownStream, keyClass, valueClass,
codec, metadata, syncInterval);
}

/** Create the named file.
@@ -1176,7 +1203,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass) throws IOException {
this.compress = CompressionType.NONE;
init(conf, fs.create(name), true, keyClass, valClass, null,
new Metadata());
new Metadata(), SYNC_INTERVAL);
}

/** Create the named file with write-progress reporter.
Expand All @@ -1190,7 +1217,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
Progressable progress, Metadata metadata) throws IOException {
this.compress = CompressionType.NONE;
init(conf, fs.create(name, progress), true, keyClass, valClass,
null, metadata);
null, metadata, SYNC_INTERVAL);
}

/** Create the named file with write-progress reporter.
@@ -1206,7 +1233,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
this.compress = CompressionType.NONE;
init(conf,
fs.create(name, true, bufferSize, replication, blockSize, progress),
true, keyClass, valClass, null, metadata);
true, keyClass, valClass, null, metadata, SYNC_INTERVAL);
}

boolean isCompressed() { return compress != CompressionType.NONE; }
@@ -1234,18 +1261,21 @@ private void writeFileHeader()

/** Initialize. */
@SuppressWarnings("unchecked")
void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
Class keyClass, Class valClass,
CompressionCodec codec, Metadata metadata)
void init(Configuration config, FSDataOutputStream outStream,
boolean ownStream, Class key, Class val,
CompressionCodec compCodec, Metadata meta,
int syncIntervalVal)
throws IOException {
this.conf = conf;
this.out = out;
this.conf = config;
this.out = outStream;
this.ownOutputStream = ownStream;
this.keyClass = keyClass;
this.valClass = valClass;
this.codec = codec;
this.metadata = metadata;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyClass = key;
this.valClass = val;
this.codec = compCodec;
this.metadata = meta;
this.syncInterval = syncIntervalVal;
SerializationFactory serializationFactory =
new SerializationFactory(config);
this.keySerializer = serializationFactory.getSerializer(keyClass);
if (this.keySerializer == null) {
throw new IOException(
@@ -1366,7 +1396,7 @@ public synchronized void close() throws IOException {

synchronized void checkAndWriteSync() throws IOException {
if (sync != null &&
out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
out.getPos() >= lastSyncPos+this.syncInterval) { // time to emit sync
sync();
}
}
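On the read side (again not part of the diff), the interval chosen here bounds how far SequenceFile.Reader.sync(long) has to scan before it finds the next marker, which is what split readers rely on to locate a record boundary. A rough sketch, reusing the placeholder file from the writer example:

    // Sketch: position the reader on the next sync marker past an arbitrary
    // byte offset, then iterate records from there. With a smaller sync
    // interval the reader starts closer to the requested offset.
    SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(new Path("/tmp/sync-interval-demo.seq")));
    try {
      reader.sync(4096);                 // skip to the first sync mark past byte 4096
      IntWritable key = new IntWritable();
      Text value = new Text();
      while (reader.next(key, value)) {
        // process (key, value) ...
      }
    } finally {
      reader.close();
    }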
org/apache/hadoop/io/TestSequenceFileSync.java
@@ -27,13 +27,15 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;

/** Tests sync based seek reads/write intervals inside SequenceFiles. */
public class TestSequenceFileSync {
private static final int NUMRECORDS = 2000;
private static final int RECORDSIZE = 80;
private static final Random rand = new Random();
private static final Random RAND = new Random();

private final static String REC_FMT = "%d RECORDID %d : ";

@@ -46,37 +48,110 @@ private static void forOffset(SequenceFile.Reader reader,
reader.next(key, val);
assertEquals(key.get(), expectedRecord);
final String test = String.format(REC_FMT, expectedRecord, expectedRecord);
assertEquals("Invalid value " + val, 0, val.find(test, 0));
assertEquals(
"Invalid value in iter " + iter + ": " + val,
0,
val.find(test, 0));
}

@Test
public void testDefaultSyncInterval() throws IOException {
// Uses the default sync interval of 100 KB
final Configuration conf = new Configuration();
final FileSystem fs = FileSystem.getLocal(conf);
final Path path = new Path(GenericTestUtils.getTempPath(
"sequencefile.sync.test"));
final IntWritable input = new IntWritable();
final Text val = new Text();
SequenceFile.Writer writer = new SequenceFile.Writer(
conf,
SequenceFile.Writer.file(path),
SequenceFile.Writer.compression(CompressionType.NONE),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class)
);
try {
writeSequenceFile(writer, NUMRECORDS*4);
for (int i = 0; i < 5; i++) {
final SequenceFile.Reader reader;

//try different SequenceFile.Reader constructors
if (i % 2 == 0) {
final int buffersize = conf.getInt("io.file.buffer.size", 4096);
reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(path),
SequenceFile.Reader.bufferSize(buffersize));
} else {
final FSDataInputStream in = fs.open(path);
final long length = fs.getFileStatus(path).getLen();
reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.stream(in),
SequenceFile.Reader.start(0L),
SequenceFile.Reader.length(length));
}

try {
forOffset(reader, input, val, i, 0, 0);
forOffset(reader, input, val, i, 65, 0);
// There would be over 1000 records within
// this sync interval
forOffset(reader, input, val, i, 2000, 1101);
forOffset(reader, input, val, i, 0, 0);
} finally {
reader.close();
}
}
} finally {
fs.delete(path, false);
}
}

@Test
public void testLowSyncpoint() throws IOException {
// Uses a smaller sync interval of 2000 bytes
final Configuration conf = new Configuration();
final FileSystem fs = FileSystem.getLocal(conf);
final Path path = new Path(GenericTestUtils.getTempPath(
"sequencefile.sync.test"));
final IntWritable input = new IntWritable();
final Text val = new Text();
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
IntWritable.class, Text.class);
SequenceFile.Writer writer = new SequenceFile.Writer(
conf,
SequenceFile.Writer.file(path),
SequenceFile.Writer.compression(CompressionType.NONE),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.syncInterval(20*100)
);
// Ensure the custom sync interval value is set
assertEquals(writer.syncInterval, 20*100);
try {
writeSequenceFile(writer, NUMRECORDS);
for (int i = 0; i < 5 ; i++) {
final SequenceFile.Reader reader;
for (int i = 0; i < 5; i++) {
final SequenceFile.Reader reader;

//try different SequenceFile.Reader constructors
if (i % 2 == 0) {
reader = new SequenceFile.Reader(fs, path, conf);
} else {
final FSDataInputStream in = fs.open(path);
final long length = fs.getFileStatus(path).getLen();
final int buffersize = conf.getInt("io.file.buffer.size", 4096);
reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf);
}
//try different SequenceFile.Reader constructors
if (i % 2 == 0) {
final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
reader = new SequenceFile.Reader(
conf,
SequenceFile.Reader.file(path),
SequenceFile.Reader.bufferSize(bufferSize));
} else {
final FSDataInputStream in = fs.open(path);
final long length = fs.getFileStatus(path).getLen();
reader = new SequenceFile.Reader(
conf,
SequenceFile.Reader.stream(in),
SequenceFile.Reader.start(0L),
SequenceFile.Reader.length(length));
}

try {
try {
forOffset(reader, input, val, i, 0, 0);
forOffset(reader, input, val, i, 65, 0);
// There would be only a few records within
// this sync interval
forOffset(reader, input, val, i, 2000, 21);
forOffset(reader, input, val, i, 0, 0);
} finally {
@@ -88,7 +163,7 @@ public void testLowSyncpoint() throws IOException {
}
}

public static void writeSequenceFile(SequenceFile.Writer writer,
private static void writeSequenceFile(SequenceFile.Writer writer,
int numRecords) throws IOException {
final IntWritable key = new IntWritable();
final Text val = new Text();
@@ -100,13 +175,13 @@ public static void writeSequenceFile(SequenceFile.Writer writer,
writer.close();
}

static void randomText(Text val, int id, int recordSize) {
private static void randomText(Text val, int id, int recordSize) {
val.clear();
final StringBuilder ret = new StringBuilder(recordSize);
ret.append(String.format(REC_FMT, id, id));
recordSize -= ret.length();
for (int i = 0; i < recordSize; ++i) {
ret.append(rand.nextInt(9));
ret.append(RAND.nextInt(9));
}
val.set(ret.toString());
}
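As a rough sanity check on the expected record numbers in forOffset (not part of the test itself): each record here serializes to roughly 93 bytes (4-byte record length + 4-byte key length + 4-byte IntWritable + ~81 bytes of Text), so:

    default interval: 102,400 / ~93 ≈ 1,101 records before the first sync mark
    low interval:       2,000 / ~93 ≈    21 records before the first sync mark

which lines up with the expected values of 1101 and 21 used above.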