Skip to content

Commit

Permalink
Merge pull request #7 from zhulik/handle-down-network
Browse files Browse the repository at this point in the history
Handle down network
  • Loading branch information
zhulik authored May 20, 2021
2 parents c24cb20 + 445c658 commit 3fad75e
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ console.py
docs/build
.vscode/
requirements.txt
event_examples/


# Byte-compiled / optimized / DLL files
Expand Down
53 changes: 29 additions & 24 deletions aiotractive/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,31 +82,36 @@ async def _authenticate(self):
if self._user_credentials is not None:
return self._user_credentials

async with self.session.request(
"POST",
self.API_URL.join(URL(self.TOKEN_URI)),
data=json.dumps(
{
"platform_email": self._login,
"platform_token": self._password,
"grant_type": "tractive",
}
),
headers=self.BASE_HEADERS,
timeout=self._timeout,
) as response:
try:
response.raise_for_status()
if "Content-Type" in response.headers and "application/json" in response.headers["Content-Type"]:
self._user_credentials = await response.json()
self._auth_headers = {
"x-tractive-user": self._user_credentials["user_id"],
"authorization": f"Bearer {self._user_credentials['access_token']}",
try:
async with self.session.request(
"POST",
self.API_URL.join(URL(self.TOKEN_URI)),
data=json.dumps(
{
"platform_email": self._login,
"platform_token": self._password,
"grant_type": "tractive",
}
return self._user_credentials
except ClientResponseError as error:
if error.status in [401, 403]:
raise UnauthorizedError from error
),
headers=self.BASE_HEADERS,
timeout=self._timeout,
) as response:
try:
response.raise_for_status()
if "Content-Type" in response.headers and "application/json" in response.headers["Content-Type"]:
self._user_credentials = await response.json()
self._auth_headers = {
"x-tractive-user": self._user_credentials["user_id"],
"authorization": f"Bearer {self._user_credentials['access_token']}",
}
return self._user_credentials
except ClientResponseError as error:
if error.status in [401, 403]:
raise UnauthorizedError from error
except Exception as error:
raise TractiveError from error
except Exception as error:
raise TractiveError from error

async def close(self):
"""Close the session."""
Expand Down
61 changes: 59 additions & 2 deletions aiotractive/channel.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,81 @@
import asyncio
import json
import time
from asyncio.exceptions import TimeoutError as AIOTimeoutError

from .exceptions import DisconnectedError


class Channel:
CHANNEL_URL = "https://channel.tractive.com/3/channel"
IGNORE_MESSAGES = ["handshake", "keep-alive"]

KEEP_ALIVE_TIMEOUT = 7 # seconds
CHECK_CONNECTION_TIME = 4 # seconds

def __init__(self, api):
self._api = api
self._last_keep_alive = None
self._listen_task = None
self._check_connection_task = None
self._queue = asyncio.Queue()

async def listen(self):
self._check_connection_task = asyncio.create_task(self._check_connection())
self._listen_task = asyncio.create_task(self._listen())
while True:
event = await self._queue.get()
self._queue.task_done()

if event["type"] == "event":
yield event["event"]

if event["type"] == "error":
self._check_connection_task.cancel()

await self._check_connection_task
raise event["error"]

if event["type"] == "cancelled":
self._listen_task.cancel()

await self._listen_task
raise DisconnectedError() from event["error"]

async def _listen(self):
while True:
try:
async with self._api.session.request(
"POST", self.CHANNEL_URL, headers=await self._api.auth_headers()
"POST",
self.CHANNEL_URL,
headers=await self._api.auth_headers(),
) as response:
async for data, _ in response.content.iter_chunks():
event = json.loads(data)
if event["message"] == "keep-alive":
self._last_keep_alive = time.time()
continue
if event["message"] in self.IGNORE_MESSAGES:
continue
yield event
await self._queue.put({"type": "event", "event": event})
except AIOTimeoutError:
continue
except asyncio.CancelledError as error:
await self._queue.put({"type": "cancelled", "error": error})
return
except Exception as error: # pylint: disable=broad-except
await self._queue.put({"type": "error", "error": error})
return

async def _check_connection(self):
try:
while True:
if self._last_keep_alive is not None and (
time.time() - self._last_keep_alive > self.KEEP_ALIVE_TIMEOUT
):
self._listen_task.cancel()
return

await asyncio.sleep(self.CHECK_CONNECTION_TIME)
except asyncio.CancelledError:
return
4 changes: 4 additions & 0 deletions aiotractive/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ class UnauthorizedError(TractiveError):

class NotFoundError(TractiveError):
"""When the server responds with 404."""


class DisconnectedError(TractiveError):
"""Channel disconnected"""
5 changes: 3 additions & 2 deletions aiotractive/tractive.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ async def trackable_objects(self):
objects = await self._api.request(f"user/{await self._api.user_id()}/trackable_objects")
return [TrackableObject(self._api, t) for t in objects]

def events(self):
return Channel(self._api).listen()
async def events(self):
async for event in Channel(self._api).listen():
yield event

async def close(self):
"""Close open client session."""
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name="aiotractive",
version="0.2.1",
version="0.3.0",
author="Gleb Sinyavskiy",
author_email="zhulik.gleb@gmail.com",
description="Asynchronous Python client for the Tractive REST API",
Expand Down

0 comments on commit 3fad75e

Please sign in to comment.