Skip to content

Commit ff6b864

Browse files
tswast and leahecole
authored and committed
docs: add scheduled query samples (#83)
* docs: add scheduled query samples
* test: opt out of type annotations for now
* test: use environment variable for project ID
* set quota project
* consolidate config creation to conserve quota
1 parent f26155d commit ff6b864

8 files changed

+463
-10
lines changed

bigquery-datatransfer/snippets/conftest.py

+67-8
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,93 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import datetime
16+
import os
17+
import uuid
18+
19+
from google.api_core import client_options
1520
import google.api_core.exceptions
1621
import google.auth
1722
from google.cloud import bigquery
1823
from google.cloud import bigquery_datatransfer
1924
import pytest
2025

2126

27+
def temp_suffix():
    """Return a unique suffix (UTC-naive timestamp plus random hex) for resource names."""
    stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    random_part = uuid.uuid4().hex[:8]
    return f"{stamp}_{random_part}"
30+
31+
32+
@pytest.fixture(scope="session")
def bigquery_client(default_credentials):
    """Session-wide BigQuery client built from the application-default credentials."""
    creds, project = default_credentials
    return bigquery.Client(credentials=creds, project=project)
36+
37+
38+
@pytest.fixture(scope="session")
def dataset_id(bigquery_client, project_id):
    """Create a temporary dataset for the test session and drop it afterwards.

    Yields:
        The bare dataset ID (without the project prefix).
    """
    dataset_id = f"bqdts_{temp_suffix()}"
    bigquery_client.create_dataset(f"{project_id}.{dataset_id}")
    yield dataset_id
    # Qualify the reference with project_id so we delete the same dataset we
    # created: the client's default project (from the credentials) may differ
    # from the project_id fixture (from the environment). not_found_ok keeps
    # teardown from raising if the dataset was already removed.
    bigquery_client.delete_dataset(
        f"{project_id}.{dataset_id}", delete_contents=True, not_found_ok=True
    )
44+
45+
2246
@pytest.fixture(scope="session")
def default_credentials():
    """Resolve application-default credentials with the cloud-platform scope."""
    scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    return google.auth.default(scopes)
2549

2650

2751
@pytest.fixture(scope="session")
def project_id():
    """Project to attribute quota and billing to, taken from the environment."""
    return os.environ["GOOGLE_CLOUD_PROJECT"]
3154

3255

3356
@pytest.fixture(scope="session")
def service_account_name(default_credentials):
    """Email address of the service account the tests run as.

    Note: this property is not available when running with user account
    credentials, but only service account credentials are used in our test
    infrastructure.
    """
    creds, _ = default_credentials
    return creds.service_account_email
3763

3864

3965
@pytest.fixture(scope="session")
def transfer_client(default_credentials, project_id):
    """Session-wide Data Transfer Service client with quota billed to project_id."""
    creds, _ = default_credentials
    options = client_options.ClientOptions(quota_project_id=project_id)

    client = bigquery_datatransfer.DataTransferServiceClient(
        credentials=creds, client_options=options
    )

    # Ensure quota is always attributed to the correct project: sample code
    # that constructs its own client receives this shared instance instead.
    bigquery_datatransfer.DataTransferServiceClient = lambda: client

    return client
78+
79+
80+
@pytest.fixture(scope="session")
def transfer_config_name(transfer_client, project_id, dataset_id, service_account_name):
    """Create one scheduled-query transfer config for the whole session.

    Yields the config's resource name and deletes the config at teardown.
    """
    from . import manage_transfer_configs, scheduled_query

    # Use the transfer_client fixture so we know quota is attributed to the
    # correct project.
    assert transfer_client is not None

    # To conserve limited BQ-DTS quota, this fixture creates only one transfer
    # config for a whole session and is used to test the scheduled_query.py and
    # the delete operation in manage_transfer_configs.py.
    overrides = {
        "project_id": project_id,
        "dataset_id": dataset_id,
        "service_account_name": service_account_name,
    }
    config = scheduled_query.create_scheduled_query(overrides)
    yield config.name
    manage_transfer_configs.delete_config({"transfer_config_name": config.name})
43102

44103

45104
@pytest.fixture

bigquery-datatransfer/snippets/copy_dataset_test.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,17 @@ def source_dataset_id(bigquery_client, project_id):
4242

4343

4444
def test_copy_dataset(
45-
capsys, project_id, destination_dataset_id, source_dataset_id, to_delete_configs
45+
capsys,
46+
transfer_client,
47+
project_id,
48+
destination_dataset_id,
49+
source_dataset_id,
50+
to_delete_configs,
4651
):
52+
# Use the transfer_client fixture so we know quota is attributed to the
53+
# correct project.
54+
assert transfer_client is not None
55+
4756
transfer_config = copy_dataset.copy_dataset(
4857
{
4958
"destination_project_id": project_id,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def list_configs(override_values=None):
    """Print every transfer config in a project.

    Args:
        override_values: Optional dict of values (e.g. ``project_id``) the
            test harness uses to replace the sample's hard-coded defaults.
    """
    # Avoid a mutable default argument; treat a missing mapping as empty.
    override_values = override_values or {}
    # [START bigquerydatatransfer_list_configs]
    from google.cloud import bigquery_datatransfer

    transfer_client = bigquery_datatransfer.DataTransferServiceClient()

    project_id = "my-project"
    # [END bigquerydatatransfer_list_configs]
    # To facilitate testing, we replace values with alternatives
    # provided by the testing harness.
    project_id = override_values.get("project_id", project_id)
    # [START bigquerydatatransfer_list_configs]
    parent = transfer_client.common_project_path(project_id)

    configs = transfer_client.list_transfer_configs(parent=parent)
    print("Got the following configs:")
    for config in configs:
        print(f"\tID: {config.name}, Schedule: {config.schedule}")
    # [END bigquerydatatransfer_list_configs]
35+
36+
37+
def update_config(override_values=None):
    """Update a transfer config's display name.

    Args:
        override_values: Optional dict of values (``new_display_name``,
            ``transfer_config_name``) the test harness uses to replace the
            sample's hard-coded defaults.

    Returns:
        The updated TransferConfig, so tests can delete it later.
    """
    # Avoid a mutable default argument; treat a missing mapping as empty.
    override_values = override_values or {}
    # [START bigquerydatatransfer_update_config]
    from google.cloud import bigquery_datatransfer
    from google.protobuf import field_mask_pb2

    transfer_client = bigquery_datatransfer.DataTransferServiceClient()

    transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
    new_display_name = "My Transfer Config"
    # [END bigquerydatatransfer_update_config]
    # To facilitate testing, we replace values with alternatives
    # provided by the testing harness.
    new_display_name = override_values.get("new_display_name", new_display_name)
    transfer_config_name = override_values.get(
        "transfer_config_name", transfer_config_name
    )
    # [START bigquerydatatransfer_update_config]

    transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)
    transfer_config.display_name = new_display_name

    transfer_config = transfer_client.update_transfer_config(
        {
            "transfer_config": transfer_config,
            "update_mask": field_mask_pb2.FieldMask(paths=["display_name"]),
        }
    )

    print(f"Updated config: '{transfer_config.name}'")
    print(f"New display name: '{transfer_config.display_name}'")
    # [END bigquerydatatransfer_update_config]
    # Return the config name for testing purposes, so that it can be deleted.
    return transfer_config
70+
71+
72+
def update_credentials_with_service_account(override_values=None):
    """Switch a transfer config to run as a service account.

    Args:
        override_values: Optional dict of values (``service_account_name``,
            ``transfer_config_name``) the test harness uses to replace the
            sample's hard-coded defaults.

    Returns:
        The updated TransferConfig, so tests can delete it later.
    """
    # Avoid a mutable default argument; treat a missing mapping as empty.
    override_values = override_values or {}
    # [START bigquerydatatransfer_update_credentials]
    from google.cloud import bigquery_datatransfer
    from google.protobuf import field_mask_pb2

    transfer_client = bigquery_datatransfer.DataTransferServiceClient()

    service_account_name = "abcdef-test-sa@abcdef-test.iam.gserviceaccount.com"
    transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
    # [END bigquerydatatransfer_update_credentials]
    # To facilitate testing, we replace values with alternatives
    # provided by the testing harness.
    service_account_name = override_values.get(
        "service_account_name", service_account_name
    )
    transfer_config_name = override_values.get(
        "transfer_config_name", transfer_config_name
    )
    # [START bigquerydatatransfer_update_credentials]

    transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)

    transfer_config = transfer_client.update_transfer_config(
        {
            "transfer_config": transfer_config,
            "update_mask": field_mask_pb2.FieldMask(paths=["service_account_name"]),
            "service_account_name": service_account_name,
        }
    )

    print("Updated config: '{}'".format(transfer_config.name))
    # [END bigquerydatatransfer_update_credentials]
    # Return the config name for testing purposes, so that it can be deleted.
    return transfer_config
106+
107+
108+
def schedule_backfill(override_values=None):
    """Schedule backfill runs for the past several days of a transfer config.

    Args:
        override_values: Optional dict of values (``transfer_config_name``)
            the test harness uses to replace the sample's hard-coded defaults.

    Returns:
        The scheduled transfer runs.
    """
    # Avoid a mutable default argument; treat a missing mapping as empty.
    override_values = override_values or {}
    # [START bigquerydatatransfer_schedule_backfill]
    import datetime

    from google.cloud import bigquery_datatransfer

    transfer_client = bigquery_datatransfer.DataTransferServiceClient()

    transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
    # [END bigquerydatatransfer_schedule_backfill]
    # To facilitate testing, we replace values with alternatives
    # provided by the testing harness.
    transfer_config_name = override_values.get(
        "transfer_config_name", transfer_config_name
    )
    # [START bigquerydatatransfer_schedule_backfill]
    now = datetime.datetime.now(datetime.timezone.utc)
    start_time = now - datetime.timedelta(days=5)
    end_time = now - datetime.timedelta(days=2)

    # Some data sources, such as scheduled_query only support daily run.
    # Truncate start_time and end_time to midnight time (00:00AM UTC).
    start_time = datetime.datetime(
        start_time.year, start_time.month, start_time.day, tzinfo=datetime.timezone.utc
    )
    end_time = datetime.datetime(
        end_time.year, end_time.month, end_time.day, tzinfo=datetime.timezone.utc
    )

    response = transfer_client.schedule_transfer_runs(
        parent=transfer_config_name,
        start_time=start_time,
        end_time=end_time,
    )

    print("Started transfer runs:")
    for run in response.runs:
        print(f"backfill: {run.run_time} run: {run.name}")
    # [END bigquerydatatransfer_schedule_backfill]
    return response.runs
148+
149+
150+
def delete_config(override_values=None):
    """Delete a transfer config, tolerating one that is already gone.

    Args:
        override_values: Optional dict of values (``transfer_config_name``)
            the test harness uses to replace the sample's hard-coded defaults.
    """
    # Avoid a mutable default argument; treat a missing mapping as empty.
    override_values = override_values or {}
    # [START bigquerydatatransfer_delete_transfer]
    import google.api_core.exceptions
    from google.cloud import bigquery_datatransfer

    transfer_client = bigquery_datatransfer.DataTransferServiceClient()

    transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
    # [END bigquerydatatransfer_delete_transfer]
    # To facilitate testing, we replace values with alternatives
    # provided by the testing harness.
    transfer_config_name = override_values.get(
        "transfer_config_name", transfer_config_name
    )
    # [START bigquerydatatransfer_delete_transfer]
    try:
        transfer_client.delete_transfer_config(name=transfer_config_name)
    except google.api_core.exceptions.NotFound:
        print("Transfer config not found.")
    else:
        print(f"Deleted transfer config: {transfer_config_name}")
    # [END bigquerydatatransfer_delete_transfer]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from . import manage_transfer_configs
16+
17+
18+
def test_list_configs(capsys, project_id, transfer_config_name):
    """The list sample should mention the session's config in its output."""
    manage_transfer_configs.list_configs({"project_id": project_id})
    captured, _ = capsys.readouterr()
    assert "Got the following configs:" in captured
    assert transfer_config_name in captured
23+
24+
25+
def test_update_config(capsys, transfer_config_name):
    """Renaming the config should echo the new display name."""
    overrides = {
        "new_display_name": "name from test_update_config",
        "transfer_config_name": transfer_config_name,
    }
    manage_transfer_configs.update_config(overrides)
    captured, _ = capsys.readouterr()
    for expected in (
        "Updated config:",
        transfer_config_name,
        "name from test_update_config",
    ):
        assert expected in captured
36+
37+
38+
def test_update_credentials_with_service_account(
    capsys, project_id, service_account_name, transfer_config_name
):
    """Switching credentials should report the updated config name."""
    overrides = {
        "project_id": project_id,
        "service_account_name": service_account_name,
        "transfer_config_name": transfer_config_name,
    }
    manage_transfer_configs.update_credentials_with_service_account(overrides)
    captured, _ = capsys.readouterr()
    assert "Updated config:" in captured
    assert transfer_config_name in captured
51+
52+
53+
def test_schedule_backfill(capsys, transfer_config_name):
    """Backfill should start one run per day in the 5-to-2-days-ago window."""
    runs = manage_transfer_configs.schedule_backfill(
        {"transfer_config_name": transfer_config_name}
    )
    captured, _ = capsys.readouterr()
    assert "Started transfer runs:" in captured
    # Run IDs should include the transfer name in their path.
    assert transfer_config_name in captured
    # Check that there are runs for 5, 4, 3, and 2 days ago.
    assert len(runs) == 4
65+
66+
67+
def test_delete_config(capsys, transfer_config_name):
    """Smoke-check only: the actual delete happens in the fixture's teardown.

    The transfer_config_name fixture in conftest.py calls the delete config
    sample. To conserve limited BQ-DTS quota we only make basic checks.
    """
    assert len(transfer_config_name) != 0

0 commit comments

Comments
 (0)