43 changes: 43 additions & 0 deletions airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from os import environ

from airflow import DAG
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

TABLE_NAME = environ.get('DYNAMO_TABLE_NAME', 'ExistingDynamoDbTableName')
BUCKET_NAME = environ.get('S3_BUCKET_NAME', 'ExistingS3BucketName')


with DAG(
    dag_id='example_dynamodb_to_s3',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=['example'],
    catchup=False,
) as dag:

    # [START howto_transfer_dynamodb_to_s3]
    backup_db = DynamoDBToS3Operator(
        task_id='backup_db',
        dynamodb_table_name=TABLE_NAME,
        s3_bucket_name=BUCKET_NAME,
        # Max output file size in bytes. If the Table is too large, multiple files will be created.
        file_size=1000,
Comment on lines +36 to +41

Contributor:

Can you please extend the example/docs to also include the segment explanation?

To parallelize the replication, users can create multiple tasks of DynamoDBToS3Operator.
For instance, to replicate with a parallelism of 2, create two tasks like:
.. code-block:: python

    op1 = DynamoDBToS3Operator(
        task_id="replicator-1",
        dynamodb_table_name="hello",
        dynamodb_scan_kwargs={
            "TotalSegments": 2,
            "Segment": 0,
        },
        ...,
    )

    op2 = DynamoDBToS3Operator(
        task_id="replicator-2",
        dynamodb_table_name="hello",
        dynamodb_scan_kwargs={
            "TotalSegments": 2,
            "Segment": 1,
        },
        ...,
    )

(and remove the segment example from the comments)

Contributor Author:

These changes should be in b0278a1

    )
    # [END howto_transfer_dynamodb_to_s3]
60 changes: 60 additions & 0 deletions airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3_segmented.py
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from os import environ

from airflow import DAG
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

TABLE_NAME = environ.get('DYNAMO_TABLE_NAME', 'ExistingDynamoDbTableName')
BUCKET_NAME = environ.get('S3_BUCKET_NAME', 'ExistingS3BucketName')


with DAG(
    dag_id='example_dynamodb_to_s3_segmented',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=['example'],
    catchup=False,
) as dag:

    # [START howto_transfer_dynamodb_to_s3_segmented]
    # Segmenting allows the transfer to be parallelized across TotalSegments parallel tasks.
    backup_db_segment_1 = DynamoDBToS3Operator(
        task_id='backup-1',
        dynamodb_table_name=TABLE_NAME,
        s3_bucket_name=BUCKET_NAME,
        # Max output file size in bytes. If the Table is too large, multiple files will be created.
        file_size=1000,
        dynamodb_scan_kwargs={
            "TotalSegments": 2,
            "Segment": 0,
        },
    )

    backup_db_segment_2 = DynamoDBToS3Operator(
        task_id="backup-2",
        dynamodb_table_name=TABLE_NAME,
        s3_bucket_name=BUCKET_NAME,
        # Max output file size in bytes. If the Table is too large, multiple files will be created.
        file_size=1000,
        dynamodb_scan_kwargs={
            "TotalSegments": 2,
            "Segment": 1,
        },
    )
    # [END howto_transfer_dynamodb_to_s3_segmented]
29 changes: 4 additions & 25 deletions airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -55,37 +55,16 @@ def _upload_file_to_s3(
class DynamoDBToS3Operator(BaseOperator):
    """
    Replicates records from a DynamoDB table to S3.
    It scans a DynamoDB table and write the received records to a file
    It scans a DynamoDB table and writes the received records to a file
    on the local filesystem. It flushes the file to S3 once the file size
    exceeds the file size limit specified by the user.

    Users can also specify filtering criteria using dynamodb_scan_kwargs
    to only replicate records that satisfy the criteria.

    To parallelize the replication, users can create multiple tasks of DynamoDBToS3Operator.
    For instance, to replicate with a parallelism of 2, create two tasks like:

    .. code-block:: python

        op1 = DynamoDBToS3Operator(
            task_id="replicator-1",
            dynamodb_table_name="hello",
            dynamodb_scan_kwargs={
                "TotalSegments": 2,
                "Segment": 0,
            },
            ...,
        )

        op2 = DynamoDBToS3Operator(
            task_id="replicator-2",
            dynamodb_table_name="hello",
            dynamodb_scan_kwargs={
                "TotalSegments": 2,
                "Segment": 1,
            },
            ...,
        )
    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/transfer:DynamoDBToS3Operator`

    :param dynamodb_table_name: Dynamodb table to replicate data from
    :param s3_bucket_name: S3 bucket to replicate data to
1 change: 1 addition & 0 deletions airflow/providers/amazon/provider.yaml
@@ -434,6 +434,7 @@ hooks:
transfers:
  - source-integration-name: Amazon DynamoDB
    target-integration-name: Amazon Simple Storage Service (S3)
    how-to-guide: /docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst
    python-module: airflow.providers.amazon.aws.transfers.dynamodb_to_s3
  - source-integration-name: Google Cloud Storage (GCS)
    target-integration-name: Amazon Simple Storage Service (S3)
61 changes: 61 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst
@@ -0,0 +1,61 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.


Amazon DynamoDB to S3 Transfer Operator
=======================================

Use the DynamoDBToS3Operator transfer to copy the contents of an existing Amazon DynamoDB table
to an existing Amazon Simple Storage Service (S3) bucket.

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include:: ../_partials/prerequisite_tasks.rst

.. _howto/transfer:DynamoDBToS3Operator:

DynamoDB To S3 Operator
^^^^^^^^^^^^^^^^^^^^^^^

This operator replicates records from a DynamoDB table to a file in an S3 bucket.
It scans a DynamoDB table and writes the received records to a file on the local
filesystem. It flushes the file to S3 once the file size exceeds the file size limit
specified by the user.

Users can also specify filtering criteria using ``dynamodb_scan_kwargs`` to replicate only
records that satisfy the criteria.
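
For example, a minimal sketch of a filtered replication, assuming the scan kwargs are forwarded
to boto3's ``Table.scan`` call (the ``created_at`` attribute and the task name are purely
illustrative, not part of the bundled example DAGs):

.. code-block:: python

    from boto3.dynamodb.conditions import Attr

    backup_db_filtered = DynamoDBToS3Operator(
        task_id="backup_db_filtered",
        dynamodb_table_name=TABLE_NAME,
        s3_bucket_name=BUCKET_NAME,
        file_size=1000,
        # Replicate only items whose (hypothetical) "created_at" attribute
        # is on or after the given date.
        dynamodb_scan_kwargs={
            "FilterExpression": Attr("created_at").gte("2021-01-01"),
        },
    )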

For more information, visit:
:class:`~airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator`

Example usage:

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py
    :language: python
    :dedent: 4
    :start-after: [START howto_transfer_dynamodb_to_s3]
    :end-before: [END howto_transfer_dynamodb_to_s3]

To parallelize the replication, users can create multiple DynamoDBToS3Operator tasks using the
``TotalSegments`` parameter. For instance, to replicate with a parallelism of 2, create two tasks:

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3_segmented.py
    :language: python
    :dedent: 4
    :start-after: [START howto_transfer_dynamodb_to_s3_segmented]
    :end-before: [END howto_transfer_dynamodb_to_s3_segmented]
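
The two explicit tasks above can also be generated in a loop for higher parallelism. A minimal
sketch (``TOTAL_SEGMENTS`` and the task naming are illustrative, not part of the bundled example
DAG):

.. code-block:: python

    TOTAL_SEGMENTS = 4

    backup_db_segments = [
        DynamoDBToS3Operator(
            task_id=f"backup-segment-{segment}",
            dynamodb_table_name=TABLE_NAME,
            s3_bucket_name=BUCKET_NAME,
            file_size=1000,
            # Each task scans one of the TOTAL_SEGMENTS parallel-scan segments.
            dynamodb_scan_kwargs={
                "TotalSegments": TOTAL_SEGMENTS,
                "Segment": segment,
            },
        )
        for segment in range(TOTAL_SEGMENTS)
    ]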