CASSANDRA-20636 - Fix: handle MAX_SEGMENT_SIZE < chunkSize for MmappedRegions::updateState #4189

Closed

2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/io/util/MmappedRegions.java
@@ -183,7 +183,7 @@ public boolean extend(CompressionMetadata compressionMetadata, int chunkSize)
private void updateState(long length, int chunkSize)
{
// make sure the regions span whole chunks
long maxSize = (long) (MAX_SEGMENT_SIZE / chunkSize) * chunkSize;
long maxSize = Math.max(chunkSize, (long) (MAX_SEGMENT_SIZE / chunkSize) * chunkSize);
state.length = length;
long pos = state.getPosition();
while (pos < length)
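
Why the one-line change above matters: the first form of the expression is the old computation and the Math.max form is the fix. When MAX_SEGMENT_SIZE is smaller than chunkSize (the case the new test below exercises by setting it to PageAware.PAGE_SIZE - 1), the integer division truncates to zero, so maxSize was 0 and the while (pos < length) loop had no whole chunk to advance by. A minimal standalone sketch of the arithmetic, using illustrative values rather than the real constants:

public class MaxSizeArithmeticSketch
{
    public static void main(String[] args)
    {
        int maxSegmentSize = 4095; // illustrative: one byte smaller than the chunk size
        int chunkSize = 4096;

        // Old expression: rounds down to a whole number of chunks, which is zero here.
        long before = (long) (maxSegmentSize / chunkSize) * chunkSize;                     // 0

        // Fixed expression: clamps to at least one whole chunk, so regions still span
        // whole chunks and each step can make progress.
        long after = Math.max(chunkSize, (long) (maxSegmentSize / chunkSize) * chunkSize); // 4096

        System.out.println(before + " -> " + after);
    }
}
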
79 changes: 62 additions & 17 deletions test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -30,12 +30,14 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -88,6 +90,9 @@
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.SelfRefCounted;
import org.mockito.Mockito;

import static java.lang.String.format;
@@ -97,10 +102,12 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

public class SSTableReaderTest
{

public static final String KEYSPACE1 = "SSTableReaderTest";
public static final String CF_STANDARD = "Standard1";
public static final String CF_STANDARD2 = "Standard2";
@@ -111,6 +118,7 @@ public class SSTableReaderTest
public static final String CF_STANDARD_LOW_INDEX_INTERVAL = "StandardLowIndexInterval";
public static final String CF_STANDARD_SMALL_BLOOM_FILTER = "StandardSmallBloomFilter";

private final List<Ref<?>> refsToRelease = new ArrayList<>();
private IPartitioner partitioner;

Token t(int i)
@@ -143,6 +151,28 @@ public static void defineSchema() throws Exception
CompactionManager.instance.disableAutoCompaction();
}

@After
public void teardown()
{
Throwable exceptions = null;
for (Ref<?> ref : refsToRelease)
{
try
{
ref.release();
}
catch (Throwable exc)
{
exceptions = Throwables.merge(exceptions, exc);
}
}

if (exceptions != null)
fail("Unable to release all tracked references " + exceptions);

refsToRelease.clear();
}

@Test
public void testGetPositionsForRanges()
{
@@ -310,7 +340,6 @@ public void testOnDiskSizeForRanges()
}
}


@Test
public void testOnDiskSizeCompressedBoundaries()
{
@@ -344,7 +373,6 @@ public void testOnDiskSizeCompressedBoundaries()
onDiskSizeForRanges(sstable, Collections.singleton(new Range<>(partitioner.getMinimumToken(), t0(k - 1))))); // inclusive end
}


long onDiskSizeForRanges(SSTableReader sstable, Collection<Range<Token>> ranges)
{
return sstable.onDiskSizeForPartitionPositions(sstable.getPositionsForRanges(ranges));
@@ -375,12 +403,22 @@ private String cut(String s, int n)
return s.substring(0, s.length() - n);
}


@Test
public void testSpannedIndexPositions() throws IOException
{
doTestSpannedIndexPositions(PageAware.PAGE_SIZE);
}

@Test
public void testSpannedIndexPositionsWithMaxSegmentSizeSmallerThanPageSize() throws IOException
{
doTestSpannedIndexPositions(PageAware.PAGE_SIZE - 1);
}

public void doTestSpannedIndexPositions(int maxSegmentSize) throws IOException
{
int originalMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
MmappedRegions.MAX_SEGMENT_SIZE = PageAware.PAGE_SIZE;
MmappedRegions.MAX_SEGMENT_SIZE = maxSegmentSize;

try
{
@@ -655,7 +693,6 @@ public void testGetPositionsBloomFilterStats()
assertEquals(fpCount + 2, sstable.getFilterTracker().getFalsePositiveCount());
}


@Test
public void testGetPositionsListenerCalls()
{
@@ -775,12 +812,12 @@ private SSTableReaderWithFilter prepareGetPositions()
Util.flush(store);
CompactionManager.instance.performMaximal(store, false);

SSTableReaderWithFilter sstable = (SSTableReaderWithFilter) store.getLiveSSTables().iterator().next();
sstable = (SSTableReaderWithFilter) sstable.cloneWithNewStart(dk(3));
return sstable;
return (SSTableReaderWithFilter) trackReleaseableRef(() -> {
SSTableReader reader = store.getLiveSSTables().iterator().next();
return reader.cloneWithNewStart(dk(3));
});
}


@Test
public void testOpeningSSTable() throws Exception
{
@@ -895,7 +932,8 @@ private static void checkOpenedBigTable(String ks, String cf, ColumnFamilyStore
components.remove(Components.SUMMARY);

target = SSTableReader.open(store, desc, components, store.metadata);
try {
try
{
assertTrue("Bloomfilter was not recreated", bloomModified < bloomFile.lastModified());
assertTrue("Summary was not recreated", summaryModified < summaryFile.lastModified());
}
@@ -1054,9 +1092,8 @@ public void testLoadingSummaryUsesCorrectPartitioner() throws Exception
if (sstable instanceof IndexSummarySupport<?>)
{
new IndexSummaryComponent(((IndexSummarySupport<?>) sstable).getIndexSummary(), sstable.getFirst(), sstable.getLast()).save(sstable.descriptor.fileFor(Components.SUMMARY), true);
SSTableReader reopened = SSTableReader.open(store, sstable.descriptor);
SSTableReader reopened = trackReleaseableRef(() -> SSTableReader.open(store, sstable.descriptor));
assert reopened.getFirst().getToken() instanceof LocalToken;
reopened.selfRef().release();
}
}
}
@@ -1065,7 +1102,7 @@
* see CASSANDRA-5407
*/
@Test
public void testGetScannerForNoIntersectingRanges() throws Exception
public void testGetScannerForNoIntersectingRanges()
{
ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_STANDARD);
partitioner = store.getPartitioner();
@@ -1240,7 +1277,7 @@ private <R extends SSTableReader & IndexSummarySupport<R>> void testIndexSummary
txn.update(replacement, true);
txn.finish();
}
R reopen = (R) SSTableReader.open(store, sstable.descriptor);
R reopen = (R) trackReleaseableRef(() -> SSTableReader.open(store, sstable.descriptor));
assert reopen.getIndexSummary().getSamplingLevel() == sstable.getIndexSummary().getSamplingLevel() + 1;
}

@@ -1311,9 +1348,9 @@ public void testMoveAndOpenSSTable() throws IOException
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_MOVE_AND_OPEN);
SSTableReader sstable = getNewSSTable(cfs);
SSTableReader sstable = trackReleaseableRef(() -> getNewSSTable(cfs));

cfs.clearUnsafe();
sstable.selfRef().release();
File tmpdir = new File(Files.createTempDirectory("testMoveAndOpen"));
tmpdir.deleteOnExit();
SSTableId id = SSTableIdFactory.instance.defaultBuilder().generator(Stream.empty()).get();
@@ -1325,7 +1362,8 @@
assertFalse(f.exists());
assertTrue(sstable.descriptor.fileFor(c).exists());
}
SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components, false);
trackReleaseableRef(() -> SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components, false));

// make sure the files were moved:
for (Component c : sstable.components)
{
@@ -1435,4 +1473,11 @@ private ColumnFamilyStore discardSSTables(String ks, String cf)
cfs.discardSSTables(System.currentTimeMillis());
return cfs;
}

private <T extends SelfRefCounted<T>> T trackReleaseableRef(Supplier<T> refSupplier)
{
T ref = refSupplier.get();
refsToRelease.add(ref.selfRef());
return ref;
}
}
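
The trackReleaseableRef helper above pairs with the new @After teardown: a test registers any reader it opens or clones itself, and teardown() releases every tracked self-ref, failing the test if a release throws. A hypothetical test method, not part of this patch, sketching how the pattern is meant to be used inside this class:

@Test
public void exampleTrackedReaderUsage() // hypothetical, for illustration only
{
    ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_STANDARD);
    // ... populate `store` and flush so a live sstable exists ...
    SSTableReader live = store.getLiveSSTables().iterator().next();

    // The test owns the reader it opens itself, so its self-ref is tracked
    // and released automatically in teardown().
    SSTableReader reopened = trackReleaseableRef(() -> SSTableReader.open(store, live.descriptor));

    // ... assertions against `reopened` ...
}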