Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions check-rules/spotbugs-excludes.xml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@
<Bug pattern="SING_SINGLETON_HAS_NONPRIVATE_CONSTRUCTOR"/>
<Class name="org.apache.ignite.internal.partition.replicator.schema.TableDefinitionDiff"/>
</Match>
<Match>
<!-- Done that way on purpose. -->
<Bug pattern="RV_NEGATING_RESULT_OF_COMPARETO"/>
<Class name="org.apache.ignite.internal.storage.pagememory.index.sorted.io.SortedIndexTreeIo"/>
<Method name="compare"/>
</Match>
<!-- end of false-positive exclusions -->


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -1427,4 +1428,27 @@ public static boolean shouldSwitchToRequestsExecutor(ThreadOperation... required
return true;
}
}

/**
* Creates a comparator of lists that compares them lexicographically using the provided comparator for list elements.
*
* @param comparator Comparator for list elements.
* @param <T> Type of list's elements.
* @return Comparator of lists.
*/
public static <T> Comparator<List<T>> lexicographicListComparator(Comparator<? super T> comparator) {
return (l, r) -> {
int length = Math.min(l.size(), r.size());

for (int i = 0; i < length; i++) {
int cmp = comparator.compare(l.get(i), r.get(i));

if (cmp != 0) {
return cmp;
}
}

return Integer.compare(l.size(), r.size());
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,31 @@ public UnsafeByteBufferAccessor(ByteBuffer buff) {
capacity = buff.capacity();
}

/**
* Constructor that initializes the accessor with a memory address and capacity.
*
* @param addr Memory address.
* @param capacity Capacity in bytes.
*/
public UnsafeByteBufferAccessor(long addr, int capacity) {
this.bytes = null;
this.addr = addr;
this.capacity = capacity;
}

/**
* Constructor that initializes the accessor with a byte array, offset, and length.
*
* @param bytes Byte array.
* @param from Offset in the array.
* @param length Length in bytes.
*/
public UnsafeByteBufferAccessor(byte[] bytes, int from, int length) {
this.bytes = bytes;
this.addr = GridUnsafe.BYTE_ARR_OFF + from;
this.capacity = length;
}

@Override
public byte get(int p) {
return GridUnsafe.getByte(bytes, addr + p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ public void compareSingleColumnTuplesWithNulls() {
validate(descNullFirstComparator, tuple1, tuple2);
validate(ascNullLastComparator, tuple2, tuple1);
validate(descNullLastComparator, tuple2, tuple1);

if (supportsPartialComparison()) {
ByteBuffer partialTuple2 = tuple2.duplicate().order(ByteOrder.LITTLE_ENDIAN).limit(tuple2.limit() - 1);

validate(ascNullFirstComparator, tuple1, partialTuple2);
validate(descNullFirstComparator, tuple1, partialTuple2);
validate(ascNullLastComparator, partialTuple2, tuple1);
validate(descNullLastComparator, partialTuple2, tuple1);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,173 @@

package org.apache.ignite.internal.storage.pagememory;

import static java.util.Comparator.comparing;
import static org.apache.ignite.internal.util.IgniteUtils.lexicographicListComparator;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation;
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import org.apache.ignite.internal.configuration.SystemPropertyView;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.pagememory.index.sorted.comparator.JitComparator;
import org.apache.ignite.internal.storage.pagememory.index.sorted.comparator.JitComparatorGenerator;
import org.apache.ignite.internal.storage.pagememory.index.sorted.comparator.JitComparatorOptions;
import org.apache.ignite.internal.type.NativeType;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;

/** Abstract implementation of the storage engine based on memory {@link PageMemory}. */
public abstract class AbstractPageMemoryStorageEngine implements StorageEngine {
public static final String LEGACY_PAGE_MEMERY_SORTED_INDEX_COMPARATOR_PROPERTY = "legacyPageMemorySortedIndexComparator";

protected final @Nullable SystemLocalConfiguration systemLocalConfig;

private final HybridClock clock;

private boolean useLegacySortedIndexComparator = false;

/**
* This map is used to reuse comparators for sorted indexes with the same set of columns and their collations. It is beneficial to reuse
* comparators because otherwise every comparator will use its own generated class, which bloats metaspace and doesn't allow JVM's JIT
* to be as efficient.
*/
private final ConcurrentMap<StorageSortedIndexDescriptor, CachedComparator> cachedSortedIndexComparators
= new ConcurrentSkipListMap<>(comparing(
StorageSortedIndexDescriptor::columns,
lexicographicListComparator(comparing(StorageSortedIndexColumnDescriptor::type)
.thenComparing(StorageSortedIndexColumnDescriptor::nullable)
.thenComparing(StorageSortedIndexColumnDescriptor::nullsFirst)
.thenComparing(StorageSortedIndexColumnDescriptor::asc)
)
));

/** Constructor. */
AbstractPageMemoryStorageEngine(HybridClock clock) {
AbstractPageMemoryStorageEngine(@Nullable SystemLocalConfiguration systemLocalConfig, HybridClock clock) {
this.systemLocalConfig = systemLocalConfig;
this.clock = clock;
}

@Override
public void start() throws StorageException {
if (systemLocalConfig != null) {
SystemPropertyView legacyComparator = systemLocalConfig.value().properties()
.get(LEGACY_PAGE_MEMERY_SORTED_INDEX_COMPARATOR_PROPERTY);

if (legacyComparator != null && "true".equalsIgnoreCase(legacyComparator.propertyValue())) {
useLegacySortedIndexComparator = true;
}
}
}

/**
* Creates a Global remove ID for structures based on a {@link BplusTree}, always creating monotonically increasing ones even after
* recovery node, so that there are no errors after restoring trees.
*/
public AtomicLong generateGlobalRemoveId() {
return new AtomicLong(clock.nowLong());
}

/**
* Creates a new instance of {@link JitComparator} for the given sorted index descriptor.
*/
@VisibleForTesting
public static JitComparator createNewJitComparator(StorageSortedIndexDescriptor desc) {
List<StorageSortedIndexColumnDescriptor> columns = desc.columns();
List<CatalogColumnCollation> collations = new ArrayList<>(columns.size());
List<NativeType> types = new ArrayList<>(columns.size());
List<Boolean> nullableFlags = new ArrayList<>(columns.size());

for (StorageSortedIndexColumnDescriptor col : columns) {
collations.add(CatalogColumnCollation.get(col.asc(), col.nullsFirst()));
types.add(col.type());
// Nulls can still be passed from the outside as lower/upper bounds during the search, even if the column is not nullable.
nullableFlags.add(true);
}

return JitComparatorGenerator.createComparator(JitComparatorOptions.builder()
.columnCollations(collations)
.columnTypes(types)
.nullableFlags(nullableFlags)
.supportPrefixes(true)
.supportPartialComparison(true)
.build()
);
}

/**
* Creates or retrieves from cache a {@link JitComparator} for the given sorted index descriptor. Returns a cached comparator value if
* it already exists (was not disposed with {@link #disposeSortedIndexComparator(StorageSortedIndexDescriptor)}) for a given descriptor.
*/
public @Nullable JitComparator createSortedIndexComparator(StorageSortedIndexDescriptor indexDescriptor) {
if (useLegacySortedIndexComparator) {
return null;
}

CachedComparator c = cachedSortedIndexComparators.compute(indexDescriptor, (desc, cmp) -> {
if (cmp != null) {
return cmp.incrementUsage();
}

JitComparator jitComparator = createNewJitComparator(desc);

return new CachedComparator(jitComparator);
});

return c.jitComparator();
}

/**
* Marks that a comparator, created previously with {@link #createSortedIndexComparator(StorageSortedIndexDescriptor)}, will no longer
* be used, and the internal cache of comparators may react to this information by removing the comparator from the cache and freeing
* associated resources.
*/
public void disposeSortedIndexComparator(StorageSortedIndexDescriptor indexDescriptor) {
if (useLegacySortedIndexComparator) {
return;
}

cachedSortedIndexComparators.compute(indexDescriptor, (desc, cmp) -> {
assert cmp != null;

return cmp.decrementUsage();
});
}

private static class CachedComparator {
private final JitComparator comparator;
private final int usageCount;

private CachedComparator(JitComparator comparator, int usageCount) {
assert usageCount > 0;

this.comparator = comparator;
this.usageCount = usageCount;
}

CachedComparator(JitComparator comparator) {
this(comparator, 1);
}

JitComparator jitComparator() {
return comparator;
}

CachedComparator incrementUsage() {
return new CachedComparator(comparator, usageCount + 1);
}

@Nullable AbstractPageMemoryStorageEngine.CachedComparator decrementUsage() {
return usageCount == 1 ? null : new CachedComparator(comparator, usageCount - 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage

private final StorageConfiguration storageConfig;

private final @Nullable SystemLocalConfiguration systemLocalConfig;

private final PageIoRegistry ioRegistry;

private final Path storagePath;
Expand Down Expand Up @@ -152,13 +150,12 @@ public PersistentPageMemoryStorageEngine(
ExecutorService commonExecutorService,
HybridClock clock
) {
super(clock);
super(systemLocalConfig, clock);

this.igniteInstanceName = igniteInstanceName;
this.metricManager = metricManager;
this.storageConfig = storageConfig;
this.engineConfig = ((PersistentPageMemoryStorageEngineExtensionConfiguration) storageConfig.engines()).aipersist();
this.systemLocalConfig = systemLocalConfig;
this.ioRegistry = ioRegistry;
this.storagePath = storagePath;
this.longJvmPauseDetector = longJvmPauseDetector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import org.apache.ignite.internal.configuration.SystemLocalExtensionConfiguration;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.metrics.MetricManager;
Expand Down Expand Up @@ -60,13 +62,16 @@ public StorageEngine createEngine(
) throws StorageException {
StorageConfiguration storageConfig = configRegistry.getConfiguration(StorageExtensionConfiguration.KEY).storage();

SystemLocalConfiguration systemLocalConfig = configRegistry.getConfiguration(SystemLocalExtensionConfiguration.KEY).system();

PageIoRegistry ioRegistry = new PageIoRegistry();

ioRegistry.loadFromServiceLoader();

return new VolatilePageMemoryStorageEngine(
igniteInstanceName,
storageConfig,
systemLocalConfig,
ioRegistry,
failureManager,
clock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand All @@ -46,6 +47,7 @@
import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineExtensionConfiguration;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.jetbrains.annotations.Nullable;

/** Storage engine implementation based on {@link VolatilePageMemory}. */
public class VolatilePageMemoryStorageEngine extends AbstractPageMemoryStorageEngine {
Expand Down Expand Up @@ -80,18 +82,20 @@ public class VolatilePageMemoryStorageEngine extends AbstractPageMemoryStorageEn
*
* @param igniteInstanceName Ignite instance name.
* @param storageConfig Storage engine and storage profiles configurations.
* @param systemLocalConfig Local system configuration.
* @param ioRegistry IO registry.
* @param failureProcessor Failure processor.
* @param clock Hybrid Logical Clock.
*/
public VolatilePageMemoryStorageEngine(
String igniteInstanceName,
StorageConfiguration storageConfig,
@Nullable SystemLocalConfiguration systemLocalConfig,
PageIoRegistry ioRegistry,
FailureProcessor failureProcessor,
HybridClock clock
) {
super(clock);
super(systemLocalConfig, clock);

this.igniteInstanceName = igniteInstanceName;
this.storageConfig = storageConfig;
Expand Down
Loading