Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1260: Expose 'jobs/list' results #1279

Merged
merged 7 commits into from
Dec 16, 2015
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions gcloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,84 @@ def dataset(self, dataset_name):
"""
return Dataset(dataset_name, client=self)

def _job_from_resource(self, resource):
    """Detect correct job type from resource and instantiate.

    Helper for :meth:`list_jobs`.

    :type resource: dict
    :param resource: one job resource from API response

    :rtype: One of:
            :class:`gcloud.bigquery.job.LoadTableFromStorageJob`,
            :class:`gcloud.bigquery.job.CopyJob`,
            :class:`gcloud.bigquery.job.ExtractTableToStorageJob`,
            :class:`gcloud.bigquery.job.RunAsyncQueryJob`
    :returns: the job instance, constructed via the resource
    :raises: :class:`ValueError` if the resource's ``configuration``
             contains none of the recognized job-type keys.
    """
    # Job type is keyed by which configuration sub-object is present.
    config = resource['configuration']
    if 'load' in config:
        return LoadTableFromStorageJob.from_api_repr(resource, self)
    elif 'copy' in config:
        return CopyJob.from_api_repr(resource, self)
    elif 'extract' in config:
        return ExtractTableToStorageJob.from_api_repr(resource, self)
    elif 'query' in config:
        return RunAsyncQueryJob.from_api_repr(resource, self)
    raise ValueError('Cannot parse job resource')

def list_jobs(self, max_results=None, page_token=None, all_users=None,
              state_filter=None):
    """List jobs for the project associated with this client.

    See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs/list

    :type max_results: int
    :param max_results: maximum number of jobs to return.  If not
                        passed, defaults to a value set by the API.

    :type page_token: string
    :param page_token: opaque marker for the next "page" of jobs.  If
                       not passed, the API will return the first page of
                       jobs.

    :type all_users: boolean
    :param all_users: if true, include jobs owned by all users in the
                      project.

    :type state_filter: string
    :param state_filter: if passed, include only jobs matching the given
                         state.  One of "done", "pending", or "running".

    :rtype: tuple, (list, str)
    :returns: list of job instances, plus a "next page token" string:
              if the token is not ``None``, indicates that more jobs can
              be retrieved with another call, passing that value as
              ``page_token``.
    """
    params = {}

    if max_results is not None:
        params['maxResults'] = max_results

    if page_token is not None:
        params['pageToken'] = page_token

    if all_users is not None:
        params['allUsers'] = all_users

    if state_filter is not None:
        params['stateFilter'] = state_filter

    path = '/projects/%s/jobs' % (self.project,)
    resp = self.connection.api_request(method='GET', path=path,
                                       query_params=params)
    # The API omits the 'jobs' key entirely when no jobs match the
    # request; fall back to an empty sequence rather than raising
    # KeyError.
    jobs = [self._job_from_resource(resource)
            for resource in resp.get('jobs', ())]
    return jobs, resp.get('nextPageToken')

def load_table_from_storage(self, job_name, destination, *source_uris):
"""Construct a job for loading data into a table from CloudStorage.

Expand Down
165 changes: 147 additions & 18 deletions gcloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,31 @@ def _set_properties(self, api_response):
self._properties.clear()
self._properties.update(cleaned)

@classmethod
def _get_resource_config(cls, resource):
"""Helper for :meth:`from_api_repr`

:type resource: dict
:param resource: resource for the job

:rtype: dict
:returns: tuple (string, dict), where the first element is the
job name and the second contains job-specific configuration.
:raises: :class:`KeyError` if the resource has no identifier, or
is missing the appropriate configuration.
"""
if ('jobReference' not in resource or
'jobId' not in resource['jobReference']):
raise KeyError('Resource lacks required identity information: '
'["jobReference"]["jobId"]')

This comment was marked as spam.

name = resource['jobReference']['jobId']
if ('configuration' not in resource or
cls._CONFIG_KEY not in resource['configuration']):
raise KeyError('Resource lacks required configuration: '
'["configuration"]["%s"]' % cls._CONFIG_KEY)
config = resource['configuration'][cls._CONFIG_KEY]
return name, config

def begin(self, client=None):
"""API call: begin the job via a POST request

Expand Down Expand Up @@ -447,6 +472,7 @@ class LoadTableFromStorageJob(_AsyncJob):
"""

_schema = None
_CONFIG_KEY = 'load'

def __init__(self, name, destination, source_uris, client, schema=()):
super(LoadTableFromStorageJob, self).__init__(name, client)
Expand Down Expand Up @@ -611,7 +637,7 @@ def _build_resource(self):
'jobId': self.name,
},
'configuration': {
'load': {
self._CONFIG_KEY: {
'sourceUris': self.source_uris,
'destinationTable': {
'projectId': self.destination.project,
Expand All @@ -621,7 +647,7 @@ def _build_resource(self):
},
},
}
configuration = resource['configuration']['load']
configuration = resource['configuration'][self._CONFIG_KEY]
self._populate_config_resource(configuration)

if len(self.schema) > 0:
Expand All @@ -635,6 +661,30 @@ def _scrub_local_properties(self, cleaned):
schema = cleaned.pop('schema', {'fields': ()})
self.schema = _parse_schema_resource(schema)

@classmethod
def from_api_repr(cls, resource, client):
    """Factory: construct a job given its API representation

    :type resource: dict
    :param resource: dataset job representation returned from the API

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: Client which holds credentials and project
                   configuration for the dataset.

    :rtype: :class:`gcloud.bigquery.job.LoadTableFromStorageJob`
    :returns: Job parsed from ``resource``.
    """
    name, config = cls._get_resource_config(resource)
    dest_config = config['destinationTable']
    # Jobs listed by the client must belong to the client's project.
    assert dest_config['projectId'] == client.project
    destination = Table(dest_config['tableId'],
                        Dataset(dest_config['datasetId'], client))
    job = cls(name, destination, config['sourceUris'], client=client)
    job._set_properties(resource)
    return job


class _CopyConfiguration(object):
"""User-settable configuration options for copy jobs.
Expand All @@ -661,6 +711,9 @@ class CopyJob(_AsyncJob):
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""

_CONFIG_KEY = 'copy'

def __init__(self, name, destination, sources, client):
super(CopyJob, self).__init__(name, client)
self.destination = destination
Expand Down Expand Up @@ -699,7 +752,7 @@ def _build_resource(self):
'jobId': self.name,
},
'configuration': {
'copy': {
self._CONFIG_KEY: {
'sourceTables': source_refs,
'destinationTable': {
'projectId': self.destination.project,
Expand All @@ -709,11 +762,39 @@ def _build_resource(self):
},
},
}
configuration = resource['configuration']['copy']
configuration = resource['configuration'][self._CONFIG_KEY]
self._populate_config_resource(configuration)

return resource

@classmethod
def from_api_repr(cls, resource, client):
    """Factory: construct a job given its API representation

    :type resource: dict
    :param resource: dataset job representation returned from the API

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: Client which holds credentials and project
                   configuration for the dataset.

    :rtype: :class:`gcloud.bigquery.job.CopyJob`
    :returns: Job parsed from ``resource``.
    """
    def _table_from_ref(ref):
        # Table references must point into the client's own project.
        assert ref['projectId'] == client.project
        return Table(ref['tableId'], Dataset(ref['datasetId'], client))

    name, config = cls._get_resource_config(resource)
    destination = _table_from_ref(config['destinationTable'])
    sources = [_table_from_ref(source_ref)
               for source_ref in config['sourceTables']]
    job = cls(name, destination, sources, client=client)
    job._set_properties(resource)
    return job


class _ExtractConfiguration(object):
"""User-settable configuration options for extract jobs.
Expand Down Expand Up @@ -744,6 +825,8 @@ class ExtractTableToStorageJob(_AsyncJob):
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
_CONFIG_KEY = 'extract'

def __init__(self, name, source, destination_uris, client):
super(ExtractTableToStorageJob, self).__init__(name, client)
self.source = source
Expand Down Expand Up @@ -796,17 +879,41 @@ def _build_resource(self):
'jobId': self.name,
},
'configuration': {
'extract': {
self._CONFIG_KEY: {
'sourceTable': source_ref,
'destinationUris': self.destination_uris,
},
},
}
configuration = resource['configuration']['extract']
configuration = resource['configuration'][self._CONFIG_KEY]
self._populate_config_resource(configuration)

return resource

@classmethod
def from_api_repr(cls, resource, client):
    """Factory: construct a job given its API representation

    :type resource: dict
    :param resource: dataset job representation returned from the API

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: Client which holds credentials and project
                   configuration for the dataset.

    :rtype: :class:`gcloud.bigquery.job.ExtractTableToStorageJob`
    :returns: Job parsed from ``resource``.
    """
    name, config = cls._get_resource_config(resource)
    src = config['sourceTable']
    # The source table must live in the client's project.
    assert src['projectId'] == client.project
    source = Table(src['tableId'], Dataset(src['datasetId'], client))
    job = cls(name, source, config['destinationUris'], client=client)
    job._set_properties(resource)
    return job


class _AsyncQueryConfiguration(object):
"""User-settable configuration options for asynchronous query jobs.
Expand All @@ -816,7 +923,7 @@ class _AsyncQueryConfiguration(object):
_allow_large_results = None
_create_disposition = None
_default_dataset = None
_destination_table = None
_destination = None
_flatten_results = None
_priority = None
_use_query_cache = None
Expand All @@ -836,6 +943,8 @@ class RunAsyncQueryJob(_AsyncJob):
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
_CONFIG_KEY = 'query'

def __init__(self, name, query, client):
super(RunAsyncQueryJob, self).__init__(name, client)
self.query = query
Expand All @@ -856,7 +965,7 @@ def __init__(self, name, query, client):
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.defaultDataset
"""

destination_table = _TypedProperty('destination_table', Table)
destination = _TypedProperty('destination', Table)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.destinationTable
"""
Expand All @@ -882,11 +991,11 @@ def __init__(self, name, query, client):
"""

def _destination_table_resource(self):
if self.destination_table is not None:
if self.destination is not None:
return {
'projectId': self.destination_table.project,
'datasetId': self.destination_table.dataset_name,
'tableId': self.destination_table.name,
'projectId': self.destination.project,
'datasetId': self.destination.dataset_name,
'tableId': self.destination.name,
}

def _populate_config_resource(self, configuration):
Expand All @@ -900,7 +1009,7 @@ def _populate_config_resource(self, configuration):
'projectId': self.default_dataset.project,
'datasetId': self.default_dataset.name,
}
if self.destination_table is not None:
if self.destination is not None:
table_res = self._destination_table_resource()
configuration['destinationTable'] = table_res
if self.flatten_results is not None:
Expand All @@ -921,12 +1030,12 @@ def _build_resource(self):
'jobId': self.name,
},
'configuration': {
'query': {
self._CONFIG_KEY: {
'query': self.query,
},
},
}
configuration = resource['configuration']['query']
configuration = resource['configuration'][self._CONFIG_KEY]
self._populate_config_resource(configuration)

return resource
Expand All @@ -937,14 +1046,34 @@ def _scrub_local_properties(self, cleaned):
dest_remote = configuration.get('destinationTable')

if dest_remote is None:
if self.destination_table is not None:
del self.destination_table
if self.destination is not None:
del self.destination
else:
dest_local = self._destination_table_resource()
if dest_remote != dest_local:
assert dest_remote['projectId'] == self.project
dataset = self._client.dataset(dest_remote['datasetId'])
self.destination_table = dataset.table(dest_remote['tableId'])
self.destination = dataset.table(dest_remote['tableId'])

@classmethod
def from_api_repr(cls, resource, client):
    """Factory: construct a job given its API representation

    :type resource: dict
    :param resource: dataset job representation returned from the API

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: Client which holds credentials and project
                   configuration for the dataset.

    :rtype: :class:`gcloud.bigquery.job.RunAsyncQueryJob`
    :returns: Job parsed from ``resource``.
    """
    name, config = cls._get_resource_config(resource)
    # 'query' inside the query configuration is the SQL text itself.
    job = cls(name, config['query'], client=client)
    job._set_properties(resource)
    return job


class _SyncQueryConfiguration(object):
Expand Down
Loading