Skip to content

CRDT Set prototype #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions qbit-core/src/commonMain/kotlin/qbit/Conn.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import qbit.index.Indexer
import qbit.index.InternalDb
import qbit.index.RawEntity
import qbit.ns.Namespace
import qbit.resolving.lastWriterWinsResolve
import qbit.resolving.crdtResolve
import qbit.resolving.logsDiff
import qbit.serialization.*
import qbit.spi.Storage
Expand Down Expand Up @@ -122,14 +122,14 @@ class QConn(
}
}

override suspend fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb) {
override suspend fun update(trxLog: TrxLog, baseDb: InternalDb, newLog: TrxLog, newDb: InternalDb) {
val (log, db) =
if (hasConcurrentTrx(trxLog)) {
mergeLogs(trxLog, this.trxLog, newLog, newDb)
mergeLogs(trxLog, this.trxLog, newLog, baseDb, newDb)
} else {
newLog to newDb
}
storage.overwrite(Namespace("refs")["head"], newLog.hash.bytes)
storage.overwrite(Namespace("refs")["head"], log.hash.bytes)
this.trxLog = log
this.db = db
}
Expand All @@ -141,6 +141,7 @@ class QConn(
baseLog: TrxLog,
committedLog: TrxLog,
committingLog: TrxLog,
baseDb: InternalDb,
newDb: InternalDb
): Pair<TrxLog, InternalDb> {
val logsDifference = logsDiff(baseLog, committedLog, committingLog, resolveNode)
Expand All @@ -149,7 +150,7 @@ class QConn(
.logAEntities()
.toEavsList()
val reconciliationEavs = logsDifference
.reconciliationEntities(lastWriterWinsResolve { db.attr(it) })
.reconciliationEntities(crdtResolve(baseDb::pullEntity, db::attr))
.toEavsList()

val mergedDb = newDb
Expand Down
82 changes: 68 additions & 14 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import kotlin.reflect.KClass
* - List<Ref>
*/

val scalarRange = 0u..31u
val listRange = 32u..63u
val pnCounterRange = 64u..95u
val registerRange = 96u..127u
val setRange = 128u..159u

@Suppress("UNCHECKED_CAST")
sealed class DataType<out T : Any> {

Expand All @@ -31,35 +37,63 @@ sealed class DataType<out T : Any> {
private val values: Array<DataType<*>>
get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef)

fun ofCode(code: Byte): DataType<*>? =
if (code <= 19) {
values.firstOrNull { it.code == code }
} else {
values.map { it.list() }.firstOrNull { it.code == code }
}
fun ofCode(code: Byte): DataType<*>? = when(code.toUByte()) {
in scalarRange -> values.firstOrNull { it.code == code }
in listRange -> ofCode((code.toUByte() - listRange.first).toByte())?.list()
in pnCounterRange -> ofCode((code.toUByte() - pnCounterRange.first).toByte())?.counter()
in registerRange -> ofCode((code.toUByte() - registerRange.first).toByte())?.register()
in setRange -> ofCode((code.toUByte() - setRange.first).toByte())?.set()
else -> null
}

fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) {
fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) { // TODO REFACTOR
is Boolean -> QBoolean as DataType<T>
is Byte -> QByte as DataType<T>
is Int -> QInt as DataType<T>
is Long -> QLong as DataType<T>
is String -> QString as DataType<T>
is ByteArray -> QBytes as DataType<T>
is Gid -> QGid as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>?
else -> QRef as DataType<T>
}
}

fun isScalar(): Boolean = code.toUByte() in scalarRange

fun list(): QList<T> {
// TODO: make types hierarchy: Type -> List | (Scalar -> (Ref | Value))
require(!isList()) { "Nested lists is not allowed" }
require(this.isScalar()) { "Nested wrappers is not allowed" }
return QList(this)
}

fun isList(): Boolean = (code.toInt().and(32)) > 0
fun isList(): Boolean = code.toUByte() in listRange

fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef
fun counter(): QCounter<T> {
require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" }
return QCounter(this)
}

fun isCounter(): Boolean = code.toUByte() in pnCounterRange

fun register(): QRegister<T> {
require(this.isScalar()) { "Nested wrappers is not allowed" }
return QRegister(this)
}

fun isRegister(): Boolean = code.toUByte() in registerRange

fun set(): QSet<T> {
require(this.isScalar()) { "Nested wrappers is not allowed" }
return QSet(this)
}

fun isSet(): Boolean = code.toUByte() in setRange

fun ref(): Boolean = this == QRef ||
this is QList<*> && this.itemsType == QRef ||
this is QRegister<*> && this.itemsType == QRef ||
this is QSet<*> && this.itemsType == QRef

fun value(): Boolean = !ref()

Expand All @@ -73,15 +107,35 @@ sealed class DataType<out T : Any> {
is QBytes -> ByteArray::class
is QGid -> Gid::class
is QList<*> -> this.itemsType.typeClass()
is QCounter<*> -> this.primitiveType.typeClass()
is QRegister<*> -> this.itemsType.typeClass()
is QSet<*> -> this.itemsType.typeClass()
QRef -> Any::class
}
}

}

data class QList<out I : Any>(val itemsType: DataType<I>) : DataType<List<I>>() {

override val code = (32 + itemsType.code).toByte()
override val code = (listRange.first.toByte() + itemsType.code).toByte()

}

data class QCounter<out I : Any>(val primitiveType: DataType<I>) : DataType<I>() {

override val code = (pnCounterRange.first.toByte() + primitiveType.code).toByte()

}

data class QRegister<out I : Any>(val itemsType: DataType<I>) : DataType<I>() {

override val code = (registerRange.first.toByte() + itemsType.code).toByte()

}

data class QSet<out I : Any>(val itemsType: DataType<I>) : DataType<Set<I>>() {

override val code = (setRange.first.toByte() + itemsType.code).toByte()

}

Expand Down Expand Up @@ -134,4 +188,4 @@ object QGid : DataType<Gid>() {
}

fun isListOfVals(list: List<Any>?) =
list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it)?.value() } ?: true
list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it)?.value() } ?: true // TODO REFACTOR
14 changes: 14 additions & 0 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/Register.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package qbit.api.model

import kotlinx.serialization.Serializable

@Serializable
class Register<T>(
private var entries: List<T>
) {
fun getValues(): List<T> = entries

fun setValue(t: T) {
entries = listOf(t)
}
}
8 changes: 4 additions & 4 deletions qbit-core/src/commonMain/kotlin/qbit/factoring/EntityGraph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ internal data class EntityBuilder(
return DetachedEntity(gid!!, attrValues)
}

private fun resolveRefs(attrVallue: Any, resolve: (Any) -> Gid): Any {
private fun resolveRefs(attrValue: Any, resolve: (Any) -> Gid): Any {
return when {
attrVallue is Ref -> resolve(attrVallue.obj)
attrVallue is List<*> && attrVallue.firstOrNull() is Ref -> (attrVallue as List<Ref>).map { resolve(it.obj) }
else -> attrVallue
attrValue is Ref -> resolve(attrValue.obj)
attrValue is List<*> && attrValue.firstOrNull() is Ref -> (attrValue as List<Ref>).map { resolve(it.obj) }
else -> attrValue
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.modules.SerializersModuleCollector
import qbit.api.QBitException
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.Eav
import qbit.api.model.Entity
import qbit.api.model.Tombstone
import qbit.api.model.*
import qbit.api.tombstone
import qbit.collections.IdentityMap
import qbit.factoring.*
Expand Down Expand Up @@ -99,6 +96,18 @@ internal class EntityEncoder(
ValueKind.REF_LIST -> {
serializeRefList(value as Iterable<Any>)
}
ValueKind.VALUE_REGISTER -> {
(value as Register<Any>).getValues()
}
ValueKind.REF_REGISTER -> {
serializeRefList((value as Register<Any>).getValues())
}
ValueKind.VALUE_SET -> {
(value as Set<Any>).toList()
}
ValueKind.REF_SET -> {
serializeRefList(value as Set<Any>)
}
}

val fieldPointer = Pointer(
Expand Down Expand Up @@ -185,7 +194,7 @@ internal class EntityEncoder(

enum class ValueKind {

SCALAR_VALUE, SCALAR_REF, VALUE_LIST, REF_LIST;
SCALAR_VALUE, SCALAR_REF, VALUE_LIST, REF_LIST, VALUE_REGISTER, REF_REGISTER, VALUE_SET, REF_SET;

companion object {
fun of(descriptor: SerialDescriptor, index: Int, value: Any): ValueKind {
Expand All @@ -194,7 +203,10 @@ enum class ValueKind {
isScalarValue(value) -> {
SCALAR_VALUE
}
isScalarRef(elementDescriptor) -> {
isScalarRef(
elementDescriptor,
value
) -> {
SCALAR_REF
}
isValueList(
Expand All @@ -209,6 +221,30 @@ enum class ValueKind {
) -> {
REF_LIST
}
isValueRegister(
elementDescriptor,
value
) -> {
VALUE_REGISTER
}
isRefRegister(
elementDescriptor,
value
) -> {
REF_REGISTER
}
isValueSet(
elementDescriptor,
value
) -> {
VALUE_SET
}
isRefSet(
elementDescriptor,
value
) -> {
REF_SET
}
else -> {
throw AssertionError("Writing primitive via encodeSerializableElement")
}
Expand All @@ -219,8 +255,8 @@ enum class ValueKind {
// other primitive values are encoded directly via encodeXxxElement
value is Gid || value is ByteArray

private fun isScalarRef(elementDescriptor: SerialDescriptor) =
elementDescriptor.kind == StructureKind.CLASS
private fun isScalarRef(elementDescriptor: SerialDescriptor, value: Any) =
elementDescriptor.kind == StructureKind.CLASS && value !is Register<*>

private fun isValueList(elementDescriptor: SerialDescriptor, value: Any) =
elementDescriptor.kind == StructureKind.LIST &&
Expand All @@ -231,6 +267,21 @@ enum class ValueKind {
private fun isRefList(elementDescriptor: SerialDescriptor, value: Any) =
elementDescriptor.kind == StructureKind.LIST && value is List<*>

private fun isValueRegister(elementDescriptor: SerialDescriptor, value: Any) =
value is Register<*> && //TODO DEDUPLICATE
(elementDescriptor.getElementDescriptor(0).getElementDescriptor(0).kind is PrimitiveKind ||
elementDescriptor.getElementDescriptor(0).getElementDescriptor(0).kind == StructureKind.LIST) // ByteArray

private fun isRefRegister(elementDescriptor: SerialDescriptor, value: Any) = // TODO REFACTOR
value is Register<*> && elementDescriptor.getElementDescriptor(0).getElementDescriptor(0).kind is StructureKind.CLASS

private fun isValueSet(elementDescriptor: SerialDescriptor, value: Any) =
value is Set<*> &&
(elementDescriptor.getElementDescriptor(0).kind is PrimitiveKind ||
elementDescriptor.getElementDescriptor(0).kind == StructureKind.LIST) // ByteArray

private fun isRefSet(elementDescriptor: SerialDescriptor, value: Any) =
value is Set<*> && elementDescriptor.getElementDescriptor(0).kind is StructureKind.CLASS
}

}
Expand Down
4 changes: 2 additions & 2 deletions qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class IndexDb(
val attrValues = rawEntity.entries.map {
val attr = schema[it.key]
require(attr != null) { "There is no attribute with name ${it.key}" }
require(attr.list || it.value.size == 1) { "Corrupted ${attr.name} of $gid - it is scalar, but multiple values has been found: ${it.value}" }
require(attr.list || DataType.ofCode(attr.type)!!.isRegister() || DataType.ofCode(attr.type)!!.isSet() || it.value.size == 1) { "Corrupted ${attr.name} of $gid - it is scalar, but multiple values has been found: ${it.value}" }
val value =
if (attr.list) it.value.map { e -> fixNumberType(attr, e) }
if (attr.list || DataType.ofCode(attr.type)!!.isRegister() || DataType.ofCode(attr.type)!!.isSet()) it.value.map { e -> fixNumberType(attr, e) }
else fixNumberType(attr, it.value[0])
attr to value
}
Expand Down
Loading