Skip to content

Commit e57b152

Browse files
authored
Add 'Client.get_job' API wrapper. (#3804)
* Allow assigning 'None' to '_TypedProperty' properties. * Ensure that configuration properties are copied when (re)loading jobs.
1 parent f0a69ce commit e57b152

File tree

7 files changed

+462
-41
lines changed

7 files changed

+462
-41
lines changed

bigquery/.coveragerc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ exclude_lines =
99
pragma: NO COVER
1010
# Ignore debug-only repr
1111
def __repr__
12+
# Ignore abstract methods
13+
raise NotImplementedError

bigquery/google/cloud/bigquery/_helpers.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,8 @@ def _validate(self, value):
299299
300300
:raises: ValueError on a type mismatch.
301301
"""
302+
if value is None:
303+
return
302304
if not isinstance(value, self.property_type):
303305
raise ValueError('Required type: %s' % (self.property_type,))
304306

@@ -396,6 +398,14 @@ def __init__(self, name, type_, value):
396398
self.type_ = type_
397399
self.value = value
398400

401+
def __eq__(self, other):
    """Compare this scalar query parameter with ``other`` by value.

    :type other: object
    :param other: object to compare against.

    :rtype: bool
    :returns:
        result of comparing ``name``, ``type_`` and ``value`` pairwise;
        ``NotImplemented`` when ``other`` is not a
        ``ScalarQueryParameter``, so Python can try the reflected
        comparison.
    """
    if not isinstance(other, ScalarQueryParameter):
        return NotImplemented
    return (
        self.name == other.name and
        self.type_ == other.type_ and
        self.value == other.value)

def __ne__(self, other):
    """Inverse of ``__eq__``.

    Python 2 does not derive ``!=`` from ``__eq__``; without this
    method two equal parameters would also compare as unequal.

    :type other: object
    :param other: object to compare against.

    :rtype: bool
    :returns: ``True`` when ``self == other`` is false.
    """
    return not self == other
408+
399409
@classmethod
400410
def positional(cls, type_, value):
401411
"""Factory for positional paramater.
@@ -473,6 +483,14 @@ def __init__(self, name, array_type, values):
473483
self.array_type = array_type
474484
self.values = values
475485

486+
def __eq__(self, other):
    """Compare this array query parameter with ``other`` by value.

    :type other: object
    :param other: object to compare against.

    :rtype: bool
    :returns:
        result of comparing ``name``, ``array_type`` and ``values``
        pairwise; ``NotImplemented`` when ``other`` is not an
        ``ArrayQueryParameter``, so Python can try the reflected
        comparison.
    """
    if not isinstance(other, ArrayQueryParameter):
        return NotImplemented
    return (
        self.name == other.name and
        self.array_type == other.array_type and
        self.values == other.values)

def __ne__(self, other):
    """Inverse of ``__eq__``.

    Python 2 does not derive ``!=`` from ``__eq__``; without this
    method two equal parameters would also compare as unequal.

    :type other: object
    :param other: object to compare against.

    :rtype: bool
    :returns: ``True`` when ``self == other`` is false.
    """
    return not self == other
493+
476494
@classmethod
477495
def positional(cls, array_type, values):
478496
"""Factory for positional parameters.
@@ -566,6 +584,14 @@ def __init__(self, name, *sub_params):
566584
types[sub.name] = sub.type_
567585
values[sub.name] = sub.value
568586

587+
def __eq__(self, other):
    """Compare this struct query parameter with ``other`` by value.

    :type other: object
    :param other: object to compare against.

    :rtype: bool
    :returns:
        result of comparing ``name``, ``struct_types`` and
        ``struct_values`` pairwise; ``NotImplemented`` when ``other`` is
        not a ``StructQueryParameter``, so Python can try the reflected
        comparison.
    """
    if not isinstance(other, StructQueryParameter):
        return NotImplemented
    return (
        self.name == other.name and
        self.struct_types == other.struct_types and
        self.struct_values == other.struct_values)

def __ne__(self, other):
    """Inverse of ``__eq__``.

    Python 2 does not derive ``!=`` from ``__eq__``; without this
    method two equal parameters would also compare as unequal.

    :type other: object
    :param other: object to compare against.

    :rtype: bool
    :returns: ``True`` when ``self == other`` is false.
    """
    return not self == other
594+
569595
@classmethod
570596
def positional(cls, *sub_params):
571597
"""Factory for positional parameters.
@@ -636,6 +662,18 @@ def to_api_repr(self):
636662
return resource
637663

638664

665+
def _query_param_from_api_repr(resource):
    """Helper: build the matching query parameter object from JSON.

    The parameter class is chosen from the keys present in
    ``resource['parameterType']``: ``'arrayType'`` selects
    ``ArrayQueryParameter``, ``'structTypes'`` selects
    ``StructQueryParameter``, and anything else is treated as a
    ``ScalarQueryParameter``.

    :type resource: dict
    :param resource: JSON mapping holding a ``'parameterType'`` entry.

    :returns: whatever the selected class's ``from_api_repr`` returns.
    """
    type_info = resource['parameterType']
    if 'arrayType' in type_info:
        return ArrayQueryParameter.from_api_repr(resource)
    if 'structTypes' in type_info:
        return StructQueryParameter.from_api_repr(resource)
    return ScalarQueryParameter.from_api_repr(resource)
675+
676+
639677
class QueryParametersProperty(object):
640678
"""Custom property type, holding query parameter instances."""
641679

bigquery/google/cloud/bigquery/client.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,35 @@ def job_from_resource(self, resource):
187187
return QueryJob.from_api_repr(resource, self)
188188
raise ValueError('Cannot parse job resource')
189189

190+
def get_job(self, job_id, project=None):
    """Retrieve one job via the API and wrap it in a job object.

    See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

    :type job_id: str
    :param job_id: Name of the job.

    :type project: str
    :param project:
        project ID owning the job (defaults to the client's project)

    :rtype: :class:`~google.cloud.bigquery.job._AsyncJob`
    :returns:
        Concrete job instance, based on the resource returned by the API.
    """
    # Fall back to the client's own project only when none was given.
    owner = self.project if project is None else project
    job_path = '/projects/{}/jobs/{}'.format(owner, job_id)

    # The request always asks for the 'full' projection.
    api_response = self._connection.api_request(
        method='GET',
        path=job_path,
        query_params={'projection': 'full'})

    return self.job_from_resource(api_response)
218+
190219
def list_jobs(self, max_results=None, page_token=None, all_users=None,
191220
state_filter=None):
192221
"""List jobs for the project associated with this client.
@@ -237,14 +266,14 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
237266
max_results=max_results,
238267
extra_params=extra_params)
239268

240-
def load_table_from_storage(self, job_name, destination, *source_uris):
269+
def load_table_from_storage(self, job_id, destination, *source_uris):
241270
"""Construct a job for loading data into a table from CloudStorage.
242271
243272
See
244273
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load
245274
246-
:type job_name: str
247-
:param job_name: Name of the job.
275+
:type job_id: str
276+
:param job_id: Name of the job.
248277
249278
:type destination: :class:`google.cloud.bigquery.table.Table`
250279
:param destination: Table into which data is to be loaded.
@@ -256,16 +285,16 @@ def load_table_from_storage(self, job_name, destination, *source_uris):
256285
:rtype: :class:`google.cloud.bigquery.job.LoadJob`
257286
:returns: a new ``LoadJob`` instance
258287
"""
259-
return LoadJob(job_name, destination, source_uris, client=self)
288+
return LoadJob(job_id, destination, source_uris, client=self)
260289

261-
def copy_table(self, job_name, destination, *sources):
290+
def copy_table(self, job_id, destination, *sources):
262291
"""Construct a job for copying one or more tables into another table.
263292
264293
See
265294
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy
266295
267-
:type job_name: str
268-
:param job_name: Name of the job.
296+
:type job_id: str
297+
:param job_id: Name of the job.
269298
270299
:type destination: :class:`google.cloud.bigquery.table.Table`
271300
:param destination: Table into which data is to be copied.
@@ -276,16 +305,16 @@ def copy_table(self, job_name, destination, *sources):
276305
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
277306
:returns: a new ``CopyJob`` instance
278307
"""
279-
return CopyJob(job_name, destination, sources, client=self)
308+
return CopyJob(job_id, destination, sources, client=self)
280309

281-
def extract_table_to_storage(self, job_name, source, *destination_uris):
310+
def extract_table_to_storage(self, job_id, source, *destination_uris):
282311
"""Construct a job for extracting a table into Cloud Storage files.
283312
284313
See
285314
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract
286315
287-
:type job_name: str
288-
:param job_name: Name of the job.
316+
:type job_id: str
317+
:param job_id: Name of the job.
289318
290319
:type source: :class:`google.cloud.bigquery.table.Table`
291320
:param source: table to be extracted.
@@ -298,17 +327,17 @@ def extract_table_to_storage(self, job_name, source, *destination_uris):
298327
:rtype: :class:`google.cloud.bigquery.job.ExtractJob`
299328
:returns: a new ``ExtractJob`` instance
300329
"""
301-
return ExtractJob(job_name, source, destination_uris, client=self)
330+
return ExtractJob(job_id, source, destination_uris, client=self)
302331

303-
def run_async_query(self, job_name, query,
332+
def run_async_query(self, job_id, query,
304333
udf_resources=(), query_parameters=()):
305334
"""Construct a job for running a SQL query asynchronously.
306335
307336
See
308337
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query
309338
310-
:type job_name: str
311-
:param job_name: Name of the job.
339+
:type job_id: str
340+
:param job_id: Name of the job.
312341
313342
:type query: str
314343
:param query: SQL query to be executed
@@ -327,7 +356,7 @@ def run_async_query(self, job_name, query,
327356
:rtype: :class:`google.cloud.bigquery.job.QueryJob`
328357
:returns: a new ``QueryJob`` instance
329358
"""
330-
return QueryJob(job_name, query, client=self,
359+
return QueryJob(job_id, query, client=self,
331360
udf_resources=udf_resources,
332361
query_parameters=query_parameters)
333362

bigquery/google/cloud/bigquery/job.py

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
from google.cloud.bigquery._helpers import QueryParametersProperty
3333
from google.cloud.bigquery._helpers import ScalarQueryParameter
3434
from google.cloud.bigquery._helpers import StructQueryParameter
35+
from google.cloud.bigquery._helpers import UDFResource
3536
from google.cloud.bigquery._helpers import UDFResourcesProperty
3637
from google.cloud.bigquery._helpers import _EnumProperty
38+
from google.cloud.bigquery._helpers import _query_param_from_api_repr
3739
from google.cloud.bigquery._helpers import _TypedProperty
3840

3941
_DONE_STATE = 'DONE'
@@ -61,6 +63,22 @@
6163
}
6264

6365

66+
def _bool_or_none(value):
67+
"""Helper: deserialize boolean value from JSON string."""
68+
if isinstance(value, bool):
69+
return value
70+
if value is not None:
71+
return value.lower() in ['t', 'true', '1']
72+
73+
74+
def _int_or_none(value):
75+
"""Helper: deserialize int value from JSON string."""
76+
if isinstance(value, int):
77+
return value
78+
if value is not None:
79+
return int(value)
80+
81+
6482
def _error_result_to_exception(error_result):
6583
"""Maps BigQuery error reasons to an exception.
6684
@@ -311,6 +329,10 @@ def _scrub_local_properties(self, cleaned):
311329
"""Helper: handle subclass properties in cleaned."""
312330
pass
313331

332+
def _copy_configuration_properties(self, configuration):
333+
"""Helper: assign subclass configuration properties in cleaned."""
334+
raise NotImplementedError("Abstract")
335+
314336
def _set_properties(self, api_response):
315337
"""Update properties from resource in body of ``api_response``
316338
@@ -330,6 +352,8 @@ def _set_properties(self, api_response):
330352

331353
self._properties.clear()
332354
self._properties.update(cleaned)
355+
configuration = cleaned['configuration'][self._JOB_TYPE]
356+
self._copy_configuration_properties(configuration)
333357

334358
# For Future interface
335359
self._set_future_result()
@@ -731,7 +755,7 @@ def _populate_config_resource(self, configuration):
731755
if self.quote_character is not None:
732756
configuration['quote'] = self.quote_character
733757
if self.skip_leading_rows is not None:
734-
configuration['skipLeadingRows'] = self.skip_leading_rows
758+
configuration['skipLeadingRows'] = str(self.skip_leading_rows)
735759
if self.source_format is not None:
736760
configuration['sourceFormat'] = self.source_format
737761
if self.write_disposition is not None:
@@ -769,6 +793,28 @@ def _scrub_local_properties(self, cleaned):
769793
schema = cleaned.pop('schema', {'fields': ()})
770794
self.schema = _parse_schema_resource(schema)
771795

796+
def _copy_configuration_properties(self, configuration):
    """Helper: copy option values from ``configuration`` onto the job.

    Every option is looked up with ``configuration.get``, so a missing
    key is assigned as ``None``.  Flag options pass through
    ``_bool_or_none`` and counters through ``_int_or_none``; all other
    options are assigned exactly as found.

    :param configuration:
        job-type-specific configuration mapping; only its ``get``
        method is used.
    """
    # (attribute on the job, key in the API resource, converter or None).
    # Kept in the original assignment order.
    field_map = (
        ('allow_jagged_rows', 'allowJaggedRows', _bool_or_none),
        ('allow_quoted_newlines', 'allowQuotedNewlines', _bool_or_none),
        ('autodetect', 'autodetect', _bool_or_none),
        ('create_disposition', 'createDisposition', None),
        ('encoding', 'encoding', None),
        ('field_delimiter', 'fieldDelimiter', None),
        ('ignore_unknown_values', 'ignoreUnknownValues', _bool_or_none),
        ('max_bad_records', 'maxBadRecords', _int_or_none),
        ('null_marker', 'nullMarker', None),
        ('quote_character', 'quote', None),
        ('skip_leading_rows', 'skipLeadingRows', _int_or_none),
        ('source_format', 'sourceFormat', None),
        ('write_disposition', 'writeDisposition', None),
    )
    for attr_name, api_key, convert in field_map:
        raw = configuration.get(api_key)
        setattr(self, attr_name, raw if convert is None else convert(raw))
817+
772818
@classmethod
773819
def from_api_repr(cls, resource, client):
774820
"""Factory: construct a job given its API representation
@@ -879,6 +925,11 @@ def _build_resource(self):
879925

880926
return resource
881927

928+
def _copy_configuration_properties(self, configuration):
929+
"""Helper: assign subclass configuration properties in cleaned."""
930+
self.create_disposition = configuration.get('createDisposition')
931+
self.write_disposition = configuration.get('writeDisposition')
932+
882933
@classmethod
883934
def from_api_repr(cls, resource, client):
884935
"""Factory: construct a job given its API representation
@@ -1012,6 +1063,14 @@ def _build_resource(self):
10121063

10131064
return resource
10141065

1066+
def _copy_configuration_properties(self, configuration):
    """Helper: copy the extract options from ``configuration``.

    ``compression``, ``destination_format`` and ``field_delimiter`` are
    assigned as found; ``print_header`` is converted with
    ``_bool_or_none``.  A missing key is assigned as ``None``.

    :param configuration:
        job-type-specific configuration mapping; only its ``get``
        method is used.
    """
    lookup = configuration.get
    self.compression = lookup('compression')
    self.destination_format = lookup('destinationFormat')
    self.field_delimiter = lookup('fieldDelimiter')
    header_flag = lookup('printHeader')
    self.print_header = _bool_or_none(header_flag)
1073+
10151074
@classmethod
10161075
def from_api_repr(cls, resource, client):
10171076
"""Factory: construct a job given its API representation
@@ -1207,7 +1266,8 @@ def _populate_config_resource(self, configuration):
12071266
if self.maximum_billing_tier is not None:
12081267
configuration['maximumBillingTier'] = self.maximum_billing_tier
12091268
if self.maximum_bytes_billed is not None:
1210-
configuration['maximumBytesBilled'] = self.maximum_bytes_billed
1269+
configuration['maximumBytesBilled'] = str(
1270+
self.maximum_bytes_billed)
12111271
if len(self._udf_resources) > 0:
12121272
configuration[self._UDF_KEY] = [
12131273
{udf_resource.udf_type: udf_resource.value}
@@ -1257,6 +1317,25 @@ def _scrub_local_properties(self, cleaned):
12571317
configuration = cleaned['configuration']['query']
12581318

12591319
self.query = configuration['query']
1320+
1321+
def _copy_configuration_properties(self, configuration):
1322+
"""Helper: assign subclass configuration properties in cleaned."""
1323+
self.allow_large_results = _bool_or_none(
1324+
configuration.get('allowLargeResults'))
1325+
self.flatten_results = _bool_or_none(
1326+
configuration.get('flattenResults'))
1327+
self.use_query_cache = _bool_or_none(
1328+
configuration.get('useQueryCache'))
1329+
self.use_legacy_sql = _bool_or_none(
1330+
configuration.get('useLegacySql'))
1331+
1332+
self.create_disposition = configuration.get('createDisposition')
1333+
self.priority = configuration.get('priority')
1334+
self.write_disposition = configuration.get('writeDisposition')
1335+
self.maximum_billing_tier = configuration.get('maximumBillingTier')
1336+
self.maximum_bytes_billed = _int_or_none(
1337+
configuration.get('maximumBytesBilled'))
1338+
12601339
dest_remote = configuration.get('destinationTable')
12611340

12621341
if dest_remote is None:
@@ -1265,9 +1344,30 @@ def _scrub_local_properties(self, cleaned):
12651344
else:
12661345
dest_local = self._destination_table_resource()
12671346
if dest_remote != dest_local:
1268-
dataset = self._client.dataset(dest_remote['datasetId'])
1347+
project = dest_remote['projectId']
1348+
dataset = self._client.dataset(
1349+
dest_remote['datasetId'], project=project)
12691350
self.destination = dataset.table(dest_remote['tableId'])
12701351

1352+
def_ds = configuration.get('defaultDataset')
1353+
if def_ds is None:
1354+
if self.default_dataset is not None:
1355+
del self.default_dataset
1356+
else:
1357+
project = def_ds['projectId']
1358+
self.default_dataset = self._client.dataset(def_ds['datasetId'])
1359+
1360+
udf_resources = []
1361+
for udf_mapping in configuration.get(self._UDF_KEY, ()):
1362+
key_val, = udf_mapping.items()
1363+
udf_resources.append(UDFResource(key_val[0], key_val[1]))
1364+
self._udf_resources = udf_resources
1365+
1366+
self._query_parameters = [
1367+
_query_param_from_api_repr(mapping)
1368+
for mapping in configuration.get(self._QUERY_PARAMETERS_KEY, ())
1369+
]
1370+
12711371
@classmethod
12721372
def from_api_repr(cls, resource, client):
12731373
"""Factory: construct a job given its API representation

0 commit comments

Comments
 (0)