Skip to content

Commit

Permalink
refork and fix of #2015
Browse files Browse the repository at this point in the history
  • Loading branch information
dwmclary committed Jul 27, 2016
1 parent 565c992 commit c8ba881
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 6 deletions.
71 changes: 70 additions & 1 deletion gcloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,42 @@
from gcloud.bigquery._helpers import _TypedProperty


class UDFResource(object):
"""Describe a single user-defined function (UDF) resource.
:type udf_type: str
:param udf_type: the type of the resource ('inlineCode' or 'resourceUri')
:type value: str
:param value: the inline code or resource URI
See
https://cloud.google.com/bigquery/user-defined-functions#api
"""
def __init__(self, udf_type, value):
self.udf_type = udf_type
self.value = value

def __eq__(self, other):
return(
self.udf_type == other.udf_type and
self.value == other.value)


def _build_udf_resources(resources):
"""
:type resources: sequence of :class:`UDFResource`
:param resources: fields to be appended
:rtype: mapping
:returns: a mapping describing userDefinedFunctionResources for the query.
"""
udfs = []
for resource in resources:
udf = {resource.udf_type: resource.value}
udfs.append(udf)
return udfs


class Compression(_EnumProperty):
"""Pseudo-enum for ``compression`` properties."""
GZIP = 'GZIP'
Expand Down Expand Up @@ -888,14 +924,44 @@ class QueryJob(_AsyncJob):
:type client: :class:`gcloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
:type udf_resources: tuple
:param udf_resources: An iterable of
:class:`gcloud.bigquery.job.UDFResource`
(empty by default)
"""
_JOB_TYPE = 'query'
_UDF_KEY = 'userDefinedFunctionResources'
_udf_resources = None

def __init__(self, name, query, client):
def __init__(self, name, query, client, udf_resources=()):
super(QueryJob, self).__init__(name, client)
self.query = query
self.udf_resources = udf_resources
self._configuration = _AsyncQueryConfiguration()

@property
def udf_resources(self):
"""Property for list of UDF resources attached to a query
See
https://cloud.google.com/bigquery/user-defined-functions#api
"""
return list(self._udf_resources)

@udf_resources.setter
def udf_resources(self, value):
"""Update queries UDF resources
:type value: list of :class:`UDFResource`
:param value: an object which defines the type and value of a resource
:raises: TypeError if 'value' is not a sequence, or ValueError if
any item in the sequence is not a UDFResource
"""
if not all(isinstance(u, UDFResource) for u in value):
raise ValueError("udf items must be UDFResource")
self._udf_resources = tuple(value)

allow_large_results = _TypedProperty('allow_large_results', bool)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.allowLargeResults
Expand Down Expand Up @@ -979,6 +1045,9 @@ def _populate_config_resource(self, configuration):
configuration['useLegacySql'] = self.use_legacy_sql
if self.write_disposition is not None:
configuration['writeDisposition'] = self.write_disposition
if len(self._udf_resources) > 0:
configuration[self._UDF_KEY] = _build_udf_resources(
self._udf_resources)

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
Expand Down
39 changes: 38 additions & 1 deletion gcloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from gcloud.bigquery._helpers import _rows_from_json
from gcloud.bigquery.dataset import Dataset
from gcloud.bigquery.job import QueryJob
from gcloud.bigquery.job import UDFResource
from gcloud.bigquery.job import _build_udf_resources
from gcloud.bigquery.table import _parse_schema_resource


Expand All @@ -46,12 +48,22 @@ class QueryResults(object):
:type client: :class:`gcloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
:type udf_resources: tuple
:param udf_resources: An iterable of
:class:`gcloud.bigquery.job.UDFResource`
(empty by default)
"""
def __init__(self, query, client):

_UDF_KEY = 'userDefinedFunctionResources'
_udf_resources = None

def __init__(self, query, client, udf_resources=()):
self._client = client
self._properties = {}
self.query = query
self._configuration = _SyncQueryConfiguration()
self.udf_resources = udf_resources
self._job = None

@property
Expand Down Expand Up @@ -204,6 +216,28 @@ def schema(self):
"""
return _parse_schema_resource(self._properties.get('schema', {}))

@property
def udf_resources(self):
"""Property for list of UDF resources attached to a query
See
https://cloud.google.com/bigquery/user-defined-functions#api
"""
return list(self._udf_resources)

@udf_resources.setter
def udf_resources(self, value):
"""Update queries UDF resources
:type value: list of :class:`UDFResource`
:param value: an object which defines the type and value of a resource
:raises: TypeError if 'value' is not a sequence, or ValueError if
any item in the sequence is not a UDFResource
"""
if not all(isinstance(udf, UDFResource) for udf in value):
raise ValueError("udf items must be UDFResource")
self._udf_resources = tuple(value)

default_dataset = _TypedProperty('default_dataset', Dataset)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#defaultDataset
Expand Down Expand Up @@ -277,6 +311,9 @@ def _build_resource(self):
if self.dry_run is not None:
resource['dryRun'] = self.dry_run

if len(self._udf_resources) > 0:
resource[self._UDF_KEY] = _build_udf_resources(self._udf_resources)

return resource

def run(self, client=None):
Expand Down
60 changes: 58 additions & 2 deletions gcloud/bigquery/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,7 @@ def test_begin_w_bound_client(self):
job = self._makeOne(self.JOB_NAME, self.QUERY, client)

job.begin()

self.assertEqual(job.udf_resources, [])
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
Expand All @@ -1396,7 +1396,7 @@ def test_begin_w_bound_client(self):
},
'configuration': {
'query': {
'query': self.QUERY,
'query': self.QUERY
},
},
}
Expand Down Expand Up @@ -1468,6 +1468,62 @@ def test_begin_w_alternate_client(self):
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_bound_client_and_udf(self):
from gcloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
PATH = 'projects/%s/jobs' % self.PROJECT
RESOURCE = self._makeResource()
# Ensure None for missing server-set props
del RESOURCE['statistics']['creationTime']
del RESOURCE['etag']
del RESOURCE['selfLink']
del RESOURCE['user_email']
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
job = self._makeOne(self.JOB_NAME, self.QUERY, client,
udf_resources=[
UDFResource("resourceUri", RESOURCE_URI)
])

job.begin()

self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(job.udf_resources,
[UDFResource("resourceUri", RESOURCE_URI)])
SENT = {
'jobReference': {
'projectId': self.PROJECT,
'jobId': self.JOB_NAME,
},
'configuration': {
'query': {
'query': self.QUERY,
'userDefinedFunctionResources':
[{'resourceUri': RESOURCE_URI}]
},
},
}
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_bad_udf(self):
RESOURCE = self._makeResource()
# Ensure None for missing server-set props
del RESOURCE['statistics']['creationTime']
del RESOURCE['etag']
del RESOURCE['selfLink']
del RESOURCE['user_email']
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
job = self._makeOne(self.JOB_NAME, self.QUERY, client)

with self.assertRaises(ValueError):
job.udf_resources = ["foo"]
self.assertEqual(job.udf_resources, [])

def test_exists_miss_w_bound_client(self):
PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
conn = _Connection()
Expand Down
84 changes: 83 additions & 1 deletion gcloud/bigquery/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def test_run_w_bound_client(self):
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
query = self._makeOne(self.QUERY, client)

self.assertEqual(query.udf_resources, [])
query.run()

self.assertEqual(len(conn._requested), 1)
Expand Down Expand Up @@ -233,6 +233,88 @@ def test_run_w_alternate_client(self):
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(query, RESOURCE)

def test_run_w_inline_udf(self):
from gcloud.bigquery.query import UDFResource
INLINE_UDF_CODE = 'var someCode = "here";'
PATH = 'projects/%s/queries' % self.PROJECT
RESOURCE = self._makeResource(complete=False)
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
query = self._makeOne(self.QUERY, client)
query.udf_resources = [UDFResource("inlineCode", INLINE_UDF_CODE)]

query.run()

self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
self.assertEqual(req['path'], '/%s' % PATH)
SENT = {'query': self.QUERY,
'userDefinedFunctionResources':
[{'inlineCode': INLINE_UDF_CODE}]}
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(query, RESOURCE)

def test_run_w_udf_resource_uri(self):
from gcloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
PATH = 'projects/%s/queries' % self.PROJECT
RESOURCE = self._makeResource(complete=False)
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
query = self._makeOne(self.QUERY, client)
query.udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]

query.run()

self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
self.assertEqual(req['path'], '/%s' % PATH)
SENT = {'query': self.QUERY,
'userDefinedFunctionResources':
[{'resourceUri': RESOURCE_URI}]}
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(query, RESOURCE)

def test_run_w_mixed_udfs(self):
from gcloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
INLINE_UDF_CODE = 'var someCode = "here";'
PATH = 'projects/%s/queries' % self.PROJECT
RESOURCE = self._makeResource(complete=False)
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
query = self._makeOne(self.QUERY, client)
query.udf_resources = [UDFResource("resourceUri", RESOURCE_URI),
UDFResource("inlineCode", INLINE_UDF_CODE)]

query.run()

self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(query.udf_resources,
[UDFResource("resourceUri", RESOURCE_URI),
UDFResource("inlineCode", INLINE_UDF_CODE)])
SENT = {'query': self.QUERY,
'userDefinedFunctionResources': [
{'resourceUri': RESOURCE_URI},
{"inlineCode": INLINE_UDF_CODE}]}
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(query, RESOURCE)

def test_run_w_bad_udfs(self):
RESOURCE = self._makeResource(complete=False)
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
query = self._makeOne(self.QUERY, client)

with self.assertRaises(ValueError):
query.udf_resources = ["foo"]
self.assertEqual(query.udf_resources, [])

def test_fetch_data_query_not_yet_run(self):
conn = _Connection()
client = _Client(project=self.PROJECT, connection=conn)
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tox]
envlist =
py27,py34,py35,cover,docs,lint
py26,py27,py34,py35,cover,docs,lint

[testing]
deps =
Expand Down Expand Up @@ -43,6 +43,7 @@ deps =
basepython =
python2.6
deps =
ordereddict
{[testing]deps}
setenv =
PYTHONPATH = {toxinidir}/_testing
Expand Down

0 comments on commit c8ba881

Please sign in to comment.