feat: accepts a table ID, which downloads the table without a query #443

Merged Dec 22, 2021 · 22 commits
2 changes: 1 addition & 1 deletion .coveragerc
@@ -22,7 +22,7 @@ omit =
     google/cloud/__init__.py
 
 [report]
-fail_under = 88
+fail_under = 89
 show_missing = True
 exclude_lines =
     # Re-enable the standard pragma
2 changes: 1 addition & 1 deletion noxfile.py
@@ -259,7 +259,7 @@ def cover(session):
     test runs (not system test runs), and then erases coverage data.
     """
     session.install("coverage", "pytest-cov")
-    session.run("coverage", "report", "--show-missing", "--fail-under=88")
+    session.run("coverage", "report", "--show-missing", "--fail-under=89")

     session.run("coverage", "erase")
2 changes: 1 addition & 1 deletion owlbot.py
@@ -33,7 +33,7 @@
 templated_files = common.py_library(
     unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
     system_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
-    cov_level=88,
+    cov_level=89,
     unit_test_extras=extras,
     system_test_extras=extras,
     intersphinx_dependencies={
148 changes: 96 additions & 52 deletions pandas_gbq/gbq.py
@@ -3,12 +3,21 @@
 # license that can be found in the LICENSE file.
 
 import logging
+import re
 import time
 import warnings
 from datetime import datetime
+import typing
+from typing import Any, Dict, Optional, Union
 
 import numpy as np
 
+# Only import at module-level at type checking time to avoid circular
+# dependencies in the pandas package, which has an optional dependency on
+# pandas-gbq.
+if typing.TYPE_CHECKING:  # pragma: NO COVER
+    import pandas
+
 # Required dependencies, but treat as optional so that _test_google_api_imports
 # can provide a better error message.
 try:
@@ -64,6 +73,10 @@ def _test_google_api_imports():
         raise ImportError("pandas-gbq requires google-cloud-bigquery") from ex
 
 
+def _is_query(query_or_table: str) -> bool:
+    return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None
+
+
 class DatasetCreationError(ValueError):
     """
     Raised when the create dataset method fails
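
A quick illustration of the heuristic: anything with internal whitespace after stripping is treated as a query, and a bare table ID is not. The input values below are hypothetical, but the function is exactly the one added above:

    import re

    def _is_query(query_or_table: str) -> bool:
        return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None

    # A SQL statement contains whitespace between tokens, so it is a query.
    assert _is_query("SELECT * FROM `my-project.dataset.table`")
    # A bare table ID contains no whitespace, so it is fetched as a table.
    assert not _is_query("my-project.dataset.table")
    # Leading/trailing padding is stripped first, so it does not flip the result.
    assert not _is_query("  my-project.dataset.table  ")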
@@ -374,6 +387,30 @@ def process_http_error(ex):

         raise GenericGBQException("Reason: {0}".format(ex))
 
+    def download_table(
+        self,
+        table_id: str,
+        max_results: Optional[int] = None,
+        progress_bar_type: Optional[str] = None,
+        dtypes: Optional[Dict[str, Union[str, Any]]] = None,
+    ) -> "pandas.DataFrame":
+        self._start_timer()
+
+        try:
+            table_ref = bigquery.TableReference.from_string(
+                table_id, default_project=self.project_id
+            )
+            rows_iter = self.client.list_rows(table_ref, max_results=max_results)
+        except self.http_error as ex:
+            self.process_http_error(ex)
+
+        return self._download_results(
+            rows_iter,
+            max_results=max_results,
+            progress_bar_type=progress_bar_type,
+            user_dtypes=dtypes,
+        )
+
     def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
         from concurrent.futures import TimeoutError
         from google.auth.exceptions import RefreshError
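
Because download_table goes through bigquery.TableReference.from_string with default_project=self.project_id, a partial ID such as "dataset.table" resolves against the connector's project. A minimal sketch of calling the new method directly; the project and table IDs are placeholders, and this assumes valid default credentials:

    from pandas_gbq.gbq import GbqConnector

    connector = GbqConnector("my-project")  # placeholder project ID

    # A fully qualified ID works, and so does "my_dataset.my_table", which
    # resolves against the connector's project.
    df = connector.download_table(
        "my-project.my_dataset.my_table",
        max_results=100,           # cap the number of rows fetched
        progress_bar_type="tqdm",  # same progress-bar options as run_query
    )
    print(df.head())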
@@ -390,15 +427,6 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
         if config is not None:
             job_config.update(config)
 
-            if "query" in config and "query" in config["query"]:
-                if query is not None:
-                    raise ValueError(
-                        "Query statement can't be specified "
-                        "inside config while it is specified "
-                        "as parameter"
-                    )
-                query = config["query"].pop("query")
-
         self._start_timer()
 
         try:
@@ -464,15 +492,25 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
         )
 
         dtypes = kwargs.get("dtypes")
+
+        # Ensure destination is populated.
+        try:
+            query_reply.result()
+        except self.http_error as ex:
+            self.process_http_error(ex)
+
+        rows_iter = self.client.list_rows(
+            query_reply.destination, max_results=max_results
+        )
         return self._download_results(
-            query_reply,
+            rows_iter,
             max_results=max_results,
             progress_bar_type=progress_bar_type,
             user_dtypes=dtypes,
         )
 
     def _download_results(
-        self, query_job, max_results=None, progress_bar_type=None, user_dtypes=None,
+        self, rows_iter, max_results=None, progress_bar_type=None, user_dtypes=None,
     ):
         # No results are desired, so don't bother downloading anything.
         if max_results == 0:
@@ -504,11 +542,6 @@ def _download_results(
             to_dataframe_kwargs["create_bqstorage_client"] = create_bqstorage_client
 
         try:
-            query_job.result()
-            # Get the table schema, so that we can list rows.
-            destination = self.client.get_table(query_job.destination)
-            rows_iter = self.client.list_rows(destination, max_results=max_results)
-
             schema_fields = [field.to_api_repr() for field in rows_iter.schema]
             conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
             conversion_dtypes.update(user_dtypes)
@@ -644,7 +677,7 @@ def _cast_empty_df_dtypes(schema_fields, df):


 def read_gbq(
-    query,
+    query_or_table,
     project_id=None,
     index_col=None,
     col_order=None,
@@ -668,17 +701,18 @@

     This method uses the Google Cloud client library to make requests to
     Google BigQuery, documented `here
-    <https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__.
+    <https://googleapis.dev/python/bigquery/latest/index.html>`__.
 
     See the :ref:`How to authenticate with Google BigQuery <authentication>`
     guide for authentication instructions.
 
     Parameters
     ----------
-    query : str
-        SQL-Like Query to return data values.
+    query_or_table : str
+        SQL query to return data values. If the string is a table ID, fetch the
+        rows directly from the table without running a query.
     project_id : str, optional
-        Google BigQuery Account project ID. Optional when available from
+        Google Cloud Platform project ID. Optional when available from
         the environment.
     index_col : str, optional
         Name of result column to use for index in results DataFrame.
@@ -688,14 +722,14 @@
     reauth : boolean, default False
         Force Google BigQuery to re-authenticate the user. This is useful
         if multiple accounts are used.
-    auth_local_webserver : boolean, default False
-        Use the `local webserver flow`_ instead of the `console flow`_
-        when getting user credentials.
-
-        .. _local webserver flow:
-            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
-        .. _console flow:
-            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
+    auth_local_webserver : bool, default False
+        Use the `local webserver flow
+        <https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server>`_
+        instead of the `console flow
+        <https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console>`_
+        when getting user credentials. Your code must run on the same machine
+        as your web browser, and your web browser must be able to access your
+        application via ``localhost:808X``.
 
         .. versionadded:: 0.2.0
     dialect : str, default 'standard'
@@ -745,13 +779,6 @@
         <https://cloud.google.com/bigquery/docs/access-control#roles>`__
         permission on the project you are billing queries to.
 
-        **Note:** Due to a `known issue in the ``google-cloud-bigquery``
-        package
-        <https://github.com/googleapis/google-cloud-python/pull/7633>`__
-        (fixed in version 1.11.0), you must write your query results to a
-        destination table. To do this with ``read_gbq``, supply a
-        ``configuration`` dictionary.
-
         This feature requires the ``google-cloud-bigquery-storage`` and
         ``pyarrow`` packages.
 
@@ -823,6 +850,15 @@ def read_gbq(
     if dialect not in ("legacy", "standard"):
         raise ValueError("'{0}' is not valid for dialect".format(dialect))
 
+    if configuration and "query" in configuration and "query" in configuration["query"]:
+        if query_or_table is not None:
+            raise ValueError(
+                "Query statement can't be specified "
+                "inside config while it is specified "
+                "as parameter"
+            )
+        query_or_table = configuration["query"].pop("query")
+
     connector = GbqConnector(
         project_id,
         reauth=reauth,
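
This check previously lived in run_query; hoisting it into read_gbq keeps the documented behavior that a query may be supplied via the configuration dict instead of the first argument, but not via both. A sketch of that calling convention, with a placeholder project ID and query:

    import pandas_gbq

    config = {"query": {"query": "SELECT 1 AS x", "useLegacySql": False}}

    # The query comes from the configuration, so the first argument must be
    # None; passing a query string in both places raises ValueError.
    df = pandas_gbq.read_gbq(None, project_id="my-project", configuration=config)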
@@ -834,13 +870,21 @@
         use_bqstorage_api=use_bqstorage_api,
     )
 
-    final_df = connector.run_query(
-        query,
-        configuration=configuration,
-        max_results=max_results,
-        progress_bar_type=progress_bar_type,
-        dtypes=dtypes,
-    )
+    if _is_query(query_or_table):
+        final_df = connector.run_query(
+            query_or_table,
+            configuration=configuration,
+            max_results=max_results,
+            progress_bar_type=progress_bar_type,
+            dtypes=dtypes,
+        )
+    else:
+        final_df = connector.download_table(
+            query_or_table,
+            max_results=max_results,
+            progress_bar_type=progress_bar_type,
+            dtypes=dtypes,
+        )
 
     # Reindex the DataFrame on the provided column
     if index_col is not None:
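
The dispatch above keeps the public read_gbq signature intact: strings containing whitespace still run as queries, while a bare table ID now skips the query job entirely. Both forms sketched with a public dataset, assuming default credentials and a placeholder billing project:

    import pandas_gbq

    # Contains whitespace, so it is routed to run_query.
    df1 = pandas_gbq.read_gbq(
        "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10",
        project_id="my-project",  # placeholder billing project
    )

    # No whitespace, so it is routed to download_table; no query job is created.
    df2 = pandas_gbq.read_gbq(
        "bigquery-public-data.usa_names.usa_1910_2013",
        project_id="my-project",
        max_results=10,
    )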
@@ -889,7 +933,7 @@ def to_gbq(

     This method uses the Google Cloud client library to make requests to
     Google BigQuery, documented `here
-    <https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__.
+    <https://googleapis.dev/python/bigquery/latest/index.html>`__.
 
     See the :ref:`How to authenticate with Google BigQuery <authentication>`
     guide for authentication instructions.
@@ -902,7 +946,7 @@
         Name of table to be written, in the form ``dataset.tablename`` or
         ``project.dataset.tablename``.
     project_id : str, optional
-        Google BigQuery Account project ID. Optional when available from
+        Google Cloud Platform project ID. Optional when available from
         the environment.
     chunksize : int, optional
         Number of rows to be inserted in each chunk from the dataframe.
@@ -920,13 +964,13 @@
     ``'append'``
         If table exists, insert data. Create if does not exist.
     auth_local_webserver : bool, default False
-        Use the `local webserver flow`_ instead of the `console flow`_
-        when getting user credentials.
-
-        .. _local webserver flow:
-            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
-        .. _console flow:
-            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
+        Use the `local webserver flow
+        <https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server>`_
+        instead of the `console flow
+        <https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console>`_
+        when getting user credentials. Your code must run on the same machine
+        as your web browser, and your web browser must be able to access your
+        application via ``localhost:808X``.
 
         .. versionadded:: 0.2.0
     table_schema : list of dicts, optional
8 changes: 7 additions & 1 deletion pandas_gbq/timestamp.py
@@ -7,6 +7,8 @@
 Private module.
 """
 
+import pandas.api.types
+
 
 def localize_df(df, schema_fields):
     """Localize any TIMESTAMP columns to tz-aware type.
@@ -38,7 +40,11 @@ def localize_df(df, schema_fields):
if "mode" in field and field["mode"].upper() == "REPEATED":
continue

if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
if (
field["type"].upper() == "TIMESTAMP"
and pandas.api.types.is_datetime64_ns_dtype(df.dtypes[column])
and df[column].dt.tz is None
):
df[column] = df[column].dt.tz_localize("UTC")

return df
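
The new is_datetime64_ns_dtype guard matters when a TIMESTAMP column does not arrive as datetime64[ns]; for example, an all-NULL column comes back as object dtype, and calling .dt on it raises AttributeError. A self-contained sketch of the guarded behavior, using a hypothetical frame that mirrors the logic above:

    import pandas
    import pandas.api.types

    df = pandas.DataFrame(
        {
            "created": pandas.to_datetime(["2021-12-22T12:00:00"]),  # datetime64[ns]
            "deleted": [None],  # an all-NULL TIMESTAMP column arrives as object
        }
    )

    for column in ["created", "deleted"]:
        if (
            pandas.api.types.is_datetime64_ns_dtype(df.dtypes[column])
            and df[column].dt.tz is None
        ):
            df[column] = df[column].dt.tz_localize("UTC")

    print(df.dtypes)  # created becomes tz-aware UTC; deleted is left untouched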
19 changes: 19 additions & 0 deletions tests/system/conftest.py
@@ -3,6 +3,7 @@
 # license that can be found in the LICENSE file.
 
 import os
+import functools
 import pathlib
 
 from google.cloud import bigquery
@@ -56,6 +57,24 @@ def project(project_id):
     return project_id
 
 
+@pytest.fixture
+def to_gbq(credentials, project_id):
+    import pandas_gbq
+
+    return functools.partial(
+        pandas_gbq.to_gbq, project_id=project_id, credentials=credentials
+    )
+
+
+@pytest.fixture
+def read_gbq(credentials, project_id):
+    import pandas_gbq
+
+    return functools.partial(
+        pandas_gbq.read_gbq, project_id=project_id, credentials=credentials
+    )
+
+
 @pytest.fixture()
 def random_dataset_id(bigquery_client: bigquery.Client, project_id: str):
     dataset_id = prefixer.create_prefix()
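
The new fixtures bind project_id and credentials once with functools.partial, so system tests can call the public functions without repeating connection arguments. A hypothetical test using the read_gbq fixture:

    def test_read_gbq_accepts_table_id(read_gbq):
        # project_id and credentials are already bound by the fixture.
        df = read_gbq("bigquery-public-data.usa_names.usa_1910_2013", max_results=10)
        assert len(df) == 10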