Skip to content

Feature: SQLite-based Message Cache #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
86df08f
Feature : AlephDNS (#47)
1yam Aug 25, 2023
ba9f876
Problem: @app.on_event("startup") would not be triggered on uvicorn s…
MHHukiewitz Apr 19, 2023
d046703
simplify branching
MHHukiewitz Apr 19, 2023
b216d38
add peewee/sqlite-based message cache DB
MHHukiewitz May 21, 2023
2b65f74
implement cache
MHHukiewitz May 22, 2023
4551ab5
improve test and batch caching
MHHukiewitz May 22, 2023
7c1751a
add cache input stream
MHHukiewitz May 22, 2023
3377730
fix formatting
MHHukiewitz May 22, 2023
1f667ff
clean up imports
MHHukiewitz May 22, 2023
6630e55
add query method to cache
MHHukiewitz May 23, 2023
fd9cefa
change default cache DB to :memory:
MHHukiewitz May 23, 2023
056bb48
fix formatting
MHHukiewitz May 23, 2023
e2038b0
create interface from AlephClient and let MessageCache implement it
MHHukiewitz May 24, 2023
d33c2f0
fix mypy error
MHHukiewitz May 30, 2023
4ee541f
add node
MHHukiewitz May 30, 2023
40200bb
refactor interface; implement node methods; add download_file() metho…
MHHukiewitz May 30, 2023
a938d79
add peewee to testing requirements
MHHukiewitz Jun 6, 2023
6781bae
fix ProgramMessage parsing
MHHukiewitz Jun 6, 2023
08104c2
fix formatting
MHHukiewitz Jun 6, 2023
7be2eeb
implement suggestions: use JSONField, annotate types, rename interfac…
MHHukiewitz Jun 7, 2023
ad2af12
fix formatting and node.py
MHHukiewitz Jun 7, 2023
5b7a5fb
Update src/aleph/sdk/conf.py
MHHukiewitz Jun 6, 2023
6c686b5
allow iterables
MHHukiewitz Jun 7, 2023
d19175f
fix CACHE_DATABASE_PATH usage
MHHukiewitz Jun 7, 2023
2f30800
close DB connection when all Cache instances are deleted
MHHukiewitz Jun 21, 2023
02ff4d0
fix MessageCache.listen_to()
MHHukiewitz Aug 12, 2023
db33997
fix formatting
MHHukiewitz Aug 12, 2023
73f0bbe
fix formatting
MHHukiewitz Aug 12, 2023
207aaa1
fix cache and increase test coverage for cache.py and node.py
MHHukiewitz Aug 15, 2023
2f2e07d
increase test coverage, add CACHE_FILES_PATH to settings
MHHukiewitz Aug 15, 2023
634e927
move MessageCache to node.py for clearer naming and less ambiguity
MHHukiewitz Aug 15, 2023
6cd9aef
fix compatibility with python < v3.10
MHHukiewitz Aug 17, 2023
0a44e9d
add PostsResponse; harmonize get_messages and watch_messages paramete…
MHHukiewitz Aug 23, 2023
af810a2
fix code quality issues from previous PRs
MHHukiewitz Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor interface; implement node methods; add download_file() metho…
…d to cache as
  • Loading branch information
MHHukiewitz committed Aug 28, 2023
commit 40200bb35be1cf51a32ca5f82315e37561edec8b
3 changes: 3 additions & 0 deletions src/aleph/sdk/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ def __getitem__(self, item_hash) -> Optional[AlephMessage]:
def __setitem__(self, item_hash, message: AlephMessage):
MessageModel.insert(**message_to_model(message)).on_conflict_replace().execute()

def __delitem__(self, item_hash):
MessageModel.delete().where(MessageModel.item_hash == item_hash).execute()

def __contains__(self, item_hash):
return MessageModel.select().where(MessageModel.item_hash == item_hash).exists()

Expand Down
18 changes: 18 additions & 0 deletions src/aleph/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,11 @@ async def __aenter__(self) -> "AuthenticatedAlephClient":
return self

async def ipfs_push(self, content: Mapping) -> str:
"""
Push arbitrary content as JSON to the IPFS service.

:param content: The dict-like content to upload
"""
url = "/api/v0/ipfs/add_json"
logger.debug(f"Pushing to IPFS on {url}")

Expand All @@ -906,6 +911,11 @@ async def ipfs_push(self, content: Mapping) -> str:
return (await resp.json()).get("hash")

async def storage_push(self, content: Mapping) -> str:
"""
Push arbitrary content as JSON to the storage service.

:param content: The dict-like content to upload
"""
url = "/api/v0/storage/add_json"
logger.debug(f"Pushing to storage on {url}")

Expand All @@ -914,6 +924,11 @@ async def storage_push(self, content: Mapping) -> str:
return (await resp.json()).get("hash")

async def ipfs_push_file(self, file_content: Union[str, bytes]) -> str:
"""
Push a file to the IPFS service.

:param file_content: The file content to upload
"""
data = aiohttp.FormData()
data.add_field("file", file_content)

Expand All @@ -925,6 +940,9 @@ async def ipfs_push_file(self, file_content: Union[str, bytes]) -> str:
return (await resp.json()).get("hash")

async def storage_push_file(self, file_content) -> str:
"""
Push a file to the storage service.
"""
data = aiohttp.FormData()
data.add_field("file", file_content)

Expand Down
141 changes: 106 additions & 35 deletions src/aleph/sdk/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
Union,
)

from aleph_message.models import AlephMessage, MessagesResponse, MessageType
from aleph_message.models import (
AlephMessage,
MessagesResponse,
MessageType,
PostMessage,
)
from aleph_message.models.program import Encoding
from aleph_message.status import MessageStatus

Expand Down Expand Up @@ -89,6 +94,53 @@ async def get_posts(
"""
pass

async def get_posts_iterator(
self,
types: Optional[Iterable[str]] = None,
refs: Optional[Iterable[str]] = None,
addresses: Optional[Iterable[str]] = None,
tags: Optional[Iterable[str]] = None,
hashes: Optional[Iterable[str]] = None,
channels: Optional[Iterable[str]] = None,
chains: Optional[Iterable[str]] = None,
start_date: Optional[Union[datetime, float]] = None,
end_date: Optional[Union[datetime, float]] = None,
) -> AsyncIterable[PostMessage]:
"""
Fetch all filtered posts, returning an async iterator and fetching them page by page. Might return duplicates
but will always return all posts.

:param types: Types of posts to fetch (Default: all types)
:param refs: If set, only fetch posts that reference these hashes (in the "refs" field)
:param addresses: Addresses of the posts to fetch (Default: all addresses)
:param tags: Tags of the posts to fetch (Default: all tags)
:param hashes: Specific item_hashes to fetch
:param channels: Channels of the posts to fetch (Default: all channels)
:param chains: Chains of the posts to fetch (Default: all chains)
:param start_date: Earliest date to fetch messages from
:param end_date: Latest date to fetch messages from
"""
total_items = None
per_page = self.get_posts.__kwdefaults__["pagination"]
page = 1
while total_items is None or page * per_page < total_items:
resp = await self.get_posts(
page=page,
types=types,
refs=refs,
addresses=addresses,
tags=tags,
hashes=hashes,
channels=channels,
chains=chains,
start_date=start_date,
end_date=end_date,
)
total_items = resp["pagination_total"]
page += 1
for post in resp["posts"]:
yield post

@abstractmethod
async def download_file(
self,
Expand Down Expand Up @@ -143,6 +195,59 @@ async def get_messages(
"""
pass

async def get_messages_iterator(
self,
message_type: Optional[MessageType] = None,
content_types: Optional[Iterable[str]] = None,
content_keys: Optional[Iterable[str]] = None,
refs: Optional[Iterable[str]] = None,
addresses: Optional[Iterable[str]] = None,
tags: Optional[Iterable[str]] = None,
hashes: Optional[Iterable[str]] = None,
channels: Optional[Iterable[str]] = None,
chains: Optional[Iterable[str]] = None,
start_date: Optional[Union[datetime, float]] = None,
end_date: Optional[Union[datetime, float]] = None,
) -> AsyncIterable[AlephMessage]:
"""
Fetch all filtered messages, returning an async iterator and fetching them page by page. Might return duplicates
but will always return all messages.

:param message_type: Filter by message type, can be "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET"
:param content_types: Filter by content type
:param content_keys: Filter by content key
:param refs: If set, only fetch posts that reference these hashes (in the "refs" field)
:param addresses: Addresses of the posts to fetch (Default: all addresses)
:param tags: Tags of the posts to fetch (Default: all tags)
:param hashes: Specific item_hashes to fetch
:param channels: Channels of the posts to fetch (Default: all channels)
:param chains: Filter by sender address chain
:param start_date: Earliest date to fetch messages from
:param end_date: Latest date to fetch messages from
"""
total_items = None
per_page = self.get_messages.__kwdefaults__["pagination"]
page = 1
while total_items is None or page * per_page < total_items:
resp = await self.get_messages(
page=page,
message_type=message_type,
content_types=content_types,
content_keys=content_keys,
refs=refs,
addresses=addresses,
tags=tags,
hashes=hashes,
channels=channels,
chains=chains,
start_date=start_date,
end_date=end_date,
)
total_items = resp.pagination_total
page += 1
for message in resp.messages:
yield message

@abstractmethod
async def get_message(
self,
Expand Down Expand Up @@ -191,40 +296,6 @@ def watch_messages(


class AuthenticatedAlephClientInterface(AlephClientInterface):
@abstractmethod
async def ipfs_push(self, content: Mapping) -> str:
"""
Push arbitrary content as JSON to the IPFS service.

:param content: The dict-like content to upload
"""
pass

@abstractmethod
async def storage_push(self, content: Mapping) -> str:
"""
Push arbitrary content as JSON to the storage service.

:param content: The dict-like content to upload
"""
pass

@abstractmethod
async def ipfs_push_file(self, file_content: Union[str, bytes]) -> str:
"""
Push a file to the IPFS service.

:param file_content: The file content to upload
"""
pass

@abstractmethod
async def storage_push_file(self, file_content: Union[str, bytes]) -> str:
"""
Push a file to the storage service.
"""
pass

@abstractmethod
async def create_post(
self,
Expand Down
Loading