MNEMONIC-232: Improve the MneDurableInputSession to accept multiple paths
Wang, Gang (Gary) committed Mar 23, 2017
1 parent 7799786 commit 9373f5e
Showing 4 changed files with 64 additions and 16 deletions.
MneDurableInputSession.java
@@ -45,7 +45,7 @@ public class MneDurableInputSession<V>
   private Iterator<String> m_fp_iter;
 
   public MneDurableInputSession(TaskAttemptContext taskAttemptContext,
-      Configuration configuration, Path path, String prefix) {
+      Configuration configuration, Path[] paths, String prefix) {
     if (null == taskAttemptContext && null == configuration) {
       throw new ConfigurationException("Session is not configured properly");
     }
@@ -55,15 +55,17 @@ public MneDurableInputSession(TaskAttemptContext taskAttemptContext,
     } else {
       setConfiguration(configuration);
     }
-    initialize(path, prefix);
+    initialize(paths, prefix);
   }
 
-  public void initialize(Path path, String prefix) {
-    if (!Files.isRegularFile(Paths.get(path.toString()), LinkOption.NOFOLLOW_LINKS)) {
-      throw new UnsupportedOperationException();
-    }
+  public void initialize(Path[] paths, String prefix) {
     List<String> fpathlist = new ArrayList<String>();
-    fpathlist.add(path.toString());
+    for (Path p : paths) {
+      if (!Files.isRegularFile(Paths.get(p.toString()), LinkOption.NOFOLLOW_LINKS)) {
+        throw new UnsupportedOperationException();
+      }
+      fpathlist.add(p.toString());
+    }
     m_fp_iter = fpathlist.iterator();
     readConfig(prefix);
   }
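Taken together, the two hunks above move both the constructor and initialize() from a single Path to a Path[]: every path is validated with Files.isRegularFile() before its string form is queued, and m_fp_iter then walks the whole list. A minimal call-site sketch against the new signature, mirroring the generic type used in the test below (the task attempt context and file paths here are illustrative, not part of this commit):

    // Hypothetical usage: one input session spanning several Mnemonic part files.
    Path[] inputs = new Path[]{
        new Path("/tmp/mne/part-m-00000.mne"),  // assumed paths, for illustration only
        new Path("/tmp/mne/part-m-00001.mne")};
    MneDurableInputSession<MneDurableInputValue<DurableChunk<?>>> session =
        new MneDurableInputSession<MneDurableInputValue<DurableChunk<?>>>(taskAttemptContext, null,
            inputs, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
    // Any entry that is not a regular file fails fast with UnsupportedOperationException.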
MneMapredRecordReader.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -46,7 +47,7 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
   public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
     m_fileSplit = fileSplit;
     m_session = new MneDurableInputSession<V>(null, conf,
-        m_fileSplit.getPath(), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+        new Path[]{m_fileSplit.getPath()}, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
     m_iter = m_session.iterator();
   }
MneMapreduceRecordReader.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -51,7 +52,7 @@ public void close() throws IOException {
   public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
     FileSplit split = (FileSplit) inputSplit;
     m_session = new MneDurableInputSession<V>(context, null,
-        split.getPath(), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+        new Path[]{split.getPath()}, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
     m_iter = m_session.iterator();
   }
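Both record readers keep their single-split behavior: each simply wraps the split's lone path in a one-element Path[], so existing MapReduce jobs see no change while the session API gains its batch capability.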
MneMapreduceChunkDataTest.java
@@ -44,11 +44,13 @@
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
 import org.apache.mnemonic.hadoop.MneDurableInputValue;
 import org.apache.mnemonic.hadoop.MneDurableOutputSession;
 import org.apache.mnemonic.hadoop.MneDurableOutputValue;
 import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
 import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.apache.mnemonic.sessions.SessionIterator;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
@@ -73,7 +75,6 @@ public class MneMapreduceChunkDataTest {
   private long m_reccnt = 5000L;
   private volatile long m_checksum;
   private volatile long m_totalsize = 0L;
-  private List<String> m_partfns;
   private Unsafe unsafe;
 
   @BeforeClass
@@ -82,7 +83,6 @@ public void setUp() throws Exception {
         System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
     m_conf = new JobConf();
     m_rand = Utils.createRandom();
-    m_partfns = new ArrayList<String>();
     unsafe = Utils.getUnsafe();
 
     try {
@@ -164,6 +164,7 @@ public void testWriteChunkData() throws Exception {
 
   @Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
   public void testReadChunkData() throws Exception {
+    List<String> partfns = new ArrayList<String>();
     long reccnt = 0L;
     long tsize = 0L;
     Checksum cs = new CRC32();
@@ -174,14 +175,14 @@ public void testReadChunkData() throws Exception {
       if (listfiles[idx].isFile()
           && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
           && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
-        m_partfns.add(listfiles[idx].getName());
+        partfns.add(listfiles[idx].getName());
       }
     }
-    Collections.sort(m_partfns); // keep the order for checksum
-    for (int idx = 0; idx < m_partfns.size(); ++idx) {
-      System.out.println(String.format("Verifying : %s", m_partfns.get(idx)));
+    Collections.sort(partfns); // keep the order for checksum
+    for (int idx = 0; idx < partfns.size(); ++idx) {
+      System.out.println(String.format("Verifying : %s", partfns.get(idx)));
       FileSplit split = new FileSplit(
-          new Path(m_workdir, m_partfns.get(idx)), 0, 0L, new String[0]);
+          new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
       InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
           new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
       RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
@@ -204,4 +205,47 @@ public void testReadChunkData() throws Exception {
     AssertJUnit.assertEquals(m_checksum, cs.getValue());
     System.out.println(String.format("The checksum of chunk is %d", m_checksum));
   }
+
+  @Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
+  public void testBatchReadChunkDataUsingInputSession() throws Exception {
+    List<String> partfns = new ArrayList<String>();
+    long reccnt = 0L;
+    long tsize = 0L;
+    Checksum cs = new CRC32();
+    cs.reset();
+    File folder = new File(m_workdir.toString());
+    File[] listfiles = folder.listFiles();
+    for (int idx = 0; idx < listfiles.length; ++idx) {
+      if (listfiles[idx].isFile()
+          && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+          && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+        partfns.add(listfiles[idx].getName());
+      }
+    }
+    Collections.sort(partfns); // keep the order for checksum
+    List<Path> paths = new ArrayList<Path>();
+    for (String fns : partfns) {
+      paths.add(new Path(m_workdir, fns));
+      System.out.println(String.format("[Batch Mode] Added : %s", fns));
+    }
+    MneDurableInputSession<MneDurableInputValue<DurableChunk<?>>> m_session =
+        new MneDurableInputSession<MneDurableInputValue<DurableChunk<?>>>(m_tacontext, null,
+            paths.toArray(new Path[0]), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+    SessionIterator<MneDurableInputValue<DurableChunk<?>>, ?> m_iter = m_session.iterator();
+    MneDurableInputValue<DurableChunk<?>> dchkval = null;
+    while (m_iter.hasNext()) {
+      dchkval = m_iter.next();
+      byte b;
+      for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
+        b = unsafe.getByte(dchkval.getValue().get() + j);
+        cs.update(b);
+      }
+      tsize += dchkval.getValue().getSize();
+      ++reccnt;
+    }
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of chunk is %d [Batch Mode]", m_checksum));
+  }
 }
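The new testBatchReadChunkDataUsingInputSession collects the same sorted part files as testReadChunkData, but hands them all to a single MneDurableInputSession as one Path[] and drains one SessionIterator across file boundaries; the record count, total size, and CRC32 checksum must match the values recorded by testWriteChunkData. The m_partfns field also becomes a local partfns list in each test, so the two read tests no longer share mutable state.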
