Skip to content

Add BigQuery UDF support #2007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions gcloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,9 +873,18 @@ class _AsyncQueryConfiguration(object):
_priority = None
_use_query_cache = None
_use_legacy_sql = None
_udf_resources = None
_write_disposition = None


class UdfResource(object):
"""UDF resource for QueryJob."""

def __init__(self, uri=None, code=None):
self.uri = uri
self.code = code


class QueryJob(_AsyncJob):
"""Asynchronous job: query tables.

Expand Down Expand Up @@ -937,11 +946,39 @@ def __init__(self, name, query, client):
reference/v2/jobs#configuration.query.useLegacySql
"""

_udf_resources = None
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.userDefinedFunctionResources
"""

write_disposition = WriteDisposition('write_disposition')
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition
"""

@property
def udf_resources(self):
"""List of user-defined-function resources.

:type: list of :class:`UdfResource`
:returns: the list, if set, or None
"""
return list(self._udf_resources) if self._udf_resources else None

@udf_resources.setter
def udf_resources(self, value):
"""Update list of user-defined-function resources.

:type value: list of :class:`UdfResource`
:param value:the new list

:raises: ValueError, if the items in the list are not all instance of
:class:`UdfResource`
"""
if not all((isinstance(item, UdfResource) for item in value)):
raise ValueError("pass a list of UdfResource instances")
self._udf_resources = tuple(value)

def _destination_table_resource(self):
"""Create a JSON resource for the destination table.

Expand Down Expand Up @@ -977,6 +1014,17 @@ def _populate_config_resource(self, configuration):
configuration['useQueryCache'] = self.use_query_cache
if self.use_legacy_sql is not None:
configuration['useLegacySql'] = self.use_legacy_sql
if self.udf_resources is not None:
udf_list = []
for udf_resource in self.udf_resources:
udf = {
'resourceUri': udf_resource.uri,
'inlineCode': udf_resource.code
}
udf_list.append(udf)
configuration['userDefinedFunctionResources'] = (
udf_list
)
if self.write_disposition is not None:
configuration['writeDisposition'] = self.write_disposition

Expand Down
32 changes: 32 additions & 0 deletions gcloud/bigquery/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import unittest2
from gcloud.bigquery.job import UdfResource


class _Base(object):
Expand Down Expand Up @@ -1219,6 +1220,8 @@ class TestQueryJob(unittest2.TestCase, _Base):
JOB_TYPE = 'query'
QUERY = 'select count(*) from persons'
DESTINATION_TABLE = 'destination_table'
UDF1 = UdfResource(uri="gs://backet/functions.js")
UDF2 = UdfResource(code="function foo(row, emit) {...")

def _getTargetClass(self):
from gcloud.bigquery.job import QueryJob
Expand Down Expand Up @@ -1289,6 +1292,18 @@ def _verifyResourceProperties(self, job, resource):
config['priority'])
else:
self.assertTrue(job.priority is None)
if 'userDefinedFunctionResources' in config:
config_udf_list = config['userDefinedFunctionResources']
udf_list = job.udf_resources
self.assertEquals(len(config_udf_list), len(udf_list))
for config_udf, udf in zip(config_udf_list, udf_list):
udf_ref = {
'inlineCode': udf.code,
'resourceUri': udf.uri
}
self.assertEqual(udf_ref, config_udf)
else:
self.assertTrue(job.udf_resources is None)
if 'writeDisposition' in config:
self.assertEqual(job.write_disposition,
config['writeDisposition'])
Expand Down Expand Up @@ -1371,6 +1386,16 @@ def test_from_api_repr_w_properties(self):
self.assertTrue(dataset._client is client)
self._verifyResourceProperties(dataset, RESOURCE)

def test_set_udf_resource(self):
RESOURCE = self._makeResource()
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
job = self._makeOne(self.JOB_NAME, self.QUERY, client)
job.udf_resources = [self.UDF1]
job.udf_resources = [self.UDF1, self.UDF2]
with self.assertRaises(ValueError):
job.udf_resources = [self.UDF1, 'error']

def test_begin_w_bound_client(self):
PATH = 'projects/%s/jobs' % self.PROJECT
RESOURCE = self._makeResource()
Expand Down Expand Up @@ -1427,6 +1452,12 @@ def test_begin_w_alternate_client(self):
'priority': 'INTERACTIVE',
'useQueryCache': True,
'useLegacySql': True,
'userDefinedFunctionResources': [
{
"resourceUri": self.UDF1.uri,
"inlineCode": self.UDF1.code
}
],
'writeDisposition': 'WRITE_TRUNCATE',
}
RESOURCE['configuration']['query'] = QUERY_CONFIGURATION
Expand All @@ -1447,6 +1478,7 @@ def test_begin_w_alternate_client(self):
job.priority = 'INTERACTIVE'
job.use_query_cache = True
job.use_legacy_sql = True
job.udf_resources = [self.UDF1]
job.write_disposition = 'WRITE_TRUNCATE'

job.begin(client=client2)
Expand Down