-
Notifications
You must be signed in to change notification settings - Fork 4
1. Getting Started
from pydynamodb import connect
cursor = connect(aws_access_key_id="aws_access_key_id",
aws_secret_access_key="aws_secret_access_key",
region_name="region_name").cursor()
cursor.execute('SELECT * FROM "ddb_table_name"')
print(cursor.fetchall())
from pydynamodb import connect
cursor = connect(aws_access_key_id="aws_access_key_id",
aws_secret_access_key="aws_secret_access_key",
region_name="region_name").cursor()
cursor.execute('SELECT * FROM "ddb_table_name"')
rows = cursor.fetchall()
for row in rows:
print(row)
PyDynamoDB is able to serialize the parameters that are passed to DDB.
from pydynamodb import connect
cursor = connect(aws_access_key_id="aws_access_key_id",
aws_secret_access_key="aws_secret_access_key",
region_name="region_name").cursor()
cursor.execute("""INSERT INTO "ddb_table_name" VALUE {
'partition_key' = ?, 'sort_key' = ?,
'col_str' = ?, 'col_num' = ?, 'col_byte' = ?,
'col_ss' = ?, 'col_ns' = ?, 'col_bs' = ?,
'col_list' = ?, 'col_map' = ?, 'col_nested' = ?
}""", ["pkey_value", "skey_value", "str", 100, b"ABC", # String, Number, Bytes
{"str", "str"}, {100, 100}, {b"A", b"B"}, # String/Number/Bytes Set
["str", 100, b"ABC"], # List
{"key1": "val", "key2": "val"}, # Map
["str", 100, {"key1": "val"}] # Nested Structure
])
cursor.execute('SELECT col_nested FROM "ddb_table_name" WHERE partition_key = ?', ["pkey_value"])
print(cursor.fetchall())
PyDynamoDB is able to deserialize the result set to Python built-in data types.
cursor.execute("""
SELECT col_nested FROM "ddb_table_name" WHERE partition_key = ? AND sort_key = ?
""", ["pkey_value", "skey_value"])
assert cursor.fetchone() == (["str", 100, {"key1": "val"}])
DDB is a NoSQL database, which means that, apart from the key schema, each row may have different columns or column types. PyDynamoDB therefore cannot produce a complete result set description before all result data has been fetched. Use a fetch* method to iterate over the whole result set first, then read cursor.description to get the full column description.
from pydynamodb import connect
cursor = connect(aws_access_key_id="aws_access_key_id",
aws_secret_access_key="aws_secret_access_key",
region_name="region_name").cursor()
cursor.execute('SELECT * FROM "ddb_table_name"')
print(cursor.fetchall())
print(cursor.description)
Dict Result Set returns the dataset as a list of dicts.
from pydynamodb import connect
cursor = connect(aws_access_key_id="aws_access_key_id",
aws_secret_access_key="aws_secret_access_key",
region_name="region_name",
cursor_class=DictCursor).cursor()
cursor.execute('SELECT * FROM "ddb_table_name"')
print(cursor.fetchall())
[{"col1": "value", "col2": ["subset"...]}, {"col1": "value", "col2": ["subset"...]}, ...]
DDB supports only a limited set of data types. PyDynamoDB converts date and datetime values when you write them; the data is stored in DDB as ISO 8601 formatted strings. To get the original type back, use Annotation Functions in read operations.
- DATE(column [, 'date format']) - Converts using ISO 8601 by default. To use a custom format, pass it as the second parameter.
- DATETIME(column [, 'datetime format']) - Converts using ISO 8601 by default. To use a custom format, pass it as the second parameter.
- NUMBER(column) - Convert to float
- BOOL(column) - Convert to bool
Write date & datetime data:
from datetime import date, datetime
sql = """
INSERT INTO "ddb_table_name" VALUE {
'key_partition': ?, 'key_sort': ?, 'col_date': ?, 'col_datetime': ?
}
"""
cursor.execute(sql, [
"pk_value", 0, date(2022, 10, 18), datetime(2022, 10, 18, 13, 55, 34),
])
Read data:
cursor.execute("""
SELECT DATE(col_date), DATETIME(col_datetime) FROM "ddb_table_name"
WHERE key_partition = 'pk_value' AND key_sort = 0
"""
- SUBSTR(column, start [, length]) - Extract and return a substring of the given length, starting at the specified position in the source string
- SUBSTRING(column, start [, length]) - Same as SUBSTR
- REPLACE(column, pattern [, replacement]) - Return a copy of a string with each instance of a substring replaced by another substring
- TRIM(column) - Return a copy of a string that has specified characters removed from the beginning and the end of a string
- UPPER(column) - Return a copy of a string with all of the characters converted to uppercase
- LOWER(column) - Return a copy of a string with all the characters converted to lowercase
You can set aliases for columns in a SELECT statement.
cursor.execute("""
SELECT DATE(col_date) col1, DATETIME(col_datetime) col2 FROM "ddb_table_name"
WHERE key_partition = 'pk_value' AND key_sort = 0
"""
assert [d[0] for d in cursor.description] == ["col1", "col2"]
Support Boto3 credentials.
from pydynamodb import connect
cursor = connect(aws_access_key_id="aws_access_key_id",
aws_secret_access_key="aws_secret_access_key",
region_name="region_name").cursor()
from pydynamodb import connect
cursor = connect(aws_access_key_id="aws_access_key_id",
aws_secret_access_key="aws_secret_access_key",
aws_session_token="aws_session_token",
region_name="region_name").cursor()
You will be prompted to enter the MFA code. Program execution is blocked until the MFA code is entered.
from pydynamodb import connect
cursor = connect(duration_seconds=3600,
serial_number="arn:aws:iam::ACCOUNT_NUMBER_WITHOUT_HYPHENS:mfa/MFA_DEVICE_ID",
region_name="region_name").cursor()
The shared credentials file has a default location of ~/.aws/credentials.
If you use the default profile, there is no need to specify credential information.
from pydynamodb import connect
cursor = connect(region_name="region_name").cursor()
from pydynamodb import connect
cursor = connect(profile_name="profile_name",
region_name="region_name").cursor()
from pydynamodb import connect
cursor = connect(role_arn="role_arn",
role_session_name="PyDynamoDB-session",
duration_seconds=3600,
region_name="region_name").cursor()
Serial Number and MFA token code are required.
from pydynamodb import connect
cursor = connect(role_arn="role_arn",
role_session_name="PyDynamoDB-session",
duration_seconds=3600,
serial_number="arn:aws:iam::ACCOUNT_NUMBER_WITHOUT_HYPHENS:mfa/MFA_DEVICE_ID",
token_code="7766933",
region_name="region_name").cursor()
from pydynamodb import connect
cursor = connect(role_arn="role_arn",
duration_seconds=3600,
principal_arn="principal_arn",
saml_assertion="saml_assertion",
region_name="region_name").cursor()
from pydynamodb import connect
cursor = connect(role_arn="role_arn",
role_session_name="PyDynamoDB-session",
duration_seconds=3600,
web_identity_token="web_identity_token",
provider_id="www.amazon.com",
region_name="region_name").cursor()
No need to specify credential information.
from pydynamodb import connect
cursor = connect(region_name="region_name").cursor()