From 27669080fbfb009cbe9b0f3bce555561acfadb59 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Tue, 29 Aug 2023 18:16:02 +0800 Subject: [PATCH] [CELEBORN-925][TEST] Refine PartitionFilesSorterSuiteJ ### What changes were proposed in this pull request? As the title states. ### Why are the changes needed? This PR: 1. Strengthens the assertion conditions. 2. Enables the previously ignored `testLargeFile` scenario. 3. Widens the `FileInfo.updateBytesFlushed` parameter from `int` to `long` so the test can pass the file length without an `(int)` cast. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GA Closes #1848 from cfmcgrady/refine-partition-files-sorter-suite. Authored-by: Fu Chen Signed-off-by: zky.zhoukeyong --- .../apache/celeborn/common/meta/FileInfo.java | 2 +- .../storage/PartitionFilesSorterSuiteJ.java | 98 ++++++++++--------- 2 files changed, 55 insertions(+), 45 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index 426f9285094..3b9a540d21a 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -114,7 +114,7 @@ public long getFileLength() { return bytesFlushed; } - public long updateBytesFlushed(int numBytes) { + public long updateBytesFlushed(long numBytes) { bytesFlushed += numBytes; return bytesFlushed; } diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java index ab480065ff6..0c121eedc6f 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java @@ -29,7 +29,6 @@ import java.util.Random; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; @@ -40,6
+39,7 @@ import org.apache.celeborn.common.meta.FileInfo; import org.apache.celeborn.common.unsafe.Platform; import org.apache.celeborn.common.util.CelebornExitKind; +import org.apache.celeborn.common.util.JavaUtils; import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.service.deploy.worker.WorkerSource; import org.apache.celeborn.service.deploy.worker.memory.MemoryManager; @@ -48,18 +48,19 @@ public class PartitionFilesSorterSuiteJ { private static Logger logger = LoggerFactory.getLogger(PartitionFilesSorterSuiteJ.class); + private Random random = new Random(); private File shuffleFile; private FileInfo fileInfo; - public final int CHUNK_SIZE = 8 * 1024 * 1024; private String originFileName; private long originFileLen; private FileWriter fileWriter; - private long sortTimeout = 16 * 1000; private UserIdentifier userIdentifier = new UserIdentifier("mock-tenantId", "mock-name"); - public void prepare(boolean largefile) throws IOException { + private static final int MAX_MAP_ID = 50; + + public long[] prepare(int mapCount) throws IOException { + long[] partitionSize = new long[MAX_MAP_ID]; byte[] batchHeader = new byte[16]; - Random random = new Random(); shuffleFile = File.createTempFile("Celeborn", "sort-suite"); originFileName = shuffleFile.getAbsolutePath(); @@ -68,13 +69,8 @@ public void prepare(boolean largefile) throws IOException { FileChannel channel = fileOutputStream.getChannel(); Map batchIds = new HashMap<>(); - int maxMapId = 50; - int mapCount = 1000; - if (largefile) { - mapCount = 15000; - } for (int i = 0; i < mapCount; i++) { - int mapId = random.nextInt(maxMapId); + int mapId = random.nextInt(MAX_MAP_ID); int currentAttemptId = 0; int batchId = batchIds.compute( @@ -87,6 +83,7 @@ public void prepare(boolean largefile) throws IOException { } return v; }); + // [63.9k, 192k + 63.9k] int dataSize = random.nextInt(192 * 1024) + 65525; byte[] mockedData = new byte[dataSize]; Platform.putInt(batchHeader, Platform.BYTE_ARRAY_OFFSET, 
mapId); @@ -102,15 +99,12 @@ public void prepare(boolean largefile) throws IOException { while (buf2.hasRemaining()) { channel.write(buf2); } + partitionSize[mapId] = partitionSize[mapId] + batchHeader.length + mockedData.length; } originFileLen = channel.size(); fileInfo.getChunkOffsets().add(originFileLen); - fileInfo.updateBytesFlushed((int) originFileLen); - System.out.println( - shuffleFile.getAbsolutePath() - + " filelen " - + (double) originFileLen / 1024 / 1024.0 - + "MB"); + fileInfo.updateBytesFlushed(originFileLen); + logger.info(shuffleFile.getAbsolutePath() + " filelen: " + Utils.bytesToString(originFileLen)); CelebornConf conf = new CelebornConf(); conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), "0.8"); @@ -126,41 +120,57 @@ public void prepare(boolean largefile) throws IOException { fileWriter = Mockito.mock(FileWriter.class); when(fileWriter.getFile()).thenAnswer(i -> shuffleFile); when(fileWriter.getFileInfo()).thenAnswer(i -> fileInfo); + return partitionSize; + } + + public void clean() throws IOException { + // origin file + JavaUtils.deleteRecursively(shuffleFile); + // sorted file + JavaUtils.deleteRecursively(new File(shuffleFile.getPath() + ".sorted")); + // index file + JavaUtils.deleteRecursively(new File(shuffleFile.getPath() + ".index")); } - public void clean() { - shuffleFile.delete(); + private void check(int mapCount, int startMapIndex, int endMapIndex) throws IOException { + try { + long[] partitionSize = prepare(mapCount); + CelebornConf conf = new CelebornConf(); + conf.set(CelebornConf.SHUFFLE_CHUNK_SIZE().key(), "8m"); + PartitionFilesSorter partitionFilesSorter = + new PartitionFilesSorter(MemoryManager.instance(), conf, new WorkerSource(conf)); + FileInfo info = + partitionFilesSorter.getSortedFileInfo( + "application-1", + originFileName, + fileWriter.getFileInfo(), + startMapIndex, + endMapIndex); + long totalSizeToFetch = 0; + for (int i = startMapIndex; i < endMapIndex; i++) { + totalSizeToFetch += 
partitionSize[i]; + } + long numChunks = totalSizeToFetch / conf.shuffleChunkSize() + 1; + Assert.assertTrue(0 < info.numChunks() && info.numChunks() <= numChunks); + long actualTotalChunkSize = info.getLastChunkOffset() - info.getChunkOffsets().get(0); + Assert.assertTrue(totalSizeToFetch == actualTotalChunkSize); + } finally { + clean(); + } } @Test - public void testSmallFile() throws InterruptedException, IOException { - prepare(false); - CelebornConf conf = new CelebornConf(); - PartitionFilesSorter partitionFilesSorter = - new PartitionFilesSorter(MemoryManager.instance(), conf, new WorkerSource(conf)); - FileInfo info = - partitionFilesSorter.getSortedFileInfo( - "application-1", originFileName, fileWriter.getFileInfo(), 5, 10); - Thread.sleep(1000); - System.out.println(info.toString()); - Assert.assertTrue(info.numChunks() > 0); - clean(); + public void testSmallFile() throws IOException { + int startMapIndex = random.nextInt(5); + int endMapIndex = startMapIndex + random.nextInt(5) + 5; + check(1000, startMapIndex, endMapIndex); } @Test - @Ignore - public void testLargeFile() throws InterruptedException, IOException { - prepare(true); - CelebornConf conf = new CelebornConf(); - PartitionFilesSorter partitionFilesSorter = - new PartitionFilesSorter(MemoryManager.instance(), conf, new WorkerSource(conf)); - FileInfo info = - partitionFilesSorter.getSortedFileInfo( - "application-1", originFileName, fileWriter.getFileInfo(), 5, 10); - Thread.sleep(30000); - System.out.println(info.toString()); - Assert.assertTrue(info.numChunks() > 0); - clean(); + public void testLargeFile() throws IOException { + int startMapIndex = random.nextInt(5); + int endMapIndex = startMapIndex + random.nextInt(5) + 5; + check(15000, startMapIndex, endMapIndex); } @Test