Skip to content

Commit 5e7c10a

Browse files
authored
Feature: modification and creation date in the aggregate messages (#473)
Problem : To avoid fetching all the messages to find the creation/modification time of a specific aggregate entry it will be useful to have this info directly in the aggregate message Solution: Update '/api/v0/aggregates' Endpoint param : - with_info: bool default : False From : ``` { "address": "0xa1B3bb7d2332383D96b7796B908fB7f7F3c2Be10", "data": { "key1": ..., "key2": ..., } } ``` To: ``` { "address": "0xa1B3bb7d2332383D96b7796B908fB7f7F3c2Be10", "data": { "key1": ..., "key2": ..., } "info": { "key1": { "created": ..., "last_updated": ..., "original_item_hash": ..., "last_update_item_hash: ..., }, "key2": ... } } ``` Fixes #470.
1 parent 3b5a30e commit 5e7c10a

File tree

4 files changed

+127
-31
lines changed

4 files changed

+127
-31
lines changed

src/aleph/db/accessors/aggregates.py

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
import datetime as dt
2-
from typing import Optional, Iterable, Any, Dict, Tuple, Sequence
3-
2+
from typing import (
3+
Optional,
4+
Iterable,
5+
Any,
6+
Dict,
7+
Tuple,
8+
Sequence,
9+
overload,
10+
Literal,
11+
Union,
12+
)
413
from sqlalchemy import (
514
select,
615
delete,
@@ -22,20 +31,64 @@ def aggregate_exists(session: DbSession, key: str, owner: str) -> bool:
2231
)
2332

2433

34+
AggregateContent = Iterable[Tuple[str, Dict[str, Any]]]
35+
AggregateContentWithInfo = Iterable[Tuple[str, dt.datetime, dt.datetime, str, str]]
36+
37+
38+
@overload
39+
def get_aggregates_by_owner(
40+
session: Any,
41+
owner: str,
42+
with_info: Literal[False],
43+
keys: Optional[Sequence[str]] = None,
44+
) -> AggregateContent:
45+
...
46+
47+
48+
@overload
49+
def get_aggregates_by_owner(
50+
session: Any,
51+
owner: str,
52+
with_info: Literal[True],
53+
keys: Optional[Sequence[str]] = None,
54+
) -> AggregateContentWithInfo:
55+
...
56+
57+
58+
@overload
2559
def get_aggregates_by_owner(
26-
session: DbSession, owner: str, keys: Optional[Sequence[str]] = None
27-
) -> Iterable[Tuple[str, Dict[str, Any]]]:
60+
session, owner: str, with_info: bool, keys: Optional[Sequence[str]] = None
61+
) -> Union[AggregateContent, AggregateContentWithInfo]:
62+
...
63+
2864

65+
def get_aggregates_by_owner(session, owner, with_info, keys=None):
2966
where_clause = AggregateDb.owner == owner
3067
if keys:
3168
where_clause = where_clause & AggregateDb.key.in_(keys)
32-
33-
select_stmt = (
34-
select(AggregateDb.key, AggregateDb.content)
35-
.where(where_clause)
36-
.order_by(AggregateDb.key)
37-
)
38-
return session.execute(select_stmt).all() # type: ignore
69+
if with_info:
70+
query = (
71+
session.query(
72+
AggregateDb.key,
73+
AggregateDb.content,
74+
AggregateDb.creation_datetime.label("created"),
75+
AggregateElementDb.creation_datetime.label("last_updated"),
76+
AggregateDb.last_revision_hash.label("last_update_item_hash"),
77+
AggregateElementDb.item_hash.label("original_item_hash"),
78+
)
79+
.join(
80+
AggregateElementDb,
81+
AggregateDb.last_revision_hash == AggregateElementDb.item_hash,
82+
)
83+
.filter(AggregateDb.owner == owner)
84+
)
85+
else:
86+
query = (
87+
session.query(AggregateDb.key, AggregateDb.content)
88+
.filter(where_clause)
89+
.order_by(AggregateDb.key)
90+
)
91+
return query.all()
3992

4093

4194
def get_aggregate_by_key(
@@ -44,7 +97,6 @@ def get_aggregate_by_key(
4497
key: str,
4598
with_content: bool = True,
4699
) -> Optional[AggregateDb]:
47-
48100
options = []
49101

50102
if not with_content:
@@ -170,7 +222,6 @@ def mark_aggregate_as_dirty(session: DbSession, owner: str, key: str) -> None:
170222

171223

172224
def refresh_aggregate(session: DbSession, owner: str, key: str) -> None:
173-
174225
# Step 1: use a group by to retrieve the aggregate content. This uses a custom
175226
# aggregate function (see 78dd67881db4_jsonb_merge_aggregate.py).
176227
select_merged_aggregate_subquery = (

src/aleph/web/controllers/aggregates.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import logging
2-
from typing import List, Optional, Any, Dict, Tuple
2+
import datetime as dt
3+
from typing import List, Optional, Any, Dict, Tuple, Literal, Sequence
34

45
from aiohttp import web
56
from pydantic import BaseModel, validator, ValidationError
67
from sqlalchemy import select
78

8-
from aleph.db.accessors.aggregates import get_aggregates_by_owner, refresh_aggregate
9+
from aleph.db.accessors.aggregates import (
10+
get_aggregates_by_owner,
11+
refresh_aggregate,
12+
)
913
from aleph.db.models import AggregateDb
1014
from .utils import LIST_FIELD_SEPARATOR
1115

@@ -17,6 +21,7 @@
1721
class AggregatesQueryParams(BaseModel):
1822
keys: Optional[List[str]] = None
1923
limit: int = DEFAULT_LIMIT
24+
with_info: bool = False
2025

2126
@validator(
2227
"keys",
@@ -33,17 +38,15 @@ async def address_aggregate(request: web.Request) -> web.Response:
3338
TODO: handle filter on a single key, or even subkey.
3439
"""
3540

36-
address = request.match_info["address"]
41+
address: str = request.match_info["address"]
3742

3843
try:
3944
query_params = AggregatesQueryParams.parse_obj(request.query)
4045
except ValidationError as e:
4146
raise web.HTTPUnprocessableEntity(
4247
text=e.json(), content_type="application/json"
4348
)
44-
4549
session_factory = request.app["session_factory"]
46-
4750
with session_factory() as session:
4851
dirty_aggregates = session.execute(
4952
select(AggregateDb.key).where(
@@ -57,9 +60,9 @@ async def address_aggregate(request: web.Request) -> web.Response:
5760
refresh_aggregate(session=session, owner=address, key=key)
5861
session.commit()
5962

60-
aggregates: List[Tuple[str, Dict[str, Any]]] = list(
63+
aggregates = list(
6164
get_aggregates_by_owner(
62-
session=session, owner=address, keys=query_params.keys
65+
session=session, owner=address, with_info=query_params.with_info
6366
)
6467
)
6568

@@ -68,6 +71,35 @@ async def address_aggregate(request: web.Request) -> web.Response:
6871

6972
output = {
7073
"address": address,
71-
"data": {result[0]: result[1] for result in aggregates},
74+
"data": {},
7275
}
76+
info: Dict = {}
77+
data: Dict = {}
78+
79+
for result in aggregates:
80+
data[result[0]] = result[1]
81+
if query_params.with_info:
82+
(
83+
aggregate_key,
84+
content,
85+
created,
86+
last_updated,
87+
original_item_hash,
88+
last_update_item_hash,
89+
) = result
90+
91+
if isinstance(created, dt.datetime):
92+
created = created.isoformat()
93+
if isinstance(last_updated, dt.datetime):
94+
last_updated = last_updated.isoformat()
95+
info[aggregate_key] = {
96+
"created": str(created),
97+
"last_updated": str(last_updated),
98+
"original_item_hash": str(original_item_hash),
99+
"last_update_item_hash": str(last_update_item_hash),
100+
}
101+
102+
output["data"] = data
103+
output["info"] = info
104+
73105
return web.json_response(output)

tests/api/test_aggregates.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,17 @@ def make_uri(address: str) -> str:
2626
return AGGREGATES_URI.format(address=address)
2727

2828

29-
async def get_aggregates(api_client, address: str, **params) -> aiohttp.ClientResponse:
29+
async def get_aggregates(
30+
api_client, address: str, with_info: bool, **params
31+
) -> aiohttp.ClientResponse:
32+
params["with_info"] = str(with_info)
3033
return await api_client.get(make_uri(address), params=params)
3134

3235

33-
async def get_aggregates_expect_success(api_client, address: str, **params):
34-
response = await get_aggregates(api_client, address, **params)
36+
async def get_aggregates_expect_success(
37+
api_client, address: str, with_info: bool, **params
38+
):
39+
response = await get_aggregates(api_client, address, with_info, **params)
3540
assert response.status == 200, await response.text()
3641
return await response.json()
3742

@@ -47,7 +52,7 @@ async def test_get_aggregates_no_update(
4752
assert fixture_aggregate_messages # To avoid unused parameter warnings
4853

4954
address = ADDRESS_2
50-
aggregates = await get_aggregates_expect_success(ccn_api_client, address)
55+
aggregates = await get_aggregates_expect_success(ccn_api_client, address, False)
5156

5257
assert aggregates["address"] == address
5358
assert aggregates["data"] == EXPECTED_AGGREGATES[address]
@@ -63,12 +68,17 @@ async def test_get_aggregates(
6368
assert fixture_aggregate_messages # To avoid unused parameter warnings
6469

6570
address = ADDRESS_1
66-
aggregates = await get_aggregates_expect_success(ccn_api_client, address)
71+
aggregates = await get_aggregates_expect_success(ccn_api_client, address, True)
6772

6873
assert address == aggregates["address"]
6974
assert aggregates["data"]["test_key"] == {"a": 1, "b": 2}
7075
assert aggregates["data"]["test_target"] == {"a": 1, "b": 2}
7176
assert aggregates["data"]["test_reference"] == {"a": 1, "b": 2, "c": 3, "d": 4}
77+
assert aggregates["info"]["test_reference"]
78+
assert (
79+
aggregates["info"]["test_reference"]["original_item_hash"]
80+
== fixture_aggregate_messages[1].item_hash
81+
)
7282

7383

7484
@pytest.mark.asyncio
@@ -83,15 +93,15 @@ async def test_get_aggregates_filter_by_key(
8393

8494
address, key = ADDRESS_1, "test_target"
8595
aggregates = await get_aggregates_expect_success(
86-
ccn_api_client, address=address, keys=key
96+
ccn_api_client, address=address, keys=key, with_info=False
8797
)
8898
assert aggregates["address"] == address
8999
assert aggregates["data"][key] == EXPECTED_AGGREGATES[address][key]
90100

91101
# Multiple keys
92102
address, keys = ADDRESS_1, ["test_target", "test_reference"]
93103
aggregates = await get_aggregates_expect_success(
94-
ccn_api_client, address=address, keys=",".join(keys)
104+
ccn_api_client, address=address, keys=",".join(keys), with_info=False
95105
)
96106
assert aggregates["address"] == address
97107
for key in keys:
@@ -114,7 +124,7 @@ async def test_get_aggregates_limit(
114124

115125
address, key = ADDRESS_1, "test_reference"
116126
aggregates = await get_aggregates_expect_success(
117-
ccn_api_client, address=address, keys=key, limit=1
127+
ccn_api_client, address=address, keys=key, limit=1, with_info=False
118128
)
119129
assert aggregates["address"] == address
120130
assert aggregates["data"][key] == {"c": 3, "d": 4}
@@ -131,7 +141,7 @@ async def test_get_aggregates_invalid_address(
131141

132142
invalid_address = "unknown"
133143

134-
response = await get_aggregates(ccn_api_client, invalid_address)
144+
response = await get_aggregates(ccn_api_client, invalid_address, False)
135145
assert response.status == 404
136146

137147

@@ -145,7 +155,9 @@ async def test_get_aggregates_invalid_params(
145155
assert fixture_aggregate_messages # To avoid unused parameter warnings
146156

147157
# A string as limit
148-
response = await get_aggregates(ccn_api_client, ADDRESS_1, limit="abc")
158+
response = await get_aggregates(
159+
ccn_api_client, ADDRESS_1, limit="abc", with_info=False
160+
)
149161
assert response.status == 422
150162
assert response.content_type == "application/json"
151163

tests/message_processing/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,4 @@ def message_processor(mocker, mock_config: Config, session_factory: DbSessionFac
9797
mq_conn=mocker.AsyncMock(),
9898
)
9999
return message_processor
100+

0 commit comments

Comments
 (0)