diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/OffHeapTrafficRecorder.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/OffHeapTrafficRecorder.java index b3c2ebd19..7bf11129d 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/OffHeapTrafficRecorder.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/OffHeapTrafficRecorder.java @@ -23,11 +23,19 @@ package io.evitadb.store.traffic; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.util.Pool; import io.evitadb.api.configuration.ServerOptions; import io.evitadb.api.query.Query; import io.evitadb.api.requestResponse.mutation.Mutation; import io.evitadb.core.traffic.TrafficRecorder; +import io.evitadb.store.kryo.ObservableOutput; +import io.evitadb.store.offsetIndex.model.StorageRecord; +import io.evitadb.store.query.QuerySerializationKryoConfigurer; +import io.evitadb.store.service.KryoFactory; +import io.evitadb.store.traffic.data.QueryContainer; import io.evitadb.store.traffic.data.SessionTraffic; +import io.evitadb.store.wal.WalKryoConfigurer; import io.evitadb.utils.Assert; import javax.annotation.Nonnull; @@ -54,6 +62,15 @@ * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 */ public class OffHeapTrafficRecorder implements TrafficRecorder, Closeable { + private static final Pool OFF_HEAP_TRAFFIC_RECORDER_KRYO_POOL = new Pool<>(true, false) { + @Override + protected Kryo create() { + return KryoFactory.createKryo( + WalKryoConfigurer.INSTANCE + .andThen(QuerySerializationKryoConfigurer.INSTANCE) + ); + } + }; /** * Size of a single memory slot used for storing queries and mutations. */ @@ -115,7 +132,16 @@ public void recordQuery(@Nonnull UUID sessionId, @Nonnull Query query, int total final SessionTraffic sessionTraffic = this.trackedSessionsIndex.get(sessionId); if (sessionTraffic != null) { final int blockPeek = prepareStorageBlock(sessionTraffic); - // TODO JNO - serialize query + /* TODO JNO - tady bude potřeba naučit StorageRecord zapisovat do různých regionů */ + /* TODO JNO - možná strčit ObservableOutput přímo dovnitř SessionTrafficu a peek neřešit (ten je uvnitř) */ + /* TODO JNO - možná udělat chytrý OutputStream, který si bude umět doalokovat paměť, když bude chybět */ + final StorageRecord queryContainerStorageRecord = new StorageRecord<>( + OFF_HEAP_TRAFFIC_RECORDER_KRYO_POOL.obtain(), + new ObservableOutput<>(), + 0L, + false, + new QueryContainer(sessionId, query, totalRecordCount, primaryKeys) + ); } } diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/data/QueryContainer.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/data/QueryContainer.java index 9d407f2ea..f9b11c4ec 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/data/QueryContainer.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/data/QueryContainer.java @@ -23,10 +23,21 @@ package io.evitadb.store.traffic.data; +import io.evitadb.api.query.Query; + +import javax.annotation.Nonnull; +import java.util.UUID; + /** - * TODO JNO - document me + * Container for a query and its metadata. * * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 */ -public class QueryContainer { +public record QueryContainer( + @Nonnull UUID sessionId, + @Nonnull Query query, + int totalRecordCount, + @Nonnull int[] primaryKeys +) { + }