Skip to content

Database keystore #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ class AnotherModel(models.Model):

Supported field types include: `EncryptedCharField`, `EncryptedTextField`, `EncryptedDateField`, `EncryptedDateTimeField`, `EncryptedEmailField`, and `EncryptedIntegerField`.

## Deterministic Encryption

`DeterministicEncryptedCharField` provides support for [Deterministic AEAD](https://developers.google.com/tink/deterministic-aead), which means that values in the field can be queried with exact-match lookups. However, unlike with normal AEAD encryption, an attacker who can see the ciphertexts can verify whether two messages are equal.

Deterministic encryption requires a key of type `AES-SIV` and supports Associated Data.

### Associated Data

The encrypted fields make use of `Authenticated Encryption With Associated Data (AEAD)` which offers confidentiality and integrity within the same mode of operation. This allows the caller to specify a cleartext fragment named `additional authenticated data (aad)` to the encryption and decryption operations and receive cryptographic guarantees that the ciphertext data has not been tampered with.
Expand Down
3 changes: 2 additions & 1 deletion tink_fields/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .fields import * # noqa
from tink import aead
from tink import aead, daead

aead.register()
daead.register()
62 changes: 62 additions & 0 deletions tink_fields/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from dataclasses import dataclass
from os.path import exists
from typing import Optional, TypeVar, Type, TYPE_CHECKING
from django.utils.functional import cached_property

from django.core.exceptions import ImproperlyConfigured
from tink import (
aead,
KeysetHandle,
JsonKeysetReader,
cleartext_keyset_handle,
read_keyset_handle,
)

if TYPE_CHECKING:
from .models import Keyset

P = TypeVar("P")


@dataclass
class KeysetConfig:
    """Settings describing where one Tink keyset lives and how to open it.

    A keyset is read either from a JSON file (``path``) or from a ``Keyset``
    model row (``db_name``). ``validate`` enforces that exactly one of the
    two is set.
    """

    # Filesystem location of a JSON keyset file.
    path: Optional[str] = None
    # Value matched against ``Keyset.name`` when loading from the database.
    db_name: Optional[str] = None
    # AEAD handed to ``read_keyset_handle`` to open an encrypted keyset file.
    master_key_aead: Optional[aead.Aead] = None
    # When true, the keyset file is read via ``cleartext_keyset_handle``.
    cleartext: bool = False

    def validate(self) -> None:
        """Check that the configuration is usable.

        Raises:
            ImproperlyConfigured: if neither or both of ``path`` and
                ``db_name`` are set, if ``path`` does not exist, or if a
                non-cleartext keyset file has no ``master_key_aead``.
        """
        if not self.path and not self.db_name:
            raise ImproperlyConfigured("Keyset path or db_name must be set")
        if self.db_name and self.path:
            raise ImproperlyConfigured("Only one of keyset path or db_name must be set")

        if self.path:
            if not exists(self.path):
                raise ImproperlyConfigured(f"Keyset {self.path} does not exist")

            # NOTE(review): this check is placed under ``if self.path`` because
            # ``master_key_aead`` is only consumed when reading a keyset file;
            # confirm this matches the intended nesting.
            if not self.cleartext and self.master_key_aead is None:
                raise ImproperlyConfigured(
                    "Encrypted keysets must specify `master_key_aead`"
                )

    def primitive(self, cls: Type[P]) -> P:
        """Return the Tink primitive of type ``cls`` backed by this keyset.

        Raises:
            ImproperlyConfigured: if neither ``path`` nor ``db_name`` is set,
                instead of silently returning ``None``.
        """
        if self.path:
            return self._load_from_path.primitive(cls)
        if self.db_name:
            return self._load_from_db.primitive(cls)
        raise ImproperlyConfigured("Keyset path or db_name must be set")

    @cached_property
    def _load_from_path(self) -> KeysetHandle:
        """Read the keyset file once and cache the resulting handle."""
        with open(self.path, "r", encoding="utf-8") as f:
            reader = JsonKeysetReader(f.read())
        if self.cleartext:
            return cleartext_keyset_handle.read(reader)
        return read_keyset_handle(reader, self.master_key_aead)

    @cached_property
    def _load_from_db(self) -> "Keyset":
        """Fetch the ``Keyset`` row named ``db_name`` once and cache it."""
        # Imported here rather than at module level; the module-level import
        # of ``Keyset`` is guarded by ``TYPE_CHECKING``.
        from .models import Keyset

        return Keyset.objects.get(name=self.db_name)
242 changes: 166 additions & 76 deletions tink_fields/fields.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from functools import lru_cache
from typing import Any, Callable, Dict, Optional
from typing import Any, Callable, TYPE_CHECKING, Dict
from django.db import models
from django.db.models.lookups import In
from django.core.exceptions import FieldError, ImproperlyConfigured
from django.utils.functional import cached_property
from tink import (
KeysetHandle,
cleartext_keyset_handle,
read_keyset_handle,
JsonKeysetReader,
aead,
daead,
)
from django.conf import settings
from dataclasses import dataclass
from os.path import exists
from django.utils.encoding import force_bytes, force_str
from django.db.backends.base.base import BaseDatabaseWrapper

from tink_fields.config import KeysetConfig

if TYPE_CHECKING:
from django.db.backends.base.base import BaseDatabaseWrapper


__all__ = [
Expand All @@ -24,34 +24,46 @@
"EncryptedIntegerField",
"EncryptedDateField",
"EncryptedDateTimeField",
"EncryptedBinaryField",
"DeterministicEncryptedField",
"DeterministicEncryptedCharField",
"DeterministicEncryptedEmailField",
"DeterministicEncryptedIntegerField",
]

_config: Dict[str, KeysetConfig] = {}

@dataclass
class KeysetConfig:
path: str
master_key_aead: Optional[aead.Aead] = None
cleartext: bool = False

def validate(self):
if not self.path:
raise ImproperlyConfigured("Keyset path cannot be None or empty")
def _get_config(keyset: str) -> KeysetConfig:
global _config

if not exists(self.path):
raise ImproperlyConfigured(f"Keyset {self.path} does not exist")

if not self.cleartext and self.master_key_aead is None:
raise ImproperlyConfigured(f"Encrypted keysets must specify `master_key_aead`")

if keyset in _config:
return _config[keyset]

class EncryptedField(models.Field):
"""A field that uses Tink primitives to protect the confidentiality and integrity of data"""
config = getattr(settings, "TINK_FIELDS_CONFIG", None)
if config is None:
raise ImproperlyConfigured(
f"Could not find `TINK_FIELDS_CONFIG` attribute in settings"
)

if keyset not in config:
raise ImproperlyConfigured(
f"Could not find configuration for keyset `{keyset}` in `TINK_FIELDS_CONFIG`"
)

keyset_config = KeysetConfig(**config[keyset])
keyset_config.validate()
_config[keyset] = keyset_config

return keyset_config


class BaseEncryptedField(models.Field):
_unsupported_properties = ["primary_key", "db_index", "unique"]
_internal_type = "BinaryField"

_keyset: str
_keyset_handle: KeysetHandle
_keyset_config: KeysetConfig
_aad_callback: Callable[[models.Field], bytes]

def __init__(self, *args, **kwargs):
Expand All @@ -62,76 +74,116 @@ def __init__(self, *args, **kwargs):
)

self._keyset = kwargs.pop("keyset", "default")
self._keyset_handle = self._get_tink_keyset_handle()
self._keyset_config = self._get_config()
self._aad_callback = kwargs.pop("aad_callback", lambda x: b"")

super(EncryptedField, self).__init__(*args, **kwargs)
super(BaseEncryptedField, self).__init__(*args, **kwargs)

def _get_config(self) -> Dict[str, Any]:
config = getattr(settings, "TINK_FIELDS_CONFIG", None)
if config is None:
raise ImproperlyConfigured(
f"Could not find `TINK_FIELDS_CONFIG` attribute in settings"
)
return config
def _get_config(self) -> KeysetConfig:
return _get_config(self._keyset)

def _get_tink_keyset_handle(self) -> KeysetHandle:
"""Read the configuration for the requested keyset and return a respective keyset handle"""
config = self._get_config()
def get_internal_type(self) -> str:
return self._internal_type

if self._keyset not in config:
raise ImproperlyConfigured(
f"Could not find configuration for keyset `{self._keyset}` in `TINK_FIELDS_CONFIG`"
)
@cached_property
def validators(self):
    """Build the validator list as the masqueraded Django field type would.

    ``_internal_type`` is shadowed on the instance with the parent class's
    internal type while the parent's ``validators`` is evaluated, and the
    shadow is always removed afterwards.
    """
    # Temporarily pretend to be whatever type of field we're masquerading
    # as, for purposes of constructing validators (needed for
    # IntegerField and subclasses).
    parent = super(BaseEncryptedField, self)
    self.__dict__["_internal_type"] = parent.get_internal_type()
    try:
        return super(BaseEncryptedField, self).validators
    finally:
        # Drop the instance-level shadow so the class attribute applies again.
        del self.__dict__["_internal_type"]

keyset_config = KeysetConfig(**config[self._keyset])
keyset_config.validate()
def to_python_prepare(self, value: bytes) -> Any:
if isinstance(self, models.BinaryField):
return value

with open(keyset_config.path, "r") as f:
reader = JsonKeysetReader(f.read())
if keyset_config.cleartext:
return cleartext_keyset_handle.read(reader)
return read_keyset_handle(reader, keyset_config.master_key_aead)
return force_str(value)

@lru_cache(maxsize=None)
def _get_aead_primitive(self) -> aead.Aead:
return self._keyset_handle.primitive(aead.Aead)

def get_internal_type(self) -> str:
return self._internal_type
class EncryptedField(BaseEncryptedField):
"""A field that uses Tink primitives to protect the confidentiality and integrity of data"""

@cached_property
def _aead_primitive(self) -> aead.Aead:
    """AEAD primitive obtained from this field's keyset configuration (cached)."""
    keyset_config = self._keyset_config
    return keyset_config.primitive(aead.Aead)

def get_db_prep_save(self, value: Any, connection: BaseDatabaseWrapper) -> Any:
def get_db_prep_save(self, value: Any, connection: "BaseDatabaseWrapper") -> Any:
val = super(EncryptedField, self).get_db_prep_save(value, connection)
if val is not None:
return connection.Database.Binary(
self._get_aead_primitive().encrypt(
self._aead_primitive.encrypt(force_bytes(val), self._aad_callback(self))
)

def from_db_value(self, value, expression, connection, *args):
    """Decrypt a stored value and convert it to the field's Python type.

    Returns ``None`` when ``value`` is ``None``. Otherwise the value is
    decrypted with the AEAD primitive, using the associated data produced by
    ``_aad_callback``, then passed through ``to_python_prepare`` and
    ``to_python``.
    """
    if value is None:
        return None

    plaintext = self._aead_primitive.decrypt(bytes(value), self._aad_callback(self))
    return self.to_python(self.to_python_prepare(plaintext))


class DeterministicEncryptedField(BaseEncryptedField):
"""Field that is similar to EncryptedField, but support exact match lookups"""

_unsupported_properties = []

@cached_property
def _daead_primitive(self) -> daead.DeterministicAead:
    """Deterministic AEAD primitive from this field's keyset configuration (cached)."""
    keyset_config = self._keyset_config
    return keyset_config.primitive(daead.DeterministicAead)

def get_db_prep_value(
    self, value: Any, connection: "BaseDatabaseWrapper", prepared=False
) -> Any:
    """Deterministically encrypt ``value`` for storage or lookup.

    The parent's prepared value is converted to bytes, encrypted with the
    deterministic AEAD primitive and the field's associated data, and wrapped
    in the backend's ``Binary`` type. ``None`` is returned unchanged.
    """
    prepared_value = super(DeterministicEncryptedField, self).get_db_prep_value(
        value, connection, prepared
    )
    if prepared_value is None:
        return None

    ciphertext = self._daead_primitive.encrypt_deterministically(
        force_bytes(prepared_value), self._aad_callback(self)
    )
    return connection.Database.Binary(ciphertext)

def from_db_value(self, value, expression, connection, *args):
if value is not None:
return self.to_python(
force_str(
self._get_aead_primitive().decrypt(
self.to_python_prepare(
self._daead_primitive.decrypt_deterministically(
bytes(value), self._aad_callback(self)
)
)
)

@property
@lru_cache(maxsize=None)
def validators(self):
# Temporarily pretend to be whatever type of field we're masquerading
# as, for purposes of constructing validators (needed for
# IntegerField and subclasses).
self.__dict__["_internal_type"] = super(
EncryptedField, self
).get_internal_type()
try:
return super(EncryptedField, self).validators
finally:
del self.__dict__["_internal_type"]
def get_db_values_all_keys(
    self, value: Any, connection: "BaseDatabaseWrapper", prepared=False
) -> Any:
    """Like get_db_prep_value, but return one ciphertext per key in the keyset.

    Each entry is the key's identifier followed by the value encrypted
    deterministically with that key's primitive, wrapped in the backend's
    ``Binary`` type. An empty list is returned when the prepared value is
    ``None``.
    """
    prepared_value = super(DeterministicEncryptedField, self).get_db_prep_value(
        value, connection, prepared
    )
    if prepared_value is None:
        return []

    associated_data = self._aad_callback(self)
    to_binary = connection.Database.Binary
    # XXX (original author's note): this would run another query. Is there any
    # way to signal that we want a cached all() using the same primitive set
    # interface?
    return [
        to_binary(
            entry.identifier
            + entry.primitive.encrypt_deterministically(
                force_bytes(prepared_value), associated_data
            )
        )
        for group in self._daead_primitive._primitive_set.all()
        for entry in group
    ]


def get_prep_lookup(self):
Expand All @@ -143,12 +195,32 @@ def get_prep_lookup(self):
)


for name, lookup in models.Field.class_lookups.items():
if name != "isnull":
lookup_class = type(
"EncryptedField" + name, (lookup,), {"get_prep_lookup": get_prep_lookup}
class DeterministicEncryptedFieldExactLookup(In):
    """``exact`` lookup for deterministic fields, built on the ``In`` lookup.

    The single right-hand value is expanded into the ciphertexts produced by
    every key in the keyset (via the field's ``get_db_values_all_keys``).
    """

    lookup_name = "exact"

    def get_prep_lookup(self):
        # ``In`` works on a collection, so wrap the single value in a list.
        self.rhs = [self.rhs]
        return super().get_prep_lookup()

    def get_db_prep_lookup(self, value, connection):
        # ``get_prep_lookup`` wrapped exactly one value; anything else is a bug.
        assert len(value) == 1
        (single_value,) = list(value)
        field = self.lhs.output_field
        params = field.get_db_values_all_keys(single_value, connection, prepared=True)
        return ("%s", params)
EncryptedField.register_lookup(lookup_class)


# For every lookup known to ``models.Field`` except ``isnull``, register on
# both encrypted base classes a subclass whose ``get_prep_lookup`` is replaced
# by the module-level ``get_prep_lookup``.
for name, lookup in models.Field.class_lookups.items():
    for cls in (EncryptedField, DeterministicEncryptedField):
        if name == "isnull":
            continue
        lookup_class = type(
            cls.__name__ + name, (lookup,), dict(get_prep_lookup=get_prep_lookup)
        )
        cls.register_lookup(lookup_class)

# Registered after the loop so it replaces any generic ``exact`` lookup the
# loop registered for deterministic fields.
DeterministicEncryptedField.register_lookup(DeterministicEncryptedFieldExactLookup)


class EncryptedTextField(EncryptedField, models.TextField):
Expand All @@ -173,3 +245,21 @@ class EncryptedDateField(EncryptedField, models.DateField):

class EncryptedDateTimeField(EncryptedField, models.DateTimeField):
pass


class EncryptedBinaryField(EncryptedField, models.BinaryField):
    """Binary field stored encrypted; data must be under 2^32 bytes (4.295GB)."""


class DeterministicEncryptedCharField(DeterministicEncryptedField, models.CharField):
    """``CharField`` combined with ``DeterministicEncryptedField``."""


class DeterministicEncryptedEmailField(DeterministicEncryptedField, models.EmailField):
    """``EmailField`` combined with ``DeterministicEncryptedField``."""


class DeterministicEncryptedIntegerField(
    DeterministicEncryptedField, models.IntegerField
):
    """``IntegerField`` combined with ``DeterministicEncryptedField``."""
Empty file.
Empty file.
Loading