Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style: Fix and enforce formatting #33

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ test: install
.PHONY: lint
lint: #! Run type analysis and linting checks
lint: install
poetry run mypy ld_eventsource
@poetry run mypy ld_eventsource
@poetry run isort --check --atomic ld_eventsource contract-tests
@poetry run pycodestyle ld_eventsource contract-tests

#
# Documentation generation
Expand Down
14 changes: 9 additions & 5 deletions contract-tests/service.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from stream_entity import StreamEntity

import json
import logging
import os
import sys
import urllib3
from logging.config import dictConfig

from flask import Flask, request
from flask.logging import default_handler
from logging.config import dictConfig
from stream_entity import StreamEntity

default_port = 8000

Expand All @@ -30,7 +29,7 @@
'handlers': ['console']
},
'loggers': {
'werkzeug': { 'level': 'ERROR' } # disable irrelevant Flask app logging
'werkzeug': {'level': 'ERROR'} # disable irrelevant Flask app logging
}
})

Expand All @@ -54,11 +53,13 @@ def status():
}
return (json.dumps(body), 200, {'Content-type': 'application/json'})


@app.route('/', methods=['DELETE'])
def delete_stop_service():
global_log.info("Test service has told us to exit")
os._exit(0)


@app.route('/', methods=['POST'])
def post_create_stream():
global stream_counter, streams
Expand All @@ -74,6 +75,7 @@ def post_create_stream():

return ('', 201, {'Location': resource_url})


@app.route('/streams/<id>', methods=['POST'])
def post_stream_command(id):
global streams
Expand All @@ -87,6 +89,7 @@ def post_stream_command(id):
return ('', 400)
return ('', 204)


@app.route('/streams/<id>', methods=['DELETE'])
def delete_stream(id):
global streams
Expand All @@ -97,6 +100,7 @@ def delete_stream(id):
stream.close()
return ('', 204)


if __name__ == "__main__":
port = default_port
if sys.argv[len(sys.argv) - 1] != 'service.py':
Expand Down
78 changes: 42 additions & 36 deletions contract-tests/stream_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
import sys
import threading
import traceback

import urllib3

# Import ld_eventsource from parent directory
sys.path.insert(1, os.path.join(sys.path[0], '..'))
from ld_eventsource import *
from ld_eventsource.actions import *
from ld_eventsource.config import *

from ld_eventsource import * # noqa: E402
from ld_eventsource.actions import * # noqa: E402
from ld_eventsource.config import * # noqa: E402

http_client = urllib3.PoolManager()


def millis_to_seconds(t):
return None if t is None else t / 1000

Expand All @@ -27,7 +28,7 @@ def __init__(self, options):
self.closed = False
self.callback_counter = 0
self.sse = None

thread = threading.Thread(target=self.run)
thread.start()

Expand All @@ -38,60 +39,65 @@ def run(self):
connect = ConnectStrategy.http(
url=stream_url,
headers=self.options.get("headers"),
urllib3_request_options=None if self.options.get("readTimeoutMs") is None else {
"timeout": urllib3.Timeout(read=millis_to_seconds(self.options.get("readTimeoutMs")))
}
)
urllib3_request_options=(
None
if self.options.get("readTimeoutMs") is None
else {
"timeout": urllib3.Timeout(
read=millis_to_seconds(self.options.get("readTimeoutMs"))
)
}
),
)
sse = SSEClient(
connect,
initial_retry_delay=millis_to_seconds(self.options.get("initialDelayMs")),
initial_retry_delay=millis_to_seconds(
self.options.get("initialDelayMs")
),
last_event_id=self.options.get("lastEventId"),
error_strategy=ErrorStrategy.from_lambda(lambda _:
(ErrorStrategy.FAIL if self.closed else ErrorStrategy.CONTINUE, None)),
logger=self.log
error_strategy=ErrorStrategy.from_lambda(
lambda _: (
ErrorStrategy.FAIL if self.closed else ErrorStrategy.CONTINUE,
None,
)
),
logger=self.log,
)
self.sse = sse
for item in sse.all:
if isinstance(item, Event):
self.log.info('Received event from stream (%s)', item.event)
self.send_message({
'kind': 'event',
'event': {
'type': item.event,
'data': item.data,
'id': item.last_event_id
self.send_message(
{
'kind': 'event',
'event': {
'type': item.event,
'data': item.data,
'id': item.last_event_id,
},
}
})
)
elif isinstance(item, Comment):
self.log.info('Received comment from stream: %s', item.comment)
self.send_message({
'kind': 'comment',
'comment': item.comment
})
self.send_message({'kind': 'comment', 'comment': item.comment})
elif isinstance(item, Fault):
if self.closed:
break
# item.error will be None if this is just an EOF rather than an I/O error or HTTP error.
# Currently the test harness does not expect us to send an error message in that case.
if item.error:
self.log.info('Received error from stream: %s' % item.error)
self.send_message({
'kind': 'error',
'error': str(item.error)
})
self.send_message({'kind': 'error', 'error': str(item.error)})
except Exception as e:
self.log.info('Received error from stream: %s', e)
self.log.info(traceback.format_exc())
self.send_message({
'kind': 'error',
'error': str(e)
})
self.send_message({'kind': 'error', 'error': str(e)})

def do_command(self, command: str) -> bool:
self.log.info('Test service sent command: %s' % command)
# currently we support no special commands
return False

def send_message(self, message):
global http_client

Expand All @@ -104,9 +110,9 @@ def send_message(self, message):
resp = http_client.request(
'POST',
callback_url,
headers = {'Content-Type': 'application/json'},
body = json.dumps(message)
)
headers={'Content-Type': 'application/json'},
body=json.dumps(message),
)
if resp.status >= 300 and not self.closed:
self.log.error('Callback request returned HTTP error %d', resp.status)
except Exception as e:
Expand Down
30 changes: 19 additions & 11 deletions ld_eventsource/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Action:
"""
Base class for objects that can be returned by :attr:`.SSEClient.all`.
"""

pass


Expand All @@ -16,11 +17,13 @@ class Event(Action):
Instances of this class are returned by both :attr:`.SSEClient.events` and
:attr:`.SSEClient.all`.
"""
def __init__(self,
event: str='message',
data: str='',
id: Optional[str]=None,
last_event_id: Optional[str]=None

def __init__(
self,
event: str = 'message',
data: str = '',
id: Optional[str] = None,
last_event_id: Optional[str] = None,
):
self._event = event
self._data = data
Expand Down Expand Up @@ -58,27 +61,31 @@ def last_event_id(self) -> Optional[str]:
def __eq__(self, other):
if not isinstance(other, Event):
return False
return self._event == other._event and self._data == other._data \
and self._id == other._id and self.last_event_id == other.last_event_id
return (
self._event == other._event
and self._data == other._data
and self._id == other._id
and self.last_event_id == other.last_event_id
)

def __repr__(self):
return "Event(event=\"%s\", data=%s, id=%s, last_event_id=%s)" % (
self._event,
json.dumps(self._data),
"None" if self._id is None else json.dumps(self._id),
"None" if self._last_event_id is None else json.dumps(self._last_event_id)
"None" if self._last_event_id is None else json.dumps(self._last_event_id),
)


class Comment(Action):
"""
A comment received by :class:`.SSEClient`.

Comment lines (any line beginning with a colon) have no significance in the SSE specification
and can be ignored, but if you want to see them, use :attr:`.SSEClient.all`. They will never
be returned by :attr:`.SSEClient.events`.
"""

def __init__(self, comment: str):
self._comment = comment

Expand All @@ -104,6 +111,7 @@ class Start(Action):
A ``Start`` is returned for the first successful connection. If the client reconnects
after a failure, there will be a :class:`.Fault` followed by a ``Start``.
"""

pass


Expand All @@ -121,7 +129,7 @@ class Fault(Action):

def __init__(self, error: Optional[Exception]):
self.__error = error

@property
def error(self) -> Optional[Exception]:
"""
Expand Down
3 changes: 2 additions & 1 deletion ld_eventsource/config/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .connect_strategy import ConnectStrategy, ConnectionClient, ConnectionResult
from .connect_strategy import (ConnectionClient, ConnectionResult,
ConnectStrategy)
from .error_strategy import ErrorStrategy
from .retry_delay_strategy import RetryDelayStrategy
25 changes: 14 additions & 11 deletions ld_eventsource/config/connect_strategy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

from logging import Logger
from typing import Callable, Iterator, Optional, Union

from urllib3 import PoolManager

from ld_eventsource.http import _HttpClientImpl, _HttpConnectParams
Expand Down Expand Up @@ -33,9 +35,9 @@ def create_client(self, logger: Logger) -> ConnectionClient:
@staticmethod
def http(
url: str,
headers: Optional[dict]=None,
pool: Optional[PoolManager]=None,
urllib3_request_options: Optional[dict]=None
headers: Optional[dict] = None,
pool: Optional[PoolManager] = None,
urllib3_request_options: Optional[dict] = None,
) -> ConnectStrategy:
"""
Creates the default HTTP implementation, specifying request parameters.
Expand All @@ -46,7 +48,9 @@ def http(
:param urllib3_request_options: optional ``kwargs`` to add to the ``request`` call; these
can include any parameters supported by ``urllib3``, such as ``timeout``
"""
return _HttpConnectStrategy(_HttpConnectParams(url, headers, pool, urllib3_request_options))
return _HttpConnectStrategy(
_HttpConnectParams(url, headers, pool, urllib3_request_options)
)


class ConnectionClient:
Expand All @@ -65,7 +69,9 @@ def connect(self, last_event_id: Optional[str]) -> ConnectionResult:
(should be sent to the server to support resuming an interrupted stream)
:return: a :class:`ConnectionResult` representing the stream
"""
raise NotImplementedError("ConnectionClient base class cannot be used by itself")
raise NotImplementedError(
"ConnectionClient base class cannot be used by itself"
)

def close(self):
"""
Expand All @@ -80,16 +86,12 @@ def __exit__(self, type, value, traceback):
self.close()



class ConnectionResult:
"""
The return type of :meth:`ConnectionClient.connect()`.
"""
def __init__(
self,
stream: Iterator[bytes],
closer: Optional[Callable]
):

def __init__(self, stream: Iterator[bytes], closer: Optional[Callable]):
self.__stream = stream
self.__closer = closer

Expand Down Expand Up @@ -118,6 +120,7 @@ def __exit__(self, type, value, traceback):
# _HttpConnectStrategy and _HttpConnectionClient are defined here rather than in http.py to avoid
# a circular module reference.


class _HttpConnectStrategy(ConnectStrategy):
def __init__(self, params: _HttpConnectParams):
self.__params = params
Expand Down
Loading