Skip to content

Commit 4ccfd9d

Browse files
jbatswast
authored and committed
bigquery: modify CopyJob (#4051)
Update CopyJob and CopyJobConfig to conform to the new design for jobs.
1 parent 634019b commit 4ccfd9d

File tree

6 files changed

+247
-103
lines changed

6 files changed

+247
-103
lines changed

bigquery/google/cloud/bigquery/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from google.cloud.bigquery.client import Client
3333
from google.cloud.bigquery.dataset import AccessEntry
3434
from google.cloud.bigquery.dataset import Dataset
35+
from google.cloud.bigquery.job import CopyJobConfig
3536
from google.cloud.bigquery.job import ExtractJobConfig
3637
from google.cloud.bigquery.schema import SchemaField
3738
from google.cloud.bigquery.table import Table
@@ -42,6 +43,7 @@
4243
'ArrayQueryParameter',
4344
'Client',
4445
'Dataset',
46+
'CopyJobConfig',
4547
'ExtractJobConfig',
4648
'ScalarQueryParameter',
4749
'SchemaField',

bigquery/google/cloud/bigquery/client.py

+38-11
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from __future__ import absolute_import
1818

19+
import collections
1920
import uuid
2021

2122
from google.api.core import page_iterator
@@ -492,25 +493,39 @@ def load_table_from_storage(self, job_id, destination, *source_uris):
492493
"""
493494
return LoadJob(job_id, destination, source_uris, client=self)
494495

495-
def copy_table(self, job_id, destination, *sources):
496-
"""Construct a job for copying one or more tables into another table.
496+
def copy_table(self, sources, destination, job_id=None, job_config=None):
497+
"""Start a job for copying one or more tables into another table.
497498
498499
See
499500
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy
500501
501-
:type job_id: str
502-
:param job_id: Name of the job.
502+
:type sources: One of:
503+
:class:`~google.cloud.bigquery.table.TableReference`
504+
sequence of
505+
:class:`~google.cloud.bigquery.table.TableReference`
506+
:param sources: Table or tables to be copied.
503507
504-
:type destination: :class:`google.cloud.bigquery.table.Table`
508+
509+
:type destination: :class:`google.cloud.bigquery.table.TableReference`
505510
:param destination: Table into which data is to be copied.
506511
507-
:type sources: sequence of :class:`google.cloud.bigquery.table.Table`
508-
:param sources: tables to be copied.
512+
:type job_id: str
513+
:param job_id: (Optional) The ID of the job.
514+
515+
:type job_config: :class:`google.cloud.bigquery.job.CopyJobConfig`
516+
:param job_config: (Optional) Extra configuration options for the job.
509517
510518
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
511519
:returns: a new ``CopyJob`` instance
512520
"""
513-
return CopyJob(job_id, destination, sources, client=self)
521+
job_id = _make_job_id(job_id)
522+
523+
if not isinstance(sources, collections.Sequence):
524+
sources = [sources]
525+
job = CopyJob(job_id, sources, destination, client=self,
526+
job_config=job_config)
527+
job.begin()
528+
return job
514529

515530
def extract_table(self, source, *destination_uris, **kwargs):
516531
"""Start a job to extract a table into Cloud Storage files.
@@ -541,9 +556,7 @@ def extract_table(self, source, *destination_uris, **kwargs):
541556
:returns: a new ``ExtractJob`` instance
542557
"""
543558
job_config = kwargs.get('job_config')
544-
job_id = kwargs.get('job_id')
545-
if job_id is None:
546-
job_id = str(uuid.uuid4())
559+
job_id = _make_job_id(kwargs.get('job_id'))
547560

548561
job = ExtractJob(
549562
job_id, source, list(destination_uris), client=self,
@@ -667,3 +680,17 @@ def _item_to_table(iterator, resource):
667680
:returns: The next table in the page.
668681
"""
669682
return Table.from_api_repr(resource, iterator.client)
683+
684+
685+
def _make_job_id(job_id):
686+
"""Construct an ID for a new job.
687+
688+
:type job_id: str or ``NoneType``
689+
:param job_id: the user-provided job ID
690+
691+
:rtype: str
692+
:returns: A job ID
693+
"""
694+
if job_id is None:
695+
return str(uuid.uuid4())
696+
return job_id

bigquery/google/cloud/bigquery/job.py

+101-63
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class Compression(_EnumApiResourceProperty):
126126
NONE = 'NONE'
127127

128128

129-
class CreateDisposition(_EnumProperty):
129+
class CreateDisposition(_EnumApiResourceProperty):
130130
"""Pseudo-enum for ``create_disposition`` properties."""
131131
CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
132132
CREATE_NEVER = 'CREATE_NEVER'
@@ -159,7 +159,7 @@ class SourceFormat(_EnumProperty):
159159
AVRO = 'AVRO'
160160

161161

162-
class WriteDisposition(_EnumProperty):
162+
class WriteDisposition(_EnumApiResourceProperty):
163163
"""Pseudo-enum for ``write_disposition`` properties."""
164164
WRITE_APPEND = 'WRITE_APPEND'
165165
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
@@ -688,7 +688,8 @@ def output_rows(self):
688688
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect
689689
"""
690690

691-
create_disposition = CreateDisposition('create_disposition')
691+
create_disposition = CreateDisposition('create_disposition',
692+
'createDisposition')
692693
"""See
693694
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition
694695
"""
@@ -733,7 +734,8 @@ def output_rows(self):
733734
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceFormat
734735
"""
735736

736-
write_disposition = WriteDisposition('write_disposition')
737+
write_disposition = WriteDisposition('write_disposition',
738+
'writeDisposition')
737739
"""See
738740
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.writeDisposition
739741
"""
@@ -853,13 +855,51 @@ def from_api_repr(cls, resource, client):
853855
return job
854856

855857

856-
class _CopyConfiguration(object):
857-
"""User-settable configuration options for copy jobs.
858+
class CopyJobConfig(object):
859+
"""Configuration options for copy jobs.
858860
859-
Values which are ``None`` -> server defaults.
861+
All properties in this class are optional. Values which are ``None`` ->
862+
server defaults.
860863
"""
861-
_create_disposition = None
862-
_write_disposition = None
864+
865+
def __init__(self):
866+
self._properties = {}
867+
868+
create_disposition = CreateDisposition('create_disposition',
869+
'createDisposition')
870+
"""See
871+
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.createDisposition
872+
"""
873+
874+
write_disposition = WriteDisposition('write_disposition',
875+
'writeDisposition')
876+
"""See
877+
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.writeDisposition
878+
"""
879+
880+
def to_api_repr(self):
881+
"""Build an API representation of the copy job config.
882+
883+
:rtype: dict
884+
:returns: A dictionary in the format used by the BigQuery API.
885+
"""
886+
return copy.deepcopy(self._properties)
887+
888+
@classmethod
889+
def from_api_repr(cls, resource):
890+
"""Factory: construct a job configuration given its API representation
891+
892+
:type resource: dict
893+
:param resource:
894+
A copy job configuration in the same representation as is
895+
returned from the API.
896+
897+
:rtype: :class:`google.cloud.bigquery.job.CopyJobConfig`
898+
:returns: Configuration parsed from ``resource``.
899+
"""
900+
config = cls()
901+
config._properties = copy.deepcopy(resource)
902+
return config
863903

864904

865905
class CopyJob(_AsyncJob):
@@ -868,41 +908,45 @@ class CopyJob(_AsyncJob):
868908
:type job_id: str
869909
:param job_id: the job's ID, within the project belonging to ``client``.
870910
871-
:type destination: :class:`google.cloud.bigquery.table.Table`
872-
:param destination: Table into which data is to be loaded.
873-
874-
:type sources: list of :class:`google.cloud.bigquery.table.Table`
911+
:type sources: list of :class:`google.cloud.bigquery.table.TableReference`
875912
:param sources: Table or tables to be copied.
876913
914+
:type destination: :class:`google.cloud.bigquery.table.TableReference`
915+
:param destination: Table into which data is to be copied.
916+
877917
:type client: :class:`google.cloud.bigquery.client.Client`
878918
:param client: A client which holds credentials and project configuration
879919
for the dataset (which requires a project).
880-
"""
881920
921+
:type job_config: :class:`~google.cloud.bigquery.job.CopyJobConfig`
922+
:param job_config:
923+
(Optional) Extra configuration options for the copy job.
924+
"""
882925
_JOB_TYPE = 'copy'
883926

884-
def __init__(self, job_id, destination, sources, client):
927+
def __init__(self, job_id, sources, destination, client, job_config=None):
885928
super(CopyJob, self).__init__(job_id, client)
929+
930+
if job_config is None:
931+
job_config = CopyJobConfig()
932+
886933
self.destination = destination
887934
self.sources = sources
888-
self._configuration = _CopyConfiguration()
889-
890-
create_disposition = CreateDisposition('create_disposition')
891-
"""See
892-
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.createDisposition
893-
"""
935+
self._configuration = job_config
894936

895-
write_disposition = WriteDisposition('write_disposition')
896-
"""See
897-
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.writeDisposition
898-
"""
937+
@property
938+
def create_disposition(self):
939+
"""See
940+
:class:`~google.cloud.bigquery.job.CopyJobConfig.create_disposition`.
941+
"""
942+
return self._configuration.create_disposition
899943

900-
def _populate_config_resource(self, configuration):
901-
"""Helper for _build_resource: copy config properties to resource"""
902-
if self.create_disposition is not None:
903-
configuration['createDisposition'] = self.create_disposition
904-
if self.write_disposition is not None:
905-
configuration['writeDisposition'] = self.write_disposition
944+
@property
945+
def write_disposition(self):
946+
"""See
947+
:class:`~google.cloud.bigquery.job.CopyJobConfig.write_disposition`.
948+
"""
949+
return self._configuration.write_disposition
906950

907951
def _build_resource(self):
908952
"""Generate a resource for :meth:`begin`."""
@@ -913,31 +957,27 @@ def _build_resource(self):
913957
'tableId': table.table_id,
914958
} for table in self.sources]
915959

916-
resource = {
960+
configuration = self._configuration.to_api_repr()
961+
configuration['sourceTables'] = source_refs
962+
configuration['destinationTable'] = {
963+
'projectId': self.destination.project,
964+
'datasetId': self.destination.dataset_id,
965+
'tableId': self.destination.table_id,
966+
}
967+
968+
return {
917969
'jobReference': {
918970
'projectId': self.project,
919971
'jobId': self.job_id,
920972
},
921973
'configuration': {
922-
self._JOB_TYPE: {
923-
'sourceTables': source_refs,
924-
'destinationTable': {
925-
'projectId': self.destination.project,
926-
'datasetId': self.destination.dataset_id,
927-
'tableId': self.destination.table_id,
928-
},
929-
},
974+
self._JOB_TYPE: configuration,
930975
},
931976
}
932-
configuration = resource['configuration'][self._JOB_TYPE]
933-
self._populate_config_resource(configuration)
934-
935-
return resource
936977

937978
def _copy_configuration_properties(self, configuration):
938979
"""Helper: assign subclass configuration properties in cleaned."""
939-
self.create_disposition = configuration.get('createDisposition')
940-
self.write_disposition = configuration.get('writeDisposition')
980+
self._configuration._properties = copy.deepcopy(configuration)
941981

942982
@classmethod
943983
def from_api_repr(cls, resource, client):
@@ -958,27 +998,23 @@ def from_api_repr(cls, resource, client):
958998
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
959999
:returns: Job parsed from ``resource``.
9601000
"""
961-
job_id, config = cls._get_resource_config(resource)
962-
dest_config = config['destinationTable']
963-
ds_ref = DatasetReference(dest_config['projectId'],
964-
dest_config['datasetId'],)
965-
dataset = Dataset(ds_ref)
966-
table_ref = TableReference(dataset, dest_config['tableId'])
967-
destination = Table(table_ref, client=client)
1001+
job_id, config_resource = cls._get_resource_config(resource)
1002+
config = CopyJobConfig.from_api_repr(config_resource)
1003+
destination = TableReference.from_api_repr(
1004+
config_resource['destinationTable'])
9681005
sources = []
969-
source_configs = config.get('sourceTables')
1006+
source_configs = config_resource.get('sourceTables')
9701007
if source_configs is None:
971-
single = config.get('sourceTable')
1008+
single = config_resource.get('sourceTable')
9721009
if single is None:
9731010
raise KeyError(
9741011
"Resource missing 'sourceTables' / 'sourceTable'")
9751012
source_configs = [single]
9761013
for source_config in source_configs:
977-
ds_ref = DatasetReference(source_config['projectId'],
978-
source_config['datasetId'])
979-
table_ref = ds_ref.table(source_config['tableId'])
980-
sources.append(Table(table_ref, client=client))
981-
job = cls(job_id, destination, sources, client=client)
1014+
table_ref = TableReference.from_api_repr(source_config)
1015+
sources.append(table_ref)
1016+
job = cls(
1017+
job_id, sources, destination, client=client, job_config=config)
9821018
job._set_properties(resource)
9831019
return job
9841020

@@ -1017,7 +1053,7 @@ def __init__(self):
10171053
"""
10181054

10191055
def to_api_repr(self):
1020-
"""Build an API representation of the extact job config.
1056+
"""Build an API representation of the extract job config.
10211057
10221058
:rtype: dict
10231059
:returns: A dictionary in the format used by the BigQuery API.
@@ -1243,7 +1279,8 @@ def __init__(self, job_id, query, client,
12431279
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.allowLargeResults
12441280
"""
12451281

1246-
create_disposition = CreateDisposition('create_disposition')
1282+
create_disposition = CreateDisposition('create_disposition',
1283+
'createDisposition')
12471284
"""See
12481285
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.createDisposition
12491286
"""
@@ -1289,7 +1326,8 @@ def __init__(self, job_id, query, client,
12891326
reference/rest/v2/jobs#configuration.dryRun
12901327
"""
12911328

1292-
write_disposition = WriteDisposition('write_disposition')
1329+
write_disposition = WriteDisposition('write_disposition',
1330+
'writeDisposition')
12931331
"""See
12941332
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.writeDisposition
12951333
"""

0 commit comments

Comments
 (0)