Skip to content

Commit

Permalink
Merge pull request #2636 from dhermes/logging-iterators-1
Browse files Browse the repository at this point in the history
Converting Logging client->list_entries to iterator.
  • Loading branch information
dhermes authored Oct 31, 2016
2 parents 319fd8f + e38908e commit 016415a
Show file tree
Hide file tree
Showing 15 changed files with 671 additions and 292 deletions.
17 changes: 14 additions & 3 deletions core/google/cloud/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class HTTPIterator(Iterator):
_PAGE_TOKEN = 'pageToken'
_MAX_RESULTS = 'maxResults'
_RESERVED_PARAMS = frozenset([_PAGE_TOKEN, _MAX_RESULTS])
_HTTP_METHOD = 'GET'

def __init__(self, client, path, item_to_value,
items_key=DEFAULT_ITEMS_KEY,
Expand Down Expand Up @@ -378,9 +379,19 @@ def _get_next_page_response(self):
:rtype: dict
:returns: The parsed JSON response of the next page's contents.
"""
return self.client.connection.api_request(
method='GET', path=self.path,
query_params=self._get_query_params())
params = self._get_query_params()
if self._HTTP_METHOD == 'GET':
return self.client.connection.api_request(
method=self._HTTP_METHOD,
path=self.path,
query_params=params)
elif self._HTTP_METHOD == 'POST':
return self.client.connection.api_request(
method=self._HTTP_METHOD,
path=self.path,
data=params)
else:
raise ValueError('Unexpected HTTP method', self._HTTP_METHOD)


class GAXIterator(Iterator):
Expand Down
26 changes: 26 additions & 0 deletions core/unit_tests/test_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,32 @@ def test__get_next_page_response_new_no_token_in_response(self):
self.assertEqual(kw['path'], path)
self.assertEqual(kw['query_params'], {})

def test__get_next_page_response_with_post(self):
path = '/foo'
returned = {'green': 'eggs', 'ham': 55}
connection = _Connection(returned)
client = _Client(connection)
iterator = self._makeOne(client, path, None)
iterator._HTTP_METHOD = 'POST'
response = iterator._get_next_page_response()
self.assertEqual(response, returned)

self.assertEqual(len(connection._requested), 1)
called_kwargs = connection._requested[0]
self.assertEqual(called_kwargs, {
'method': iterator._HTTP_METHOD,
'path': path,
'data': {},
})

def test__get_next_page_bad_http_method(self):
path = '/foo'
client = _Client(None)
iterator = self._makeOne(client, path, None)
iterator._HTTP_METHOD = 'NOT-A-VERB'
with self.assertRaises(ValueError):
iterator._get_next_page_response()


class TestGAXIterator(unittest.TestCase):

Expand Down
32 changes: 19 additions & 13 deletions docs/logging-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ Fetch entries for the default project.

>>> from google.cloud import logging
>>> client = logging.Client()
>>> entries, token = client.list_entries() # API call
>>> for entry in entries:
>>> for entry in client.list_entries(): # API call(s)
... timestamp = entry.timestamp.isoformat()
... print('%sZ: %s' %
... (timestamp, entry.payload))
Expand All @@ -82,8 +81,9 @@ Fetch entries across multiple projects.

>>> from google.cloud import logging
>>> client = logging.Client()
>>> entries, token = client.list_entries(
... project_ids=['one-project', 'another-project']) # API call
>>> iterator = client.list_entries(
... project_ids=['one-project', 'another-project'])
>>> entries = list(iterator) # API call(s)

Filter entries retrieved using the `Advanced Logs Filters`_ syntax

Expand All @@ -94,15 +94,17 @@ Filter entries retrieved using the `Advanced Logs Filters`_ syntax
>>> from google.cloud import logging
>>> client = logging.Client()
>>> FILTER = "log:log_name AND textPayload:simple"
>>> entries, token = client.list_entries(filter=FILTER) # API call
>>> iterator = client.list_entries(filter=FILTER)
>>> entries = list(iterator) # API call(s)

Sort entries in descending timestamp order.

.. doctest::

>>> from google.cloud import logging
>>> client = logging.Client()
>>> entries, token = client.list_entries(order_by=logging.DESCENDING) # API call
>>> iterator = client.list_entries(order_by=logging.DESCENDING)
>>> entries = list(iterator) # API call(s)

Retrieve entries in batches of 10, iterating until done.

Expand All @@ -111,12 +113,15 @@ Retrieve entries in batches of 10, iterating until done.
>>> from google.cloud import logging
>>> client = logging.Client()
>>> retrieved = []
>>> token = None
>>> while True:
... entries, token = client.list_entries(page_size=10, page_token=token) # API call
... retrieved.extend(entries)
... if token is None:
... break
>>> iterator = client.list_entries(page_size=10, page_token=token)
>>> pages = iterator.pages
>>> page1 = next(pages) # API call
>>> for entry in page1:
... do_something(entry)
...
>>> page2 = next(pages) # API call
>>> for entry in page2:
... do_something_else(entry)

Retrieve entries for a single logger, sorting in descending timestamp order:

Expand All @@ -125,7 +130,8 @@ Retrieve entries for a single logger, sorting in descending timestamp order:
>>> from google.cloud import logging
>>> client = logging.Client()
>>> logger = client.logger('log_name')
>>> entries, token = logger.list_entries(order_by=logging.DESCENDING) # API call
>>> iterator = logger.list_entries(order_by=logging.DESCENDING)
>>> entries = list(iterator) # API call(s)

Delete all entries for a logger
-------------------------------
Expand Down
3 changes: 1 addition & 2 deletions logging/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ Example of fetching entries:

.. code:: python
entries, token = logger.list_entries()
for entry in entries:
for entry in logger.list_entries():
print(entry.payload)
See the ``google-cloud-python`` API `logging documentation`_ to learn how to
Expand Down
65 changes: 54 additions & 11 deletions logging/google/cloud/logging/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""GAX wrapper for Logging API requests."""

import functools

from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
Expand All @@ -28,6 +30,8 @@
from google.cloud._helpers import _datetime_to_rfc3339
from google.cloud.exceptions import Conflict
from google.cloud.exceptions import NotFound
from google.cloud.iterator import GAXIterator
from google.cloud.logging._helpers import entry_from_resource


class _LoggingAPI(object):
Expand All @@ -36,9 +40,13 @@ class _LoggingAPI(object):
:type gax_api:
:class:`google.logging.v2.logging_service_v2_api.LoggingServiceV2Api`
:param gax_api: API object used to make GAX requests.
:type client: :class:`~google.cloud.logging.client.Client`
:param client: The client that owns this API object.
"""
def __init__(self, gax_api):
def __init__(self, gax_api, client):
self._gax_api = gax_api
self._client = client

def list_entries(self, projects, filter_='', order_by='',
page_size=0, page_token=None):
Expand All @@ -49,8 +57,9 @@ def list_entries(self, projects, filter_='', order_by='',
defaults to the project bound to the API's client.
:type filter_: str
:param filter_: a filter expression. See:
https://cloud.google.com/logging/docs/view/advanced_filters
:param filter_:
a filter expression. See:
https://cloud.google.com/logging/docs/view/advanced_filters
:type order_by: str
:param order_by: One of :data:`~google.cloud.logging.ASCENDING`
Expand All @@ -65,21 +74,24 @@ def list_entries(self, projects, filter_='', order_by='',
passed, the API will return the first page of
entries.
:rtype: tuple, (list, str)
:returns: list of mappings, plus a "next page token" string:
if not None, indicates that more entries can be retrieved
with another call (pass that value as ``page_token``).
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.logging.entries._BaseEntry`
accessible to the current API.
"""
if page_token is None:
page_token = INITIAL_PAGE
options = CallOptions(page_token=page_token)
page_iter = self._gax_api.list_log_entries(
projects, filter_=filter_, order_by=order_by,
page_size=page_size, options=options)
entries = [MessageToDict(entry_pb)
for entry_pb in page_iter.next()]
token = page_iter.page_token or None
return entries, token

# We attach a mutable loggers dictionary so that as Logger
# objects are created by entry_from_resource, they can be
# re-used by other log entries from the same logger.
loggers = {}
item_to_value = functools.partial(
_item_to_entry, loggers=loggers)
return GAXIterator(self._client, page_iter, item_to_value)

def write_entries(self, entries, logger_name=None, resource=None,
labels=None):
Expand Down Expand Up @@ -430,3 +442,34 @@ def _log_entry_mapping_to_pb(mapping):
mapping['timestamp'] = _datetime_to_rfc3339(mapping['timestamp'])
ParseDict(mapping, entry_pb)
return entry_pb


def _item_to_entry(iterator, entry_pb, loggers):
"""Convert a log entry protobuf to the native object.
.. note::
This method does not have the correct signature to be used as
the ``item_to_value`` argument to
:class:`~google.cloud.iterator.Iterator`. It is intended to be
patched with a mutable ``loggers`` argument that can be updated
on subsequent calls. For an example, see how the method is
used above in :meth:`_LoggingAPI.list_entries`.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type entry_pb: :class:`~google.logging.v2.log_entry_pb2.LogEntry`
:param entry_pb: Log entry protobuf returned from the API.
:type loggers: dict
:param loggers:
A mapping of logger fullnames -> loggers. If the logger
that owns the entry is not in ``loggers``, the entry
will have a newly-created logger.
:rtype: :class:`~google.cloud.logging.entries._BaseEntry`
:returns: The next log entry in the page.
"""
resource = MessageToDict(entry_pb)
return entry_from_resource(resource, iterator.client, loggers)
48 changes: 48 additions & 0 deletions logging/google/cloud/logging/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Common logging helpers."""


from google.cloud.logging.entries import ProtobufEntry
from google.cloud.logging.entries import StructEntry
from google.cloud.logging.entries import TextEntry


def entry_from_resource(resource, client, loggers):
"""Detect correct entry type from resource and instantiate.
:type resource: dict
:param resource: one entry resource from API response
:type client: :class:`~google.cloud.logging.client.Client`
:param client: Client that owns the log entry.
:type loggers: dict
:param loggers:
A mapping of logger fullnames -> loggers. If the logger
that owns the entry is not in ``loggers``, the entry
will have a newly-created logger.
:rtype: :class:`~google.cloud.logging.entries._BaseEntry`
:returns: The entry instance, constructed via the resource
"""
if 'textPayload' in resource:
return TextEntry.from_api_repr(resource, client, loggers)
elif 'jsonPayload' in resource:
return StructEntry.from_api_repr(resource, client, loggers)
elif 'protoPayload' in resource:
return ProtobufEntry.from_api_repr(resource, client, loggers)

raise ValueError('Cannot parse log entry resource')
Loading

0 comments on commit 016415a

Please sign in to comment.