Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Tiered Caching] Serializers for ehcache (#12709) #12736

Merged
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642))
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
- [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709))
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

Expand Down Expand Up @@ -106,8 +107,11 @@ public MockDiskCacheFactory(long delay, int maxSize) {
}

@Override
@SuppressWarnings({ "unchecked" })
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
return new Builder<K, V>().setMaxSize(maxSize)
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
.setMaxSize(maxSize)
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.build();
Expand All @@ -123,6 +127,8 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {

int maxSize;
long delay;
Serializer<K, byte[]> keySerializer;
Serializer<V, byte[]> valueSerializer;

@Override
public ICache<K, V> build() {
Expand All @@ -138,5 +144,16 @@ public Builder<K, V> setDeliberateDelay(long millis) {
this.delay = millis;
return this;
}

public Builder<K, V> setKeySerializer(Serializer<K, byte[]> keySerializer) {
this.keySerializer = keySerializer;
return this;
}

public Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
this.valueSerializer = valueSerializer;
return this;
}

}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,33 @@
package org.opensearch.cache.store.disk;

import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand All @@ -51,6 +60,8 @@ public void testBasicGetAndPut() throws IOException {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -89,6 +100,8 @@ public void testBasicGetAndPutUsingFactory() throws IOException {
new CacheConfig.Builder<String, String>().setValueType(String.class)
.setKeyType(String.class)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -149,6 +162,8 @@ public void testConcurrentPut() throws Exception {
.setIsEventListenerModeSync(true) // For accurate count
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -194,6 +209,8 @@ public void testEhcacheParallelGets() throws Exception {
.setIsEventListenerModeSync(true) // For accurate count
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -237,6 +254,8 @@ public void testEhcacheKeyIterator() throws Exception {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -274,6 +293,8 @@ public void testEvictions() throws Exception {
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -304,6 +325,8 @@ public void testComputeIfAbsentConcurrently() throws Exception {
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -373,6 +396,8 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -430,6 +455,8 @@ public void testComputeIfAbsentWithNullValueLoading() throws Exception {
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -491,6 +518,8 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -525,6 +554,50 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {

}

public void testBasicGetAndPutBytesReference() throws Exception {
Settings settings = Settings.builder().build();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, BytesReference> ehCacheDiskCachingTier = new EhcacheDiskCache.Builder<String, BytesReference>()
.setThreadPoolAlias("ehcacheTest")
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setKeySerializer(new StringSerializer())
.setValueSerializer(new BytesReferenceSerializer())
.setKeyType(String.class)
.setValueType(BytesReference.class)
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 20) // bigger so no evictions happen
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setRemovalListener(new MockRemovalListener<>())
.build();
int randomKeys = randomIntBetween(10, 100);
int valueLength = 100;
Random rand = Randomness.get();
Map<String, BytesReference> keyValueMap = new HashMap<>();
for (int i = 0; i < randomKeys; i++) {
byte[] valueBytes = new byte[valueLength];
rand.nextBytes(valueBytes);
keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes));

// Test a non-BytesArray implementation of BytesReference.
byte[] compositeBytes1 = new byte[valueLength];
byte[] compositeBytes2 = new byte[valueLength];
rand.nextBytes(compositeBytes1);
rand.nextBytes(compositeBytes2);
BytesReference composite = CompositeBytesReference.of(new BytesArray(compositeBytes1), new BytesArray(compositeBytes2));
keyValueMap.put(UUID.randomUUID().toString(), composite);
}
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
BytesReference value = ehCacheDiskCachingTier.get(entry.getKey());
assertEquals(entry.getValue(), value);
}
ehCacheDiskCachingTier.close();
}
}

private static String generateRandomString(int length) {
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
StringBuilder randomString = new StringBuilder(length);
Expand All @@ -546,4 +619,25 @@ public void onRemoval(RemovalNotification<K, V> notification) {
evictionMetric.inc();
}
}

static class StringSerializer implements Serializer<String, byte[]> {
private final Charset charset = StandardCharsets.UTF_8;

@Override
public byte[] serialize(String object) {
return object.getBytes(charset);
}

@Override
public String deserialize(byte[] bytes) {
if (bytes == null) {
return null;
}
return new String(bytes, charset);
}

public boolean equals(String object, byte[] bytes) {
return object.equals(deserialize(bytes));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.serializer;

import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;

import java.util.Arrays;

/**
* A serializer which transforms BytesReference to byte[].
* The type of BytesReference is NOT preserved after deserialization, but nothing in opensearch should care.
*/
public class BytesReferenceSerializer implements Serializer<BytesReference, byte[]> {
// This class does not get passed to ehcache itself, so it's not required that classes match after deserialization.

public BytesReferenceSerializer() {}

@Override
public byte[] serialize(BytesReference object) {
return BytesReference.toBytes(object);
}

@Override
public BytesReference deserialize(byte[] bytes) {
if (bytes == null) {
return null;

Check warning on line 33 in server/src/main/java/org/opensearch/common/cache/serializer/BytesReferenceSerializer.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/serializer/BytesReferenceSerializer.java#L33

Added line #L33 was not covered by tests
}
return new BytesArray(bytes);
}

@Override
public boolean equals(BytesReference object, byte[] bytes) {
return Arrays.equals(serialize(object), bytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.serializer;

/**
* Defines an interface for serializers, to be used by pluggable caches.
* T is the class of the original object, and U is the serialized class.
*/
public interface Serializer<T, U> {
/**
* Serializes an object.
* @param object A non-serialized object.
* @return The serialized representation of the object.
*/
U serialize(T object);

/**
* Deserializes bytes into an object.
* @param bytes The serialized representation.
* @return The original object.
*/
T deserialize(U bytes);

/**
* Compares an object to a serialized representation of an object.
* @param object A non-serialized objet
* @param bytes Serialized representation of an object
* @return true if representing the same object, false if not
*/
boolean equals(T object, U bytes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/** A package for serializers used in caches. */
package org.opensearch.common.cache.serializer;
Loading
Loading