Skip to content

Commit

Permalink
refactor(store): HistoryQuery.direction (#2263)
Browse files Browse the repository at this point in the history
* Fix issue with default history query ascending value in serde operations: Should use the same value.
* Update direction types to PagingDirection.
  • Loading branch information
AlejandroCabeza authored Dec 19, 2023
1 parent 8b37919 commit fae20bf
Show file tree
Hide file tree
Showing 15 changed files with 97 additions and 52 deletions.
7 changes: 4 additions & 3 deletions tests/waku_archive/test_waku_archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import

import
../../../waku/common/databases/db_sqlite,
../../../waku/common/paging,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../../../waku/waku_archive/driver/sqlite_driver,
Expand Down Expand Up @@ -341,7 +342,7 @@ procSuite "Waku Archive - find messages":
## Given
let req = ArchiveQuery(
pageSize: 4,
ascending: true
direction: PagingDirection.FORWARD
)

## When
Expand Down Expand Up @@ -377,7 +378,7 @@ procSuite "Waku Archive - find messages":
## Given
let req = ArchiveQuery(
pageSize: 4,
ascending: false # backward
direction: PagingDirection.BACKWARD
)

## When
Expand Down Expand Up @@ -454,7 +455,7 @@ procSuite "Waku Archive - find messages":
contentTopics: @[ContentTopic("1")],
startTime: some(ts(15, timeOrigin)),
endTime: some(ts(55, timeOrigin)),
ascending: true
direction: PagingDirection.FORWARD
)

## When
Expand Down
7 changes: 4 additions & 3 deletions tests/waku_store/test_rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import
chronos
import
../../../waku/common/protobuf,
../../../waku/common/paging,
../../../waku/waku_core,
../../../waku/waku_store/rpc,
../../../waku/waku_store/rpc_codec,
Expand Down Expand Up @@ -53,7 +54,7 @@ procSuite "Waku Store - RPC codec":
## Given
let
index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.FORWARD))
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.FORWARD))

## When
let pb = pagingInfo.encode()
Expand Down Expand Up @@ -88,7 +89,7 @@ procSuite "Waku Store - RPC codec":
## Given
let
index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD))
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.BACKWARD))
query = HistoryQueryRPC(
contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)],
pagingInfo: some(pagingInfo),
Expand Down Expand Up @@ -129,7 +130,7 @@ procSuite "Waku Store - RPC codec":
let
message = fakeWakuMessage()
index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD))
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.BACKWARD))
res = HistoryResponseRPC(messages: @[message], pagingInfo: some(pagingInfo), error: HistoryResponseErrorRPC.INVALID_CURSOR)

## When
Expand Down
5 changes: 3 additions & 2 deletions tests/waku_store/test_waku_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import

import
../../../waku/[
common/paging,
node/peer_manager,
waku_core,
waku_store,
Expand Down Expand Up @@ -46,7 +47,7 @@ suite "Waku Store - query handler":
server = await newTestWakuStore(serverSwitch, handler=queryhandler)
client = newTestWakuStoreClient(clientSwitch)

let req = HistoryQuery(contentTopics: @[DefaultContentTopic], ascending: true)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD)

## When
let queryRes = await client.query(req, peer=serverPeerInfo)
Expand Down Expand Up @@ -88,7 +89,7 @@ suite "Waku Store - query handler":
server = await newTestWakuStore(serverSwitch, handler=queryhandler)
client = newTestWakuStoreClient(clientSwitch)

let req = HistoryQuery(contentTopics: @[DefaultContentTopic], ascending: true)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD)

## When
let queryRes = await client.query(req, peer=serverPeerInfo)
Expand Down
5 changes: 3 additions & 2 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import
libp2p/protocols/pubsub/gossipsub
import
../../../waku/common/databases/db_sqlite,
../../../waku/common/paging,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../../../waku/node/peer_manager,
Expand Down Expand Up @@ -107,7 +108,7 @@ procSuite "WakuNode - Store":
client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: true)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, direction: PagingDirection.FORWARD)
let serverPeer = server.peerInfo.toRemotePeerInfo()

## When
Expand Down Expand Up @@ -158,7 +159,7 @@ procSuite "WakuNode - Store":
client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: false)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, direction: PagingDirection.BACKWARD)
let serverPeer = server.peerInfo.toRemotePeerInfo()

## When
Expand Down
35 changes: 35 additions & 0 deletions waku/common/paging.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import std/options

type
PagingDirection* {.pure.} = enum
## PagingDirection determines the direction of pagination
BACKWARD = uint32(0)
FORWARD = uint32(1)


proc default*(): PagingDirection {.inline.} =
PagingDirection.FORWARD


proc into*(b: bool): PagingDirection =
PagingDirection(b)


proc into*(b: Option[bool]): PagingDirection =
if b.isNone():
return default()
b.get().into()


proc into*(d: PagingDirection): bool =
d == PagingDirection.FORWARD


proc into*(d: Option[PagingDirection]): bool =
if d.isNone():
return false
d.get().into()


proc into*(s: string): PagingDirection =
(s == "true").into()
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ proc toArchiveQuery(request: HistoryQuery): ArchiveQuery =
startTime: request.startTime,
endTime: request.endTime,
pageSize: request.pageSize.uint,
ascending: request.ascending
direction: request.direction
)

# TODO: Review this mapping logic. Maybe, move it to the appplication code
Expand Down
19 changes: 10 additions & 9 deletions waku/waku_api/jsonrpc/store/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import
chronicles,
json_rpc/rpcserver
import
../../../waku_core,
../../../waku_store,
../../../[
waku_core,
waku_store,
waku_node
],
../../../waku_store/rpc,
../../../waku_node,
../../../node/peer_manager,
../../../common/paging,
./types


Expand All @@ -27,16 +30,14 @@ proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC =
PagingInfoRPC(
pageSize: some(pagingOptions.pageSize),
cursor: pagingOptions.cursor,
direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
direction: some(pagingOptions.forward.into())
)

proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions =
StorePagingOptions(
pageSize: pagingInfo.pageSize.get(0'u64),
cursor: pagingInfo.cursor,
forward: if pagingInfo.direction.isNone(): true
else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD
forward: pagingInfo.direction.into()
)

proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
Expand Down Expand Up @@ -65,8 +66,8 @@ proc installStoreApiHandlers*(node: WakuNode, server: RpcServer) =
contentTopics: contentFiltersOption.get(@[]).mapIt(it.contentTopic),
startTime: startTime,
endTime: endTime,
ascending: if pagingOptions.isNone(): true
else: pagingOptions.get().forward,
direction: if pagingOptions.isNone(): default()
else: pagingOptions.get().forward.into(),
pageSize: if pagingOptions.isNone(): DefaultPageSize
else: min(pagingOptions.get().pageSize, MaxPageSize),
cursor: if pagingOptions.isNone(): none(HistoryCursor)
Expand Down
11 changes: 6 additions & 5 deletions waku/waku_api/rest/store/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import
../../../waku_store/common,
../../../waku_node,
../../../node/peer_manager,
../../../common/paging,
../../handlers,
../responses,
../serdes,
Expand Down Expand Up @@ -123,7 +124,7 @@ proc createHistoryQuery(pubsubTopic: Option[string],
startTime: Option[string],
endTime: Option[string],
pageSize: Option[string],
ascending: Option[string]):
direction: Option[string]):

Result[HistoryQuery, string] =

Expand Down Expand Up @@ -164,16 +165,16 @@ proc createHistoryQuery(pubsubTopic: Option[string],
let parsedEndTime = ? parseTime(endTime)

# Parse ascending field
var parsedAscending = true
if ascending.isSome() and ascending.get() != "":
parsedAscending = ascending.get() == "true"
var parsedDirection = default()
if direction.isSome() and direction.get() != "":
parsedDirection = direction.get().into()

return ok(
HistoryQuery(pubsubTopic: parsedPubsubTopic,
contentTopics: parsedContentTopics,
startTime: parsedStartTime,
endTime: parsedEndTime,
ascending: parsedAscending,
direction: parsedDirection,
pageSize: parsedPagedSize,
cursor: parsedCursor
))
Expand Down
3 changes: 2 additions & 1 deletion waku/waku_api/rest/store/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ paths:
schema:
type: string
description: >
"true" for paging forward, "false" for paging backward
"true" for paging forward, "false" for paging backward.
If not specified or if specified with an invalid value, the default is "true".
example: "true"

responses:
Expand Down
13 changes: 8 additions & 5 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import
regex,
metrics
import
../common/databases/dburl,
../common/databases/db_sqlite,
../common/[
databases/dburl,
databases/db_sqlite,
paging
],
./driver,
./retention_policy,
./retention_policy/retention_policy_capacity,
Expand Down Expand Up @@ -131,7 +134,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
qEndTime = query.endTime
qMaxPageSize = if query.pageSize <= 0: DefaultPageSize
else: min(query.pageSize, MaxPageSize)
qAscendingOrder = query.ascending
isAscendingOrder = query.direction.into()

if qContentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics"))
Expand All @@ -145,7 +148,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize + 1,
ascendingOrder = qAscendingOrder
ascendingOrder = isAscendingOrder
)

let queryDuration = getTime().toUnixFloat() - queryStartTime
Expand Down Expand Up @@ -188,7 +191,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
))

# All messages MUST be returned in chronological order
if not qAscendingOrder:
if not isAscendingOrder:
reverse(messages)

return ok(ArchiveResponse(messages: messages, cursor: cursor))
Expand Down
5 changes: 3 additions & 2 deletions waku/waku_archive/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import
stew/byteutils,
nimcrypto/sha2
import
../waku_core
../waku_core,
../common/paging


## Waku message digest
Expand Down Expand Up @@ -54,7 +55,7 @@ type
startTime*: Option[Timestamp]
endTime*: Option[Timestamp]
pageSize*: uint
ascending*: bool
direction*: PagingDirection

ArchiveResponse* = object
messages*: seq[WakuMessage]
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_store/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ when defined(waku_exp_store_resume):
startTime: some(queryStartTime),
endTime: some(queryEndTime),
pageSize: uint64(pageSize),
ascending: true
direction: default()
)

var res: WakuStoreResult[seq[WakuMessage]]
Expand Down
5 changes: 3 additions & 2 deletions waku/waku_store/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import
stew/byteutils,
nimcrypto/sha2
import
../waku_core
../waku_core,
../common/paging


const
Expand Down Expand Up @@ -55,7 +56,7 @@ type
startTime*: Option[Timestamp]
endTime*: Option[Timestamp]
pageSize*: uint64
ascending*: bool
direction*: PagingDirection

HistoryResponse* = object
messages*: seq[WakuMessage]
Expand Down
Loading

0 comments on commit fae20bf

Please sign in to comment.