Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.pixelsdb.pixels.common.exception.SinglePointIndexException;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.index.IndexProto;
import io.pixelsdb.pixels.common.index.LatestVersionCache.CacheEntry;

import java.util.List;

Expand All @@ -39,8 +38,8 @@ public CachingSinglePointIndex()
if (cacheEnabled)
{
long capacity = Long.parseLong(config.getProperty("index.cache.capacity"));
long expireAfterAccessSeconds = Long.parseLong(config.getProperty("index.cache.expiration.seconds"));
this.cache = new LatestVersionCache(capacity, expireAfterAccessSeconds);
long expirationSeconds = Long.parseLong(config.getProperty("index.cache.expiration.seconds"));
this.cache = new LatestVersionCache(capacity, expirationSeconds);
} else
{
this.cache = null;
Expand All @@ -52,10 +51,16 @@ public long getUniqueRowId(IndexProto.IndexKey key) throws SinglePointIndexExcep
{
if (cache != null)
{
CacheEntry cacheEntry = cache.get(key);
if (cacheEntry != null && cacheEntry.timestamp <= key.getTimestamp())
final String cacheKey = LatestVersionCache.buildCacheKey(key);
final String cacheValue = cache.get(cacheKey);

if (cacheValue != null)
{
return cacheEntry.rowId;
long[] entry = LatestVersionCache.parseCacheValue(cacheValue);
if (entry[0] <= key.getTimestamp())
{
return entry[1];
}
}
}

Expand All @@ -68,7 +73,9 @@ public final boolean putEntry(IndexProto.IndexKey key, long rowId) throws Single
boolean success = putEntryInternal(key, rowId);
if (isUnique() && cache != null && success)
{
cache.put(key, rowId);
final String cacheKey = LatestVersionCache.buildCacheKey(key);
final String cacheValue = LatestVersionCache.buildCacheValue(key.getTimestamp(), rowId);
cache.put(cacheKey, cacheValue);
}
return success;
}
Expand All @@ -81,7 +88,9 @@ public boolean putPrimaryEntries(List<IndexProto.PrimaryIndexEntry> entries) thr
{
for (IndexProto.PrimaryIndexEntry entry : entries)
{
cache.put(entry.getIndexKey(), entry.getRowId());
final String cacheKey = LatestVersionCache.buildCacheKey(entry.getIndexKey());
final String cacheValue = LatestVersionCache.buildCacheValue(entry.getIndexKey().getTimestamp(), entry.getRowId());
cache.put(cacheKey, cacheValue);
}
}
return success;
Expand All @@ -95,7 +104,9 @@ public boolean putSecondaryEntries(List<IndexProto.SecondaryIndexEntry> entries)
{
for (IndexProto.SecondaryIndexEntry entry : entries)
{
cache.put(entry.getIndexKey(), entry.getRowId());
final String cacheKey = LatestVersionCache.buildCacheKey(entry.getIndexKey());
final String cacheValue = LatestVersionCache.buildCacheValue(entry.getIndexKey().getTimestamp(), entry.getRowId());
cache.put(cacheKey, cacheValue);
}
}
return success;
Expand All @@ -107,7 +118,9 @@ public long updatePrimaryEntry(IndexProto.IndexKey key, long rowId) throws Singl
long previousRowId = updatePrimaryEntryInternal(key, rowId);
if (cache != null)
{
cache.put(key, rowId);
final String cacheKey = LatestVersionCache.buildCacheKey(key);
final String cacheValue = LatestVersionCache.buildCacheValue(key.getTimestamp(), rowId);
cache.put(cacheKey, cacheValue);
}
return previousRowId;
}
Expand All @@ -118,7 +131,9 @@ public List<Long> updateSecondaryEntry(IndexProto.IndexKey key, long rowId) thro
List<Long> previousRowIds = updateSecondaryEntryInternal(key, rowId);
if (isUnique() && cache != null)
{
cache.put(key, rowId);
final String cacheKey = LatestVersionCache.buildCacheKey(key);
final String cacheValue = LatestVersionCache.buildCacheValue(key.getTimestamp(), rowId);
cache.put(cacheKey, cacheValue);
}
return previousRowIds;
}
Expand All @@ -131,7 +146,9 @@ public List<Long> updatePrimaryEntries(List<IndexProto.PrimaryIndexEntry> entrie
{
for (IndexProto.PrimaryIndexEntry entry : entries)
{
cache.put(entry.getIndexKey(), entry.getRowId());
final String cacheKey = LatestVersionCache.buildCacheKey(entry.getIndexKey());
final String cacheValue = LatestVersionCache.buildCacheValue(entry.getIndexKey().getTimestamp(), entry.getRowId());
cache.put(cacheKey, cacheValue);
}
}
return previousRowIds;
Expand All @@ -145,7 +162,9 @@ public List<Long> updateSecondaryEntries(List<IndexProto.SecondaryIndexEntry> en
{
for (IndexProto.SecondaryIndexEntry entry : entries)
{
cache.put(entry.getIndexKey(), entry.getRowId());
final String cacheKey = LatestVersionCache.buildCacheKey(entry.getIndexKey());
final String cacheValue = LatestVersionCache.buildCacheValue(entry.getIndexKey().getTimestamp(), entry.getRowId());
cache.put(cacheKey, cacheValue);
}
}
return previousRowIds;
Expand All @@ -157,7 +176,8 @@ public long deleteUniqueEntry(IndexProto.IndexKey indexKey) throws SinglePointIn
long deleteRowId = deleteUniqueEntryInternal(indexKey);
if (cache != null && deleteRowId >= 0)
{
cache.invalidate(indexKey);
final String cacheKey = LatestVersionCache.buildCacheKey(indexKey);
cache.invalidate(cacheKey);
}
return deleteRowId;
}
Expand All @@ -168,7 +188,8 @@ public List<Long> deleteEntry(IndexProto.IndexKey key) throws SinglePointIndexEx
List<Long> deletedRowIds = deleteEntryInternal(key);
if (isUnique() && cache != null && !deletedRowIds.isEmpty())
{
cache.invalidate(key);
final String cacheKey = LatestVersionCache.buildCacheKey(key);
cache.invalidate(cacheKey);
}
return deletedRowIds;
}
Expand All @@ -181,7 +202,8 @@ public List<Long> deleteEntries(List<IndexProto.IndexKey> keys) throws SinglePoi
{
for (IndexProto.IndexKey key : keys)
{
cache.invalidate(key);
final String cacheKey = LatestVersionCache.buildCacheKey(key);
cache.invalidate(cacheKey);
}
}
return deletedRowIds;
Expand All @@ -195,7 +217,8 @@ public List<Long> purgeEntries(List<IndexProto.IndexKey> indexKeys) throws Singl
{
for (IndexProto.IndexKey key : indexKeys)
{
cache.invalidate(key);
final String cacheKey = LatestVersionCache.buildCacheKey(key);
cache.invalidate(cacheKey);
}
}
return purgedRowIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,95 +23,72 @@
import com.github.benmanes.caffeine.cache.Cache;
import io.pixelsdb.pixels.index.IndexProto;

import java.util.Objects;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class LatestVersionCache
{
private final Cache<CacheKey, CacheEntry> cache;
private final Cache<String, String> cache;

/**
* A wrapper for {@link IndexProto.IndexKey} that is used as a key in the cache.
* The {@link #equals(Object)} and {@link #hashCode()} methods are implemented based on
* the table ID, index ID, and key value, ignoring the timestamp. This allows cache
* lookups to succeed for the same logical key regardless of the transaction timestamp.
*/
private static class CacheKey
public LatestVersionCache(long capacity, long expirationSeconds)
{
private final IndexProto.IndexKey indexKey;

public CacheKey(IndexProto.IndexKey indexKey)
{
this.indexKey = indexKey;
}

public IndexProto.IndexKey getIndexKey()
{
return indexKey;
}

@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CacheKey other = (CacheKey) o;
// Compare based on tableId
return indexKey.getTableId() == other.indexKey.getTableId() &&
indexKey.getIndexId() == other.indexKey.getIndexId() &&
Objects.equals(indexKey.getKey(), other.indexKey.getKey());
}

@Override
public int hashCode()
{
return Objects.hash(indexKey.getTableId(), indexKey.getIndexId(), indexKey.getKey());
}
this.cache = Caffeine.newBuilder()
.maximumSize(capacity)
.expireAfterWrite(expirationSeconds, TimeUnit.SECONDS)
.build();
}

public static class CacheEntry
public String get(String key)
{
final long rowId;
final long timestamp;
return cache.getIfPresent(key);
}

CacheEntry (long rowId, long timestamp)
{
this.rowId = rowId;
this.timestamp = timestamp;
}
public void put(String key, String value)
{
cache.put(key, value);
}

public LatestVersionCache(long maximumSize, long expireAfterAccessSeconds)
public void invalidate(String key)
{
this.cache = Caffeine.newBuilder()
.maximumSize(maximumSize)
.expireAfterAccess(expireAfterAccessSeconds, TimeUnit.SECONDS)
.build();
cache.invalidate(key);
}

public CacheEntry get(IndexProto.IndexKey key)
public static String buildCacheKey(IndexProto.IndexKey key)
{
return cache.getIfPresent(new CacheKey(key));
String indexKey = key.getKey().toString(StandardCharsets.ISO_8859_1);
return new StringBuilder(20 + 20 + indexKey.length())
.append(key.getTableId())
.append(key.getIndexId())
.append(indexKey)
.toString();
}

public void put(IndexProto.IndexKey key, long rowId)
public static String buildCacheValue(long timestamp, long rowId)
{
CacheKey cacheKey = new CacheKey(key);
long newTimestamp = key.getTimestamp();
cache.asMap().compute(cacheKey, (k, existingEntry) -> {
if (existingEntry == null || newTimestamp >= existingEntry.timestamp)
{
return new CacheEntry(rowId, newTimestamp);
} else
{
return existingEntry;
}
});
char[] chars = new char[16];
for (int i = 0; i < 8; i++)
{
chars[i] = (char) ((timestamp >> (i * 8)) & 0xFF);
chars[8 + i] = (char) ((rowId >> (i * 8)) & 0xFF);
}
return new String(chars);
}

public void invalidate(IndexProto.IndexKey key)
public static long[] parseCacheValue(String value)
{
cache.invalidate(new CacheKey(key));
if (value == null || value.length() != 16)
{
return null;
}
long timestamp = 0;
long rowId = 0;
char[] chars = value.toCharArray();

for (int i = 0; i < 8; i++)
{
timestamp |= (long) (chars[i] & 0xFF) << (i * 8);
rowId |= (long) (chars[8 + i] & 0xFF) << (i * 8);
}
return new long[] { timestamp, rowId };
}
}
41 changes: 41 additions & 0 deletions pixels-common/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,47 @@ index.rockset.read.only=false
index.rocksdb.data.path=/tmp/rocksdb
# rocksdb write buffer size (default to 64MB)
index.rocksdb.write.buffer.size=67108864
# rocksdb max write buffer number (default to 3)
index.rocksdb.max.write.buffer.number=3
# rocksdb max background flush threads (default to 2)
index.rocksdb.max.background.flushes=2
# rocksdb max background compactions (default to 4)
index.rocksdb.max.background.compactions=4
# rocksdb max open files (default to 4096)
index.rocksdb.max.open.files=4096
# rocksdb block cache capacity (default to 1GB)
index.rocksdb.block.cache.capacity=1073741824
# rocksdb block cache shard bits (default to 6, i.e., 64 shards)
index.rocksdb.block.cache.shard.bits=6
# rocksdb block size (default to 16KB)
index.rocksdb.block.size=16384
# rocksdb min write buffer number to merge (default to 2)
index.rocksdb.min.write.buffer.number.to.merge=2
# rocksdb file number compaction trigger (default to 4)
index.rocksdb.level0.file.num.compaction.trigger=4
# rocksdb max bytes for level base (default to 256MB)
index.rocksdb.max.bytes.for.level.base=268435456
rocksdb max bytes for level multiplier (default to 10)
index.rocksdb.max.bytes.for.level.multiplier=10
# rocksdb target file size base (default to 64MB)
index.rocksdb.target.file.size.base=67108864
# rocksdb file size multiplier (default to 1)
index.rocksdb.target.file.size.multiplier=1
# rocksdb max subcompactions
index.rocksdb.max.subcompactions=1
# rocksdb compression type (e.g. NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZ2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, ZSTD_COMPRESSION)
index.rocksdb.compression.type=LZ4_COMPRESSION
# rocksdb bottommost compression type
index.rocksdb.bottommost.compression.type=ZSTD_COMPRESSION
# Compaction style determines how RocksDB merges SST files. (e.g. UNIVERSAL, LEVEL)
index.rocksdb.compaction.style=LEVEL
# Whether to enable RocksDB internal statistics collection
index.rocksdb.stats.enabled=false
# Directory where RocksDB will write statistics logs
index.rocksdb.stats.path=/tmp/rocksDBStats
# Time interval (in seconds) between statistics dumps
index.rocksdb.stats.interval=60

# Whether to enable the latest version cache for SinglePointIndex
index.cache.enabled=false
# The maximum number of entries in the cache
Expand Down
Loading