Skip to content

Commit

Permalink
[CELEBORN-925][TEST] Refine PartitionFilesSorterSuiteJ
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Refines `PartitionFilesSorterSuiteJ` as described in the title.

### Why are the changes needed?

This PR

1. Strengthens the assertion conditions.
2. Enables the previously ignored `testLargeFile` scenario.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GA

Closes apache#1848 from cfmcgrady/refine-partition-files-sorter-suite.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
cfmcgrady authored and waitinfuture committed Aug 29, 2023
1 parent fda7353 commit 2766908
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public long getFileLength() {
return bytesFlushed;
}

public long updateBytesFlushed(int numBytes) {
/** Adds {@code numBytes} to the running flushed-byte counter and returns the updated total. */
public long updateBytesFlushed(long numBytes) {
  this.bytesFlushed = this.bytesFlushed + numBytes;
  return this.bytesFlushed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Random;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand All @@ -40,6 +39,7 @@
import org.apache.celeborn.common.meta.FileInfo;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.CelebornExitKind;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
Expand All @@ -48,18 +48,19 @@ public class PartitionFilesSorterSuiteJ {

private static Logger logger = LoggerFactory.getLogger(PartitionFilesSorterSuiteJ.class);

private Random random = new Random();
private File shuffleFile;
private FileInfo fileInfo;
public final int CHUNK_SIZE = 8 * 1024 * 1024;
private String originFileName;
private long originFileLen;
private FileWriter fileWriter;
private long sortTimeout = 16 * 1000;
private UserIdentifier userIdentifier = new UserIdentifier("mock-tenantId", "mock-name");

public void prepare(boolean largefile) throws IOException {
private static final int MAX_MAP_ID = 50;

public long[] prepare(int mapCount) throws IOException {
long[] partitionSize = new long[MAX_MAP_ID];
byte[] batchHeader = new byte[16];
Random random = new Random();
shuffleFile = File.createTempFile("Celeborn", "sort-suite");

originFileName = shuffleFile.getAbsolutePath();
Expand All @@ -68,13 +69,8 @@ public void prepare(boolean largefile) throws IOException {
FileChannel channel = fileOutputStream.getChannel();
Map<Integer, Integer> batchIds = new HashMap<>();

int maxMapId = 50;
int mapCount = 1000;
if (largefile) {
mapCount = 15000;
}
for (int i = 0; i < mapCount; i++) {
int mapId = random.nextInt(maxMapId);
int mapId = random.nextInt(MAX_MAP_ID);
int currentAttemptId = 0;
int batchId =
batchIds.compute(
Expand All @@ -87,6 +83,7 @@ public void prepare(boolean largefile) throws IOException {
}
return v;
});
// [63.9k, 192k + 63.9k]
int dataSize = random.nextInt(192 * 1024) + 65525;
byte[] mockedData = new byte[dataSize];
Platform.putInt(batchHeader, Platform.BYTE_ARRAY_OFFSET, mapId);
Expand All @@ -102,15 +99,12 @@ public void prepare(boolean largefile) throws IOException {
while (buf2.hasRemaining()) {
channel.write(buf2);
}
partitionSize[mapId] = partitionSize[mapId] + batchHeader.length + mockedData.length;
}
originFileLen = channel.size();
fileInfo.getChunkOffsets().add(originFileLen);
fileInfo.updateBytesFlushed((int) originFileLen);
System.out.println(
shuffleFile.getAbsolutePath()
+ " filelen "
+ (double) originFileLen / 1024 / 1024.0
+ "MB");
fileInfo.updateBytesFlushed(originFileLen);
logger.info(shuffleFile.getAbsolutePath() + " filelen: " + Utils.bytesToString(originFileLen));

CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), "0.8");
Expand All @@ -126,41 +120,57 @@ public void prepare(boolean largefile) throws IOException {
fileWriter = Mockito.mock(FileWriter.class);
when(fileWriter.getFile()).thenAnswer(i -> shuffleFile);
when(fileWriter.getFileInfo()).thenAnswer(i -> fileInfo);
return partitionSize;
}

/** Deletes the origin shuffle file together with its ".sorted" and ".index" companions. */
public void clean() throws IOException {
  String base = shuffleFile.getPath();
  for (String suffix : new String[] {"", ".sorted", ".index"}) {
    JavaUtils.deleteRecursively(new File(base + suffix));
  }
}

public void clean() {
shuffleFile.delete();
/**
 * Generates a shuffle file with {@code mapCount} batches, sorts it, and verifies the
 * sorted result for the map range [startMapIndex, endMapIndex): the chunk count must be
 * positive and bounded by the expected chunk count for the fetched bytes, and the chunk
 * offsets must cover exactly the bytes written for the requested maps. The generated
 * files are always cleaned up, even when an assertion fails.
 */
private void check(int mapCount, int startMapIndex, int endMapIndex) throws IOException {
  try {
    long[] bytesPerMap = prepare(mapCount);
    CelebornConf conf = new CelebornConf();
    conf.set(CelebornConf.SHUFFLE_CHUNK_SIZE().key(), "8m");
    PartitionFilesSorter sorter =
        new PartitionFilesSorter(MemoryManager.instance(), conf, new WorkerSource(conf));
    FileInfo sortedInfo =
        sorter.getSortedFileInfo(
            "application-1",
            originFileName,
            fileWriter.getFileInfo(),
            startMapIndex,
            endMapIndex);
    // Sum the bytes produced for exactly the maps the fetch request covers.
    long expectedBytes = 0;
    for (int mapId = startMapIndex; mapId < endMapIndex; mapId++) {
      expectedBytes += bytesPerMap[mapId];
    }
    long chunkUpperBound = expectedBytes / conf.shuffleChunkSize() + 1;
    Assert.assertTrue(0 < sortedInfo.numChunks() && sortedInfo.numChunks() <= chunkUpperBound);
    // The span from the first chunk offset to the last must equal the fetched bytes.
    long coveredBytes = sortedInfo.getLastChunkOffset() - sortedInfo.getChunkOffsets().get(0);
    Assert.assertTrue(expectedBytes == coveredBytes);
  } finally {
    clean();
  }
}

@Test
public void testSmallFile() throws InterruptedException, IOException {
prepare(false);
CelebornConf conf = new CelebornConf();
PartitionFilesSorter partitionFilesSorter =
new PartitionFilesSorter(MemoryManager.instance(), conf, new WorkerSource(conf));
FileInfo info =
partitionFilesSorter.getSortedFileInfo(
"application-1", originFileName, fileWriter.getFileInfo(), 5, 10);
Thread.sleep(1000);
System.out.println(info.toString());
Assert.assertTrue(info.numChunks() > 0);
clean();
/** Sorts a ~1000-batch shuffle file and validates a randomly chosen map range. */
public void testSmallFile() throws IOException {
  int start = random.nextInt(5);
  int length = random.nextInt(5) + 5;
  check(1000, start, start + length);
}

@Test
@Ignore
public void testLargeFile() throws InterruptedException, IOException {
prepare(true);
CelebornConf conf = new CelebornConf();
PartitionFilesSorter partitionFilesSorter =
new PartitionFilesSorter(MemoryManager.instance(), conf, new WorkerSource(conf));
FileInfo info =
partitionFilesSorter.getSortedFileInfo(
"application-1", originFileName, fileWriter.getFileInfo(), 5, 10);
Thread.sleep(30000);
System.out.println(info.toString());
Assert.assertTrue(info.numChunks() > 0);
clean();
/** Sorts a ~15000-batch shuffle file and validates a randomly chosen map range. */
public void testLargeFile() throws IOException {
  int start = random.nextInt(5);
  int length = random.nextInt(5) + 5;
  check(15000, start, start + length);
}

@Test
Expand Down

0 comments on commit 2766908

Please sign in to comment.