Skip to content

[WIP] Api v2.0 #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,17 @@ unit-tests:

## integration-tests: Run integration tests.
.PHONY: integration-tests
integration-tests: export CACHE_URL = redis://localhost:6379
integration-tests: test_path = tests/integration
integration-tests:
@coverage run -m pytest $(test_path)
@CACHE_URL=redis://localhost:6379 \
PROVIDER='' \
TEST_DATASET='' \
API_ROOT_URL=https://platform.localhost.ai \
DATA_API_ROOT_URL=https://data.localhost.ai \
TEST_API_KEY='' \
TEST_BEARER_TOKEN='' \
TEST_COMPANY_ID='-1' \
coverage run -m pytest --vcr-record=none $(test_path)

## coverage: Display code coverage in the console.
.PHONY: coverage
Expand Down
4 changes: 3 additions & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
coverage==5.3
freezegun==1.0.0
pytest==6.1.2
pytest==7.1.2
pytest-httpx==0.20.0
pytest-mock==3.3.1
pytest-vcr~=1.0.2
requests-mock==1.8.0
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ parallel = True

[coverage:report]
precision = 2
fail_under = 98.09
fail_under = 98.47
skip_covered = True
show_missing = True
exclude_lines =
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
"pydantic >=1.8.2, <2.0.0",
"redis >=3.5.3, <4.0.0",
"requests >=2.25.0, <3.0.0",
"httpx >=0.22.0, <0.23.0",
"PyYAML >=6.0, <6.1",
],
python_requires='>=3.8, <4.0',
license='The Unlicense',
Expand Down
1 change: 1 addition & 0 deletions src/corva/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .api import Api
from .api_adapter import InsertResult
from .handlers import scheduled, stream, task
from .logger import CORVA_LOGGER as Logger
from .models.scheduled.scheduled import (
Expand Down
265 changes: 265 additions & 0 deletions src/corva/api_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
import dataclasses
import functools
import json
import logging
from typing import Callable, List, Optional, Sequence

import httpx
import pydantic
import yaml


def _httpx_headers_to_dict(headers: httpx.Headers) -> dict:
return json.loads(
repr(headers) # use built-in `repr` as it obfuscates sensitive headers
.strip("Headers()") # strip obsolete data
.replace(
"'", '"'
) # replace single quotes with double ones to get proper json string
)


def _failed_request_msg(
    msg: str,
    request: httpx.Request,
    response: Optional[httpx.Response],
) -> str:
    """Render a failed-request report as a YAML string.

    Args:
        msg: short failure description.
        request: the request that failed.
        response: the received response, or ``None`` if none arrived.

    Returns:
        YAML document describing the failure (YAML reads better than JSON
        in plain-text logs).
    """

    request_details = {
        "method": request.method,
        "url": str(request.url),
        "headers": _httpx_headers_to_dict(request.headers),
        "content": str(request.content),
    }

    report = {"message": f"Request failed - {msg}"}

    if response:
        # The response section is placed before the request section on
        # purpose: if the log entry gets truncated, the server's error
        # message is the part that survives.
        report["response"] = {
            "code": response.status_code,
            "reason": response.reason_phrase,
            "headers": _httpx_headers_to_dict(response.headers),
            "content": str(response.content),
        }

    report["request"] = request_details

    return yaml.dump(report, sort_keys=False)


def logging_send(func: Callable, *, logger: logging.Logger) -> Callable:
    """Wrap an httpx ``Client.send``-style callable with failure logging.

    Two failure modes are handled differently:
      * transport errors (no response at all) are logged and re-raised;
      * unsuccessful HTTP statuses are logged, but the response is still
        returned so the caller can inspect it and decide what to do.

    Args:
        func: the ``send`` callable to wrap.
        logger: logger that receives the failure reports.

    Returns:
        Wrapped callable with the same signature as ``func``.
    """

    @functools.wraps(func)
    def wrapper(request: httpx.Request, *args, **kwargs):
        try:
            response = func(request, *args, **kwargs)
        except httpx.HTTPError as exc:
            # Response was not received at all
            logger.error(
                _failed_request_msg(msg=str(exc), request=request, response=None)
            )
            raise

        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            # Response has unsuccessful status; deliberately NOT re-raised —
            # callers get the response back and may raise themselves.
            logger.error(
                _failed_request_msg(msg=str(exc), request=request, response=response)
            )

        return response

    return wrapper


class InsertResult(pydantic.BaseModel):
    """Parsed body of a Data API insert (POST) response."""

    # Ids of the documents that were inserted.
    inserted_ids: List[str]
    # NOTE(review): presumably the number of documents that failed to insert —
    # confirm against the Data API response schema.
    failed_count: int
    # Messages returned by the server alongside the insert result.
    messages: List[str]


class DataApiV1Sdk:
    """Thin wrapper over the Corva Data API v1 dataset endpoints."""

    def __init__(self, client: httpx.Client):
        # Pre-configured httpx client (base url, auth headers, timeout are
        # set up by the caller, see `UserApiSdk.__enter__`).
        self.http = client

    def get(
        self,
        provider: str,
        dataset: str,
        *,
        query: dict,
        sort: dict,
        limit: int,
        skip: int = 0,
        fields: Optional[str] = None,
    ) -> List[dict]:
        """Fetches data from the endpoint GET 'data/{provider}/{dataset}/'.

        Args:
            provider: company name owning the dataset.
            dataset: dataset name.
            query: search conditions. Example: {"asset_id": 123} - will fetch data
                for asset with id 123.
            sort: sort conditions. Example: {"timestamp": 1} - will sort data
                in ascending order by timestamp.
            limit: number of data points to fetch.
                Recommendation for setting the limit:
                    1. The bigger ↑ each data point is - the smaller ↓ the limit;
                    2. The smaller ↓ each data point is - the bigger ↑ the limit.
            skip: exclude from the response the first N items of the dataset.
                Note: skip should only be used for small amounts of data, having a
                large skip will lead to very slow queries.
            fields: comma separated list of fields to return. Example: "_id,data".

        Raises:
            httpx.HTTPStatusError: if request was unsuccessful.

        Returns:
            Data from dataset.
        """

        # `query` and `sort` are serialized to JSON strings; the other
        # params are passed through as plain query parameters.
        response = self.http.get(
            url=f"data/{provider}/{dataset}/",
            params={
                "query": json.dumps(query),
                "sort": json.dumps(sort),
                "fields": fields,
                "limit": limit,
                "skip": skip,
            },
        )

        response.raise_for_status()

        data = list(response.json())

        return data

    def insert(
        self, provider: str, dataset: str, *, documents: Sequence[dict]
    ) -> InsertResult:
        """Inserts data using the endpoint POST 'data/{provider}/{dataset}/'.

        Args:
            provider: company name owning the dataset.
            dataset: dataset name.
            documents: data to insert.

        Raises:
            httpx.HTTPStatusError: if request was unsuccessful.

        Returns:
            Insert result.
        """

        response = self.http.post(url=f"data/{provider}/{dataset}/", json=documents)

        response.raise_for_status()

        return InsertResult.parse_obj(response.json())

    def replace(self, provider: str, dataset: str, id_: str, *, document: dict) -> dict:
        """Replaces all document data.

        Replace all document data using the endpoint PUT
        'data/{provider}/{dataset}/{id}/'.

        Args:
            provider: company name owning the dataset.
            dataset: dataset name.
            id_: document id to replace.
            document: new document data.

        Raises:
            httpx.HTTPStatusError: if request was unsuccessful.

        Returns:
            Updated document.
        """

        response = self.http.put(url=f"data/{provider}/{dataset}/{id_}/", json=document)

        response.raise_for_status()

        return response.json()


class PlatformApiV1Sdk:
    """Wrapper over the Corva Platform API v1 (endpoint methods TBD)."""

    def __init__(self, client: httpx.Client):
        # Pre-configured httpx client for the Platform API v1 base url.
        self.http = client


class PlatformApiV2Sdk:
    """Wrapper over the Corva Platform API v2 (endpoint methods TBD)."""

    def __init__(self, client: httpx.Client):
        # Pre-configured httpx client for the Platform API v2 base url.
        self.http = client


@dataclasses.dataclass(frozen=True)
class PlatformApiVersions:
    """Available Platform API versions, accessed as `sdk.platform.v1` / `.v2`."""

    v1: PlatformApiV1Sdk
    v2: PlatformApiV2Sdk


@dataclasses.dataclass(frozen=True)
class DataApiVersions:
    """Available Data API versions, accessed as `sdk.data.v1`."""

    v1: DataApiV1Sdk


class UserApiSdk:
    """User-facing entry point to the Corva APIs.

    Intended to be used as a context manager: `__enter__` opens the HTTP
    clients and exposes them as `self.data` / `self.platform`, `__exit__`
    closes them.

    Example:
        with UserApiSdk(...) as sdk:
            sdk.data.v1.get(...)
    """

    def __init__(
        self,
        platform_v1_url: str,
        platform_v2_url: str,
        data_api_url: str,
        api_key: str,
        app_key: str,
        logger: logging.Logger = logging.getLogger(),
        timeout: int = 30,
    ):
        """Stores connection settings; no I/O happens until `__enter__`.

        Args:
            platform_v1_url: base url of the Platform API v1.
            platform_v2_url: base url of the Platform API v2.
            data_api_url: base url of the Data API.
            api_key: api key used for the `Authorization` header.
            app_key: app identifier sent in the `X-Corva-App` header.
            logger: logger that receives failed-request reports
                (defaults to the root logger).
            timeout: request timeout in seconds for every client.
        """

        self._platform_v1_url = platform_v1_url
        self._platform_v2_url = platform_v2_url
        self._data_api_url = data_api_url
        self._headers = {
            "Authorization": f"API {api_key}",
            "X-Corva-App": app_key,
        }
        self._logger = logger
        self._timeout = timeout

    def _make_client(self, base_url: str) -> "httpx.Client":
        """Builds an HTTP client for `base_url` with failure logging installed."""

        client = httpx.Client(
            base_url=base_url,
            headers=self._headers,
            timeout=self._timeout,
        )
        # Wrap `send` so that every failed request/response gets logged.
        client.send = logging_send(func=client.send, logger=self._logger)
        return client

    def __enter__(self):
        # One client per API root; all share headers, timeout and logging.
        data_cli = self._make_client(self._data_api_url)
        platform_v1_cli = self._make_client(self._platform_v1_url)
        platform_v2_cli = self._make_client(self._platform_v2_url)

        self.data = DataApiVersions(v1=DataApiV1Sdk(client=data_cli))
        self.platform = PlatformApiVersions(
            v1=PlatformApiV1Sdk(client=platform_v1_cli),
            v2=PlatformApiV2Sdk(client=platform_v2_cli),
        )

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close every client opened in `__enter__`; exceptions propagate.
        self.data.v1.http.close()
        self.platform.v1.http.close()
        self.platform.v2.http.close()
25 changes: 25 additions & 0 deletions src/corva/configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import os

import pydantic

Expand All @@ -25,3 +26,27 @@ class Settings(pydantic.BaseSettings):


SETTINGS = Settings()


def get_test_api_key() -> str:
    """Return the api key used by the integration tests.

    Reads the ``TEST_API_KEY`` environment variable; raises ``KeyError``
    when it is not set.
    """

    return os.environ["TEST_API_KEY"]


def get_test_bearer() -> str:
    """Return the bearer token used by the integration tests.

    Reads the ``TEST_BEARER_TOKEN`` environment variable; raises ``KeyError``
    when it is not set.
    """

    return os.environ["TEST_BEARER_TOKEN"]


def get_test_dataset() -> str:
    """Return the dataset name used by the integration tests.

    Reads the ``TEST_DATASET`` environment variable; raises ``KeyError``
    when it is not set.
    """

    return os.environ["TEST_DATASET"]


def get_test_company_id() -> int:
    """Return the company id used by the integration tests, as an integer.

    Reads ``TEST_COMPANY_ID`` from the environment; raises ``KeyError`` when
    it is not set and ``ValueError`` when it is not a valid integer literal.
    """

    raw_value = os.environ["TEST_COMPANY_ID"]
    return int(raw_value)
Loading