Skip to content

Commit

Permalink
Populate the jobReference from the API response. (#6044)
Browse files Browse the repository at this point in the history
* Populate the jobReference from the API response.

Before this change, the location field was not populated after
inititial construction. This is a problem when no location is
provided but it has been auto-detected based on the resources
referenced in the job (such as via the tables referenced in a
query).

* Remove reference to _job_ref in system tests.

* Fix lint error
  • Loading branch information
tswast authored Sep 21, 2018
1 parent 8446806 commit 44347db
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 18 deletions.
37 changes: 25 additions & 12 deletions google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,26 @@ class _AsyncJob(google.api_core.future.polling.PollingFuture):
"""
def __init__(self, job_id, client):
super(_AsyncJob, self).__init__()

# The job reference can be either a plain job ID or the full resource.
# Populate the properties dictionary consistently depending on what has
# been passed in.
job_ref = job_id
if not isinstance(job_id, _JobReference):
job_ref = _JobReference(job_id, client.project, None)
self._job_ref = job_ref
self._properties = {
'jobReference': job_ref._to_api_repr(),
}

self._client = client
self._properties = {}
self._result_set = False
self._completion_lock = threading.Lock()

@property
def job_id(self):
"""str: ID of the job."""
return self._job_ref.job_id
return _helpers._get_sub_prop(
self._properties, ['jobReference', 'jobId'])

@property
def project(self):
Expand All @@ -305,12 +312,14 @@ def project(self):
:rtype: str
:returns: the project (derived from the client).
"""
return self._job_ref.project
return _helpers._get_sub_prop(
self._properties, ['jobReference', 'projectId'])

@property
def location(self):
"""str: Location where the job runs."""
return self._job_ref.location
return _helpers._get_sub_prop(
self._properties, ['jobReference', 'location'])

def _require_client(self, client):
"""Check client or verify over-ride.
Expand Down Expand Up @@ -481,7 +490,7 @@ def _set_properties(self, api_response):

self._properties.clear()
self._properties.update(cleaned)
self._copy_configuration_properties(cleaned['configuration'])
self._copy_configuration_properties(cleaned.get('configuration', {}))

# For Future interface
self._set_future_result()
Expand Down Expand Up @@ -1337,7 +1346,7 @@ def _build_resource(self):
self.destination.to_api_repr())

return {
'jobReference': self._job_ref._to_api_repr(),
'jobReference': self._properties['jobReference'],
'configuration': configuration,
}

Expand Down Expand Up @@ -1523,7 +1532,7 @@ def _build_resource(self):
})

return {
'jobReference': self._job_ref._to_api_repr(),
'jobReference': self._properties['jobReference'],
'configuration': configuration,
}

Expand Down Expand Up @@ -1737,7 +1746,7 @@ def _build_resource(self):
self.destination_uris)

return {
'jobReference': self._job_ref._to_api_repr(),
'jobReference': self._properties['jobReference'],
'configuration': configuration,
}

Expand Down Expand Up @@ -2330,7 +2339,7 @@ def _build_resource(self):
configuration = self._configuration.to_api_repr()

resource = {
'jobReference': self._job_ref._to_api_repr(),
'jobReference': self._properties['jobReference'],
'configuration': configuration,
}
configuration['query']['query'] = self.query
Expand Down Expand Up @@ -3020,8 +3029,12 @@ def from_api_repr(cls, resource, client):
Returns:
UnknownJob: Job corresponding to the resource.
"""
job_ref = _JobReference._from_api_repr(
resource.get('jobReference', {'projectId': client.project}))
job_ref_properties = resource.get(
'jobReference', {'projectId': client.project})
job_ref = _JobReference._from_api_repr(job_ref_properties)
job = cls(job_ref, client)
# Populate the job reference with the project, even if it has been
# redacted, because we know it should equal that of the request.
resource['jobReference'] = job_ref_properties
job._properties = resource
return job
2 changes: 1 addition & 1 deletion tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ def test_load_table_from_file_w_explicit_location(self):
client.get_job(job_id, location='US')

load_job_us = client.get_job(job_id)
load_job_us._job_ref._properties['location'] = 'US'
load_job_us._properties['jobReference']['location'] = 'US'
self.assertFalse(load_job_us.exists())
with self.assertRaises(NotFound):
load_job_us.reload()
Expand Down
33 changes: 33 additions & 0 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2740,6 +2740,39 @@ def test_query_w_client_location(self):
data=resource,
)

def test_query_detect_location(self):
query = 'select count(*) from persons'
resource_location = 'EU'
resource = {
'jobReference': {
'projectId': self.PROJECT,
# Location not set in request, but present in the response.
'location': resource_location,
'jobId': 'some-random-id',
},
'configuration': {
'query': {
'query': query,
'useLegacySql': False,
},
},
}
creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds,
_http=http)
conn = client._connection = _make_connection(resource)

job = client.query(query)

self.assertEqual(job.location, resource_location)

# Check that request did not contain a location.
conn.api_request.assert_called_once()
_, req = conn.api_request.call_args
sent = req['data']
self.assertIsNone(sent['jobReference'].get('location'))

def test_query_w_udf_resources(self):
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.job import QueryJobConfig
Expand Down
28 changes: 23 additions & 5 deletions tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,15 @@ def test_ctor_w_bare_job_id(self):
self.assertEqual(job.project, self.PROJECT)
self.assertIsNone(job.location)
self.assertIs(job._client, client)
self.assertEqual(job._properties, {})
self.assertEqual(
job._properties,
{
'jobReference': {
'projectId': self.PROJECT,
'jobId': self.JOB_ID,
},
}
)
self.assertIsInstance(job._completion_lock, type(threading.Lock()))
self.assertEqual(
job.path,
Expand All @@ -174,7 +182,16 @@ def test_ctor_w_job_ref(self):
self.assertEqual(job.project, self.PROJECT)
self.assertEqual(job.location, self.LOCATION)
self.assertIs(job._client, client)
self.assertEqual(job._properties, {})
self.assertEqual(
job._properties,
{
'jobReference': {
'projectId': self.PROJECT,
'location': self.LOCATION,
'jobId': self.JOB_ID,
},
}
)
self.assertFalse(job._result_set)
self.assertIsInstance(job._completion_lock, type(threading.Lock()))
self.assertEqual(
Expand Down Expand Up @@ -359,6 +376,7 @@ def _set_properties_job(self):
job._copy_configuration_properties = mock.Mock()
job._set_future_result = mock.Mock()
job._properties = {
'jobReference': job._properties['jobReference'],
'foo': 'bar',
}
return job
Expand Down Expand Up @@ -577,7 +595,7 @@ def test_exists_defaults_miss(self):
from google.cloud.bigquery.retry import DEFAULT_RETRY

job = self._set_properties_job()
job._job_ref._properties['location'] = self.LOCATION
job._properties['jobReference']['location'] = self.LOCATION
call_api = job._client._call_api = mock.Mock()
call_api.side_effect = NotFound('testing')

Expand Down Expand Up @@ -636,7 +654,7 @@ def test_reload_defaults(self):
}
}
job = self._set_properties_job()
job._job_ref._properties['location'] = self.LOCATION
job._properties['jobReference']['location'] = self.LOCATION
call_api = job._client._call_api = mock.Mock()
call_api.return_value = resource

Expand Down Expand Up @@ -693,7 +711,7 @@ def test_cancel_defaults(self):
}
response = {'job': resource}
job = self._set_properties_job()
job._job_ref._properties['location'] = self.LOCATION
job._properties['jobReference']['location'] = self.LOCATION
connection = job._client._connection = _make_connection(response)

self.assertTrue(job.cancel())
Expand Down

0 comments on commit 44347db

Please sign in to comment.