diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 5c1118500c..491983f8eb 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -19,6 +19,7 @@ import re from google.api_core import exceptions +from google.api_core.future import polling as polling_future import requests from google.cloud.bigquery.dataset import Dataset @@ -42,7 +43,6 @@ from google.cloud.bigquery._tqdm_helpers import wait_for_query from google.cloud.bigquery.job.base import _AsyncJob -from google.cloud.bigquery.job.base import _DONE_STATE from google.cloud.bigquery.job.base import _JobConfig from google.cloud.bigquery.job.base import _JobReference @@ -974,61 +974,6 @@ def estimated_bytes_processed(self): result = int(result) return result - def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): - """Refresh the job and checks if it is complete. - - Args: - retry (Optional[google.api_core.retry.Retry]): - How to retry the call that retrieves query results. If the job state is - ``DONE``, retrying is aborted early, as the job will not change anymore. - timeout (Optional[float]): - The number of seconds to wait for the underlying HTTP transport - before using ``retry``. - reload (Optional[bool]): - If ``True``, make an API call to refresh the job state of - unfinished jobs before checking. Default ``True``. - - Returns: - bool: ``True`` if the job is complete or if fetching its status resulted in - an error, ``False`` otherwise. - """ - # Do not refresh if the state is already done, as the job will not - # change once complete. - is_done = self.state == _DONE_STATE - if not reload or is_done: - return is_done - - # If an explicit timeout is not given, fall back to the transport timeout - # stored in _blocking_poll() in the process of polling for job completion. - transport_timeout = timeout if timeout is not None else self._transport_timeout - - try: - self._reload_query_results(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError as exc: - # Reloading also updates error details on self, thus no need for an - # explicit self.set_exception() call if reloading succeeds. - try: - self.reload(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError: - # Use the query results reload exception, as it generally contains - # much more useful error information. - self.set_exception(exc) - return True - else: - return self.state == _DONE_STATE - - # Only reload the job once we know the query is complete. - # This will ensure that fields such as the destination table are - # correctly populated. - if self._query_results.complete: - try: - self.reload(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError as exc: - self.set_exception(exc) - return True - - return self.state == _DONE_STATE - def _blocking_poll(self, timeout=None, **kwargs): self._done_timeout = timeout self._transport_timeout = timeout @@ -1130,6 +1075,40 @@ def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None): timeout=transport_timeout, ) + def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None): + """Check if the query has finished running and raise if it's not. + + If the query has finished, also reload the job itself. + """ + # If an explicit timeout is not given, fall back to the transport timeout + # stored in _blocking_poll() in the process of polling for job completion. + transport_timeout = timeout if timeout is not None else self._transport_timeout + + try: + self._reload_query_results(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError as exc: + # Reloading also updates error details on self, thus no need for an + # explicit self.set_exception() call if reloading succeeds. + try: + self.reload(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError: + # Use the query results reload exception, as it generally contains + # much more useful error information. + self.set_exception(exc) + finally: + return + + # Only reload the job once we know the query is complete. + # This will ensure that fields such as the destination table are + # correctly populated. + if not self._query_results.complete: + raise polling_future._OperationNotComplete() + else: + try: + self.reload(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError as exc: + self.set_exception(exc) + def result( self, page_size=None, diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 655a121e65..02bc67b7d3 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -309,132 +309,6 @@ def test_cancelled(self): self.assertTrue(job.cancelled()) - def test_done_job_complete(self): - client = _make_client(project=self.PROJECT) - resource = self._make_resource(ended=True) - job = self._get_target_class().from_api_repr(resource, client) - job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr( - {"jobComplete": True, "jobReference": resource["jobReference"]} - ) - self.assertTrue(job.done()) - - def test_done_w_timeout(self): - client = _make_client(project=self.PROJECT) - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - - with mock.patch.object( - client, "_get_query_results" - ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: - job.done(timeout=42) - - fake_get_results.assert_called_once() - call_args = fake_get_results.call_args - self.assertEqual(call_args.kwargs.get("timeout"), 42) - - call_args = fake_reload.call_args - self.assertEqual(call_args.kwargs.get("timeout"), 42) - - def test_done_w_timeout_and_longer_internal_api_timeout(self): - client = _make_client(project=self.PROJECT) - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - job._done_timeout = 8.8 - - with mock.patch.object( - client, "_get_query_results" - ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: - job.done(timeout=5.5) - - # The expected timeout used is simply the given timeout, as the latter - # is shorter than the job's internal done timeout. - expected_timeout = 5.5 - - fake_get_results.assert_called_once() - call_args = fake_get_results.call_args - self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) - - call_args = fake_reload.call_args - self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) - - def test_done_w_query_results_error_reload_ok_job_finished(self): - client = _make_client(project=self.PROJECT) - bad_request_error = exceptions.BadRequest("Error in query") - client._get_query_results = mock.Mock(side_effect=bad_request_error) - - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - job._exception = None - - def fake_reload(self, *args, **kwargs): - self._properties["status"]["state"] = "DONE" - self.set_exception(copy.copy(bad_request_error)) - - fake_reload_method = types.MethodType(fake_reload, job) - - with mock.patch.object(job, "reload", new=fake_reload_method): - is_done = job.done() - - assert is_done - assert isinstance(job._exception, exceptions.BadRequest) - - def test_done_w_query_results_error_reload_ok_job_still_running(self): - client = _make_client(project=self.PROJECT) - retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError) - client._get_query_results = mock.Mock(side_effect=retry_error) - - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - job._exception = None - - def fake_reload(self, *args, **kwargs): - self._properties["status"]["state"] = "RUNNING" - - fake_reload_method = types.MethodType(fake_reload, job) - - with mock.patch.object(job, "reload", new=fake_reload_method): - is_done = job.done() - - assert not is_done - assert job._exception is None - - def test_done_w_query_results_error_reload_error(self): - client = _make_client(project=self.PROJECT) - bad_request_error = exceptions.BadRequest("Error in query") - client._get_query_results = mock.Mock(side_effect=bad_request_error) - - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - reload_error = exceptions.DataLoss("Oops, sorry!") - job.reload = mock.Mock(side_effect=reload_error) - job._exception = None - - is_done = job.done() - - assert is_done - assert job._exception is bad_request_error - - def test_done_w_job_query_results_ok_reload_error(self): - client = _make_client(project=self.PROJECT) - query_results = google.cloud.bigquery.query._QueryResults( - properties={ - "jobComplete": True, - "jobReference": {"projectId": self.PROJECT, "jobId": "12345"}, - } - ) - client._get_query_results = mock.Mock(return_value=query_results) - - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError) - job.reload = mock.Mock(side_effect=retry_error) - job._exception = None - - is_done = job.done() - - assert is_done - assert job._exception is retry_error - def test_query_plan(self): from google.cloud._helpers import _RFC3339_MICROS from google.cloud.bigquery.job import QueryPlanEntry @@ -1905,8 +1779,6 @@ def test_reload_w_timeout(self): ) def test_iter(self): - import types - begun_resource = self._make_resource() query_resource = { "jobComplete": True, @@ -1921,3 +1793,97 @@ def test_iter(self): job = self._make_one(self.JOB_ID, self.QUERY, client) self.assertIsInstance(iter(job), types.GeneratorType) + + def test__done_or_raise_w_timeout(self): + client = _make_client(project=self.PROJECT) + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + + with mock.patch.object( + client, "_get_query_results" + ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: + job._done_or_raise(timeout=42) + + fake_get_results.assert_called_once() + call_args = fake_get_results.call_args + self.assertEqual(call_args.kwargs.get("timeout"), 42) + + call_args = fake_reload.call_args + self.assertEqual(call_args.kwargs.get("timeout"), 42) + + def test__done_or_raise_w_timeout_and_longer_internal_api_timeout(self): + client = _make_client(project=self.PROJECT) + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + job._done_timeout = 8.8 + + with mock.patch.object( + client, "_get_query_results" + ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: + job._done_or_raise(timeout=5.5) + + # The expected timeout used is simply the given timeout, as the latter + # is shorter than the job's internal done timeout. + expected_timeout = 5.5 + + fake_get_results.assert_called_once() + call_args = fake_get_results.call_args + self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) + + call_args = fake_reload.call_args + self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) + + def test__done_or_raise_w_query_results_error_reload_ok(self): + client = _make_client(project=self.PROJECT) + bad_request_error = exceptions.BadRequest("Error in query") + client._get_query_results = mock.Mock(side_effect=bad_request_error) + + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + job._exception = None + + def fake_reload(self, *args, **kwargs): + self._properties["status"]["state"] = "DONE" + self.set_exception(copy.copy(bad_request_error)) + + fake_reload_method = types.MethodType(fake_reload, job) + + with mock.patch.object(job, "reload", new=fake_reload_method): + job._done_or_raise() + + assert isinstance(job._exception, exceptions.BadRequest) + + def test__done_or_raise_w_query_results_error_reload_error(self): + client = _make_client(project=self.PROJECT) + bad_request_error = exceptions.BadRequest("Error in query") + client._get_query_results = mock.Mock(side_effect=bad_request_error) + + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + reload_error = exceptions.DataLoss("Oops, sorry!") + job.reload = mock.Mock(side_effect=reload_error) + job._exception = None + + job._done_or_raise() + + assert job._exception is bad_request_error + + def test__done_or_raise_w_job_query_results_ok_reload_error(self): + client = _make_client(project=self.PROJECT) + query_results = google.cloud.bigquery.query._QueryResults( + properties={ + "jobComplete": True, + "jobReference": {"projectId": self.PROJECT, "jobId": "12345"}, + } + ) + client._get_query_results = mock.Mock(return_value=query_results) + + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError) + job.reload = mock.Mock(side_effect=retry_error) + job._exception = None + + job._done_or_raise() + + assert job._exception is retry_error