Skip to content

Commit ddb26e2

Browse files
committed
Merge pull request #1072 from tseaver/bigquery-jobs_query
Add job for running queries
2 parents 6df44a2 + 3f3cc73 commit ddb26e2

File tree

5 files changed

+451
-3
lines changed

5 files changed

+451
-3
lines changed

gcloud/bigquery/client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from gcloud.bigquery.job import CopyJob
2222
from gcloud.bigquery.job import ExtractTableToStorageJob
2323
from gcloud.bigquery.job import LoadTableFromStorageJob
24+
from gcloud.bigquery.job import RunQueryJob
2425

2526

2627
class Client(JSONClient):
@@ -153,3 +154,17 @@ def extract_table_to_storage(self, name, source, *destination_uris):
153154
"""
154155
return ExtractTableToStorageJob(name, source, destination_uris,
155156
client=self)
157+
158+
def run_query(self, name, query):
    """Create an asynchronous job that executes a SQL query.

    :type name: string
    :param name: the name to assign to the job

    :type query: string
    :param query: the SQL statement the job will run

    :rtype: :class:`gcloud.bigquery.job.RunQueryJob`
    :returns: the newly-constructed query job, bound to this client
    """
    job = RunQueryJob(name, query, client=self)
    return job

gcloud/bigquery/job.py

Lines changed: 149 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
from gcloud.exceptions import NotFound
2020
from gcloud._helpers import _datetime_from_microseconds
21+
from gcloud.bigquery.dataset import Dataset
2122
from gcloud.bigquery.table import SchemaField
23+
from gcloud.bigquery.table import Table
2224
from gcloud.bigquery.table import _build_schema_resource
2325
from gcloud.bigquery.table import _parse_schema_resource
2426

@@ -122,6 +124,13 @@ class Encoding(_EnumProperty):
122124
ALLOWED = (UTF_8, ISO_8559_1)
123125

124126

127+
class QueryPriority(_EnumProperty):
    """Pseudo-enum of the values accepted by ``RunQueryJob.priority``."""
    INTERACTIVE = 'INTERACTIVE'
    BATCH = 'BATCH'
    ALLOWED = (INTERACTIVE, BATCH)
132+
133+
125134
class SourceFormat(_EnumProperty):
126135
"""Pseudo-enum for ``source_format`` properties."""
127136
CSV = 'CSV'
@@ -403,7 +412,7 @@ class _LoadConfiguration(object):
403412

404413

405414
class LoadTableFromStorageJob(_BaseJob):
406-
"""Asynchronous job for loading data into a BQ table from CloudStorage.
415+
"""Asynchronous job for loading data into a table from CloudStorage.
407416
408417
:type name: string
409418
:param name: the name of the job
@@ -616,7 +625,7 @@ class _CopyConfiguration(object):
616625

617626

618627
class CopyJob(_BaseJob):
619-
"""Asynchronous job: copy data into a BQ table from other tables.
628+
"""Asynchronous job: copy data into a table from other tables.
620629
621630
:type name: string
622631
:param name: the name of the job
@@ -695,7 +704,7 @@ class _ExtractConfiguration(object):
695704

696705

697706
class ExtractTableToStorageJob(_BaseJob):
698-
"""Asynchronous job: extract data from a BQ table into Cloud Storage.
707+
"""Asynchronous job: extract data from a table into Cloud Storage.
699708
700709
:type name: string
701710
:param name: the name of the job
@@ -773,3 +782,140 @@ def _build_resource(self):
773782
self._populate_config_resource(configuration)
774783

775784
return resource
785+
786+
787+
class _QueryConfiguration(object):
788+
"""User-settable configuration options for query jobs."""
789+
# None -> use server default.
790+
_allow_large_results = None
791+
_create_disposition = None
792+
_default_dataset = None
793+
_destination_table = None
794+
_flatten_results = None
795+
_priority = None
796+
_use_query_cache = None
797+
_write_disposition = None
798+
799+
800+
class RunQueryJob(_BaseJob):
    """Asynchronous job: query tables.

    :type name: string
    :param name: the name of the job

    :type query: string
    :param query: SQL query string

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: A client which holds credentials and project configuration
                   for the dataset (which requires a project).
    """
    def __init__(self, name, query, client):
        super(RunQueryJob, self).__init__(name, client)
        self.query = query
        self._configuration = _QueryConfiguration()

    allow_large_results = _TypedProperty('allow_large_results', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.allowLargeResults
    """

    create_disposition = CreateDisposition('create_disposition')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition
    """

    default_dataset = _TypedProperty('default_dataset', Dataset)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.default_dataset
    """

    destination_table = _TypedProperty('destination_table', Table)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.destinationTable
    """

    flatten_results = _TypedProperty('flatten_results', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.flattenResults
    """

    priority = QueryPriority('priority')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.priority
    """

    use_query_cache = _TypedProperty('use_query_cache', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.useQueryCache
    """

    write_disposition = WriteDisposition('write_disposition')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition
    """

    def _destination_table_resource(self):
        """Build the API resource dict for ``destination_table``.

        Returns ``None`` (implicitly) when no destination table is set.
        """
        if self.destination_table is not None:
            return {
                'projectId': self.destination_table.project,
                'datasetId': self.destination_table.dataset_name,
                'tableId': self.destination_table.name,
            }

    def _populate_config_resource(self, configuration):
        """Helper for _build_resource: copy config properties to resource"""
        # Only explicitly-set options are sent; ``None`` means "server
        # default" and must be omitted from the payload.
        if self.allow_large_results is not None:
            configuration['allowLargeResults'] = self.allow_large_results
        if self.create_disposition is not None:
            configuration['createDisposition'] = self.create_disposition
        if self.default_dataset is not None:
            configuration['defaultDataset'] = {
                'projectId': self.default_dataset.project,
                'datasetId': self.default_dataset.name,
            }
        if self.destination_table is not None:
            table_res = self._destination_table_resource()
            configuration['destinationTable'] = table_res
        if self.flatten_results is not None:
            configuration['flattenResults'] = self.flatten_results
        if self.priority is not None:
            configuration['priority'] = self.priority
        if self.use_query_cache is not None:
            configuration['useQueryCache'] = self.use_query_cache
        if self.write_disposition is not None:
            configuration['writeDisposition'] = self.write_disposition

    def _build_resource(self):
        """Generate a resource for :meth:`begin`."""

        resource = {
            'jobReference': {
                'projectId': self.project,
                'jobId': self.name,
            },
            'configuration': {
                'query': {
                    'query': self.query,
                },
            },
        }
        configuration = resource['configuration']['query']
        self._populate_config_resource(configuration)

        return resource

    def _scrub_local_properties(self, cleaned):
        """Helper: handle subclass properties in cleaned."""
        configuration = cleaned['configuration']['query']
        dest_remote = configuration.get('destinationTable')

        if dest_remote is None:
            # Server no longer reports a destination: drop any stale local one.
            if self.destination_table is not None:
                del self.destination_table
        else:
            dest_local = self._destination_table_resource()
            if dest_remote != dest_local:
                # Was a bare ``assert``: stripped under ``python -O``, which
                # would silently accept a table from the wrong project.
                # Raise explicitly so inconsistent server data always surfaces.
                if dest_remote['projectId'] != self.project:
                    raise ValueError(
                        'Project mismatch in destination table: %r' % (
                            dest_remote['projectId'],))
                dataset = self._client.dataset(dest_remote['datasetId'])
                self.destination_table = dataset.table(dest_remote['tableId'])

gcloud/bigquery/test_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,20 @@ def test_extract_table_to_storage(self):
186186
self.assertEqual(job.source, source)
187187
self.assertEqual(list(job.destination_uris), [DESTINATION])
188188

189+
def test_run_query(self):
    from gcloud.bigquery.job import RunQueryJob
    PROJECT = 'PROJECT'
    JOB = 'job_name'
    QUERY = 'select count(*) from persons'
    # Wire up a client with dummy credentials and transport.
    credentials = _Credentials()
    transport = object()
    client = self._makeOne(
        project=PROJECT, credentials=credentials, http=transport)
    job = client.run_query(JOB, QUERY)
    # The factory must hand back a RunQueryJob bound to this client,
    # carrying the job name and query verbatim.
    self.assertTrue(isinstance(job, RunQueryJob))
    self.assertTrue(job._client is client)
    self.assertEqual(job.name, JOB)
    self.assertEqual(job.query, QUERY)
202+
189203

190204
class _Credentials(object):
191205

0 commit comments

Comments
 (0)