Skip to content

A DynamoDB library on top of Pydantic and boto3.

License

Notifications You must be signed in to change notification settings

fennelmarkets/dyntastic

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

45 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

dyntastic

CI codecov pypi license

A DynamoDB library on top of Pydantic and boto3.

Installation

pip3 install dyntastic

If the Pydantic binaries are too large for you (they can exceed 90MB), use the following:

pip3 uninstall pydantic  # if pydantic is already installed
pip3 install dyntastic --no-binary pydantic

Usage

The core functionality of this library is provided by the Dyntastic class.

Dyntastic is a subclass of Pydantic's BaseModel, so can be used in all the same places a Pydantic model can be used (FastAPI, etc).

import uuid
from datetime import datetime
from typing import Optional

from dyntastic import Dyntastic
from pydantic import Field

class Product(Dyntastic):
    __table_name__ = "products"
    __hash_key__ = "product_id"

    product_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None


class Event(Dyntastic):
    __table_name__ = "events"
    __hash_key__ = "event_id"
    __range_key__ = "timestamp"

    event_id: str
    timestamp: datetime
    data: dict

# All your favorite pydantic functionality still works:

p = Product(name="bread", price=3.49)
# Product(product_id='d2e91c30-e701-422f-b71b-465b02749f18', name='bread', description=None, price=3.49, tax=None)

p.model_dump()
# {'product_id': 'd2e91c30-e701-422f-b71b-465b02749f18', 'name': 'bread', 'description': None, 'price': 3.49, 'tax': None}

p.model_dump_json()
# '{"product_id": "d2e91c30-e701-422f-b71b-465b02749f18", "name": "bread", "description": null, "price": 3.49, "tax": null}'

Inserting into DynamoDB

Using the Product example from above, simply:

product = Product(name="bread", description="Sourdough Bread", price=3.99)
product.product_id
# d2e91c30-e701-422f-b71b-465b02749f18

# Nothing is written to DynamoDB until .save() is called:
product.save()

Getting Items from DynamoDB

Product.get("d2e91c30-e701-422f-b71b-465b02749f18")
# Product(product_id='d2e91c30-e701-422f-b71b-465b02749f18', name='bread', description="Sourdough Bread", price=3.99, tax=None)

The range key must be provided if one is defined:

Event.get("d2e91c30-e701-422f-b71b-465b02749f18", "2022-02-12T18:27:55.837Z")

Consistent reads are supported:

Event.get(..., consistent_read=True)

A DoesNotExist error is raised by get if a key is not found:

Product.get("nonexistent")
# Traceback (most recent call last):
#   ...
# dyntastic.exceptions.DoesNotExist

Use safe_get instead to return None if the key is not found:

Product.safe_get("nonexistent")
# None

Querying Items in DynamoDB

# A is shorthand for the Attr class (i.e. attribute)
from dyntastic import A

# auto paging iterable
for event in Event.query("some_event_id"):
    print(event)


Event.query("some_event_id", per_page=10)
Event.query("some_event_id")
Event.query("some_event_id", range_key_condition=A.timestamp < datetime(2022, 2, 13))
Event.query("some_event_id", filter_condition=A.some_field == "foo")

# query an index
Event.query(A.my_other_field == 12345, index="my_other_field-index")

# note: Must provide a condition expression rather than just the value
Event.query(123545, index="my_other_field-index")  # errors!

# query an index with an optional filter expression
filter_expression = None
if filter_value:
    filter_expression = A('filter_field').eq(filter_value)
Event.query(
    A.my_other_field == 12345,
    index="my_other_field-index",
    filter_expression=filter_expression
)

# consistent read
Event.query("some_event_id", consistent_read=True)

# specifies the order for index traversal, the default is ascending order
# returns the results in the order in which they are stored by sort key value
Event.query("some_event_id", range_key_condition=A.version.begins_with("2023"), scan_index_forward=False)

DynamoDB Indexes using a KEYS_ONLY or INCLUDE projection are supported:

for event in Event.query("2023-09-22", index="date-keys-only-index"):
    event.id
    # "..."
    event.timestamp
    # datetime(...)

    event.data
    # ValueError: Dyntastic instance was loaded from a KEYS_ONLY or INCLUDE index.
    #             Call refresh() to load the full item, or pass load_full_item=True
    #             to query() or scan()

# automatically fetch the full items
for event in Event.query("2023-09-22", index="date-keys-only-index", load_full_item=True):
    event.data
    # {...}

If you need to manually handle pagination, use query_page:

page = Event.query_page(...)
page.items
# [...]
page.has_more
# True
page.last_evaluated_key
# {"event_id": "some_event_id", "timestamp": "..."}

Event.query_page(..., last_evaluated_key=page.last_evaluated_key)

Scanning Items in DynamoDB

Scanning is done identically to querying, except there are no hash key or range key conditions.

# auto paging iterable
for event in Event.scan():
    pass

Event.scan((A.my_field < 5) & (A.some_other_field.is_in(["a", "b", "c"])))
Event.scan(..., consistent_read=True)

Updating Items in DynamoDB

Examples:

my_item.update(A.my_field.set("new_value"))
my_item.update(A.my_field.set(A.another_field))
my_item.update(A.my_int.set(A.another_int - 10))
my_item.update(A.my_int.set(A.my_int + 1))
my_item.update(A.my_list.append("new_element"))
my_item.update(A.some_attribute.set_default("value_if_not_already_present"))

my_item.update(A.my_field.remove())
my_item.update(A.my_list.remove(2))  # remove by index

my_item.update(A.my_string_set.add("new_element"))
my_item.update(A.my_string_set.add({"new_1", "new_2"}))
my_item.update(A.my_string_set.delete("element_to_remove"))
my_item.update(A.my_string_set.delete({"remove_1", "remove_2"}))

# Multiple updates can be performed at once
my_item.update(
    A.my_field.set("new_value"),
    A.my_int.set(A.my_int + 1),
    ...
)

The data is automatically refreshed after the update request. To disable this behavior, pass refresh=False:

my_item.update(..., refresh=False)

Supports conditions:

my_item.update(..., condition=A.my_field == "something")

By default, if the condition is not met, the update call will be a noop. To instead error in this situation, pass require_condition=True:

my_item.update(..., require_condition=True)

Batch Reads

Multiple items can be read from a table at the same time using the batch_get function.

Note that DynamoDB limits the number of items that can be read at one time to 100 items or 16MB, whichever comes first.

Note that if any of the provided keys are missing from dynamo, they will simply be excluded in the result set.

MyModel.batch_get(["hash_key_1", "hash_key_2", "hash_key_3"])
# => [MyModel(...), MyModel(...)]

For models with a range key defined:

MyModel.batch_get([("hash_key_1", "range_key_1"), ("hash_key_2", "range_key_2")])
# => [MyModel(...), MyModel(...)]

Batch Writes

Save and delete operations may also be performed in batches.

Note that DynamoDB limits the number of items that can be written in a single batch to 25 items or 16MB, whichever comes first. Dyntastic will automatically batch in chunks of 25, or less if desired.

with MyModel.batch_writer():
    MyModel(id="0").delete()
    MyModel(id="1").save()
    MyModel(id="2").save()

# all operations are performed once the `with` context is exited

To configure a smaller batch size, for example when each item is relatively large:

with MyModel.batch_writer(batch_size=2):
    MyModel(id="1").save()
    MyModel(id="2").save()
    # the previous two models are written immediately, since the batch size was reached
    MyModel(id="3).save()

# The final operation is performed here now that the `with` context has exited

Transactions

Dyntastic supports DynamoDB transactions. Transactions are performed using the transaction context manager and can be used to perform operations across one or multiple tables that reside in the same region.

from dyntastic import transaction

with transaction():
    item1 = SomeTable(...)
    item2 = AnotherTable.get(...)
    item1.save()
    item2.update(A.something.set("..."))

Note that DynamoDB limits the number of items that can be written in a single transaction to 100 items or 4MB, whichever comes first. Dyntastic can automatically flush the transaction in chunks of 100 (or fewer if desired) by passing auto_commit=True.

For example, to commit every 50 items:

with transaction(auto_commit=True, commit_every=50):
    item1 = SomeTable(...)
    item2 = AnotherTable.get(...)
    item1.save()
    item2.update(A.something.set("..."))

Create a DynamoDB Table

This functionality is currently meant only for use in unit tests as it does not support configuring throughput.

To create a table with no secondary indexes:

MyModel.create_table()

# Do not wait until the table creation is complete (subsequent operations
# may error if they are performed before the table creation is finished)
MyModel.create_table(wait=False)

To define global secondary indexes (creating local secondary indexes is not currently supported):

# All of the following are equivalent
index1 = "my_field"
index1 = Index("my_field")
index1 = Index("my_field", index_name="my_field-index")

# Range keys are also supported
index2 = Index("my_field", "my_second_field")
index2 = Index("my_field", "my_second_field", index_name="my_field_my_second_field-index")

MyModel.create_table(index1, index2)

Dynamic table names

In some circumstances you may want the table name to be defined dynamically. This can be done by setting the __table_name__ attribute to a Callable that returns the table name from the source of your choice. In the example below, we are using an environment variable.

import os
from dyntastic import Dyntastic

os.environ["MY_TABLE_NAME"] = "my_table"

class Product(Dyntastic):
    __table_name__ = lambda: os.getenv("MY_TABLE_NAME")
    __hash_key__ = "product_id"

    product_id: str

Custom dynamodb endpoint or region for local development

To explicitly define an AWS region or DynamoDB endpoint url (for using a local dynamodb docker instance, for example), set __table_region__ or __table_host__. These attributes can be a string or a Callable that returns a string.

from dyntastic import Dyntastic

class Product(Dyntastic):
    __table_name__ = "products"
    __table_region__ = "us-east-1"
    __table_host__ = "http://localhost:8000"
    __hash_key__ = "product_id"

    product_id: str

You can also set the environment variables DYNTASTIC_HOST and/or DYNTASTIC_REGION to control the behavior of the underlying boto3 client and resource objects.

Note: if both the environment variables and the class attributes are set, the class attributes will take precedence.

import os
from dyntastic import Dyntastic

os.environ["DYNTASTIC_HOST"] = "http://localhost:8000"
os.environ["DYNTASTIC_REGION"] = "us-east-1"

class Product(Dyntastic):
    __table_name__ = "products"
    __hash_key__ = "product_id"

    product_id: str

Contributing / Developer Setup

Make sure just is installed on your system

To setup the dev environment and install dependencies:

# create and activate a new venv
python3 -m venv .venv
. .venv/bin/activate

# install all dev dependencies
just install-dev

# to automatically run pre-commit before all commits
pre-commit install

After making changes, lint all code + run tests:

just pre-commit

# or individually:
just isort
just black
just flake8
just mypy
just test

# run a specific test/tests
just test tests/test_save.py tests/test_get.py
just test tests/some_save.py::test_save_aliased_item

About

A DynamoDB library on top of Pydantic and boto3.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 99.4%
  • Just 0.6%