diff --git a/tests/node/test_wakunode_store.nim b/tests/node/test_wakunode_store.nim new file mode 100644 index 0000000000..d0e44634cc --- /dev/null +++ b/tests/node/test_wakunode_store.nim @@ -0,0 +1,1164 @@ +{.used.} + +import + std/options, + stew/shims/net as stewNet, + testutils/unittests, + chronos, + libp2p/crypto/crypto + +import + ../../../waku/[ + node/waku_node, + node/peer_manager, + waku_core, + waku_store, + waku_store/client, + waku_archive, + waku_archive/driver/sqlite_driver, + common/databases/db_sqlite + ], + ../waku_store/store_utils, + ../waku_archive/archive_utils, + ../testlib/[ + common, + wakucore, + wakunode, + testasync, + futures, + testutils + ] + + +suite "Waku Store - End to End - Sorted Archive": + var pubsubTopic {.threadvar.}: PubsubTopic + var contentTopic {.threadvar.}: ContentTopic + var contentTopicSeq {.threadvar.}: seq[ContentTopic] + + var archiveMessages {.threadvar.}: seq[WakuMessage] + var historyQuery {.threadvar.}: HistoryQuery + + var server {.threadvar.}: WakuNode + var client {.threadvar.}: WakuNode + + var archiveDriver {.threadvar.}: ArchiveDriver + var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + var clientPeerId {.threadvar.}: PeerId + + asyncSetup: + pubsubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + contentTopicSeq = @[contentTopic] + + let timeOrigin = now() + archiveMessages = @[ + fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 01], ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 02], ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 03], ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 04], ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 05], ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 06], ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 07], ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 08], ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin)) + ] + + historyQuery = HistoryQuery( + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 5 + ) + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + archiveDriver = newArchiveDriverWithMessages(pubsubTopic, archiveMessages) + let mountArchiveResult = server.mountArchive(archiveDriver) + assert mountArchiveResult.isOk() + + waitFor server.mountStore() + client.mountStoreClient() + + waitFor allFutures(server.start(), client.start()) + + serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + clientPeerId = client.peerInfo.toRemotePeerInfo().peerId + + asyncTeardown: + waitFor allFutures(client.stop(), server.stop()) + + suite "Message Pagination": + asyncTest "Forward Pagination": + # When making a history query + let queryResponse = await client.query(historyQuery, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse.get().messages == archiveMessages[0..<5] + + # Given the next query + var otherHistoryQuery = HistoryQuery( + cursor: queryResponse.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 5 + ) + + # When making the next history query + let otherQueryResponse = await client.query(otherHistoryQuery, serverRemotePeerInfo) + + # Then the response contains the messages + check: + otherQueryResponse.get().messages == archiveMessages[5..<10] + + asyncTest "Backward Pagination": + # Given the history query is backward + historyQuery.direction = false + + # When making a history query + let queryResponse = await client.query(historyQuery, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse.get().messages == archiveMessages[5..<10] + + # Given the next query + var nextHistoryQuery = HistoryQuery( + cursor: queryResponse.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: false, + pageSize: 5 + ) + + # When making the next history query + let otherQueryResponse = await client.query(nextHistoryQuery, serverRemotePeerInfo) + + # Then the response contains the messages + check: + otherQueryResponse.get().messages == archiveMessages[0..<5] + + suite "Pagination with Differente Page Sizes": + asyncTest "Pagination with Small Page Size": + # Given the first query (1/5) + historyQuery.pageSize = 2 + + # When making a history query + let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse1.get().messages == archiveMessages[0..<2] + + # Given the next query (2/5) + let historyQuery2 = HistoryQuery( + cursor: queryResponse1.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 2 + ) + + # When making the next history query + let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse2.get().messages == archiveMessages[2..<4] + + # Given the next query (3/5) + let historyQuery3 = HistoryQuery( + cursor: queryResponse2.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 2 + ) + + # When making the next history query + let queryResponse3 = await client.query(historyQuery3, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse3.get().messages == archiveMessages[4..<6] + + # Given the next query (4/5) + let historyQuery4 = HistoryQuery( + cursor: queryResponse3.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 2 + ) + + # When making the next history query + let queryResponse4 = await client.query(historyQuery4, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse4.get().messages == archiveMessages[6..<8] + + # Given the next query (5/5) + let historyQuery5 = HistoryQuery( + cursor: queryResponse4.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 2 + ) + + # When making the next history query + let queryResponse5 = await client.query(historyQuery5, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse5.get().messages == archiveMessages[8..<10] + + asyncTest "Pagination with Large Page Size": + # Given the first query (1/2) + historyQuery.pageSize = 8 + + # When making a history query + let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse1.get().messages == archiveMessages[0..<8] + + # Given the next query (2/2) + let historyQuery2 = HistoryQuery( + cursor: queryResponse1.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 8 + ) + + # When making the next history query + let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse2.get().messages == archiveMessages[8..<10] + + asyncTest "Pagination with Excessive Page Size": + # Given the first query (1/1) + historyQuery.pageSize = 100 + + # When making a history query + let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse1.get().messages == archiveMessages[0..<10] + + asyncTest "Pagination with Mixed Page Size": + # Given the first query (1/3) + historyQuery.pageSize = 2 + + # When making a history query + let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse1.get().messages == archiveMessages[0..<2] + + # Given the next query (2/3) + let historyQuery2 = HistoryQuery( + cursor: queryResponse1.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 4 + ) + + # When making the next history query + let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse2.get().messages == archiveMessages[2..<6] + + # Given the next query (3/3) + let historyQuery3 = HistoryQuery( + cursor: queryResponse2.get().cursor, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopicSeq, + direction: true, + pageSize: 6 + ) + + # When making the next history query + let queryResponse3 = await client.query(historyQuery3, serverRemotePeerInfo) + + # Then the response contains the messages + check: + queryResponse3.get().messages == archiveMessages[6..<10] + + asyncTest "Pagination with Zero Page Size (Behaves as DefaultPageSize)": + # Given a message list of size higher than the default page size + let currentStoreLen = uint((await archiveDriver.getMessagesCount()).get()) + assert archive.DefaultPageSize > currentStoreLen, "This test requires a store with more than (DefaultPageSize) messages" + let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5 + + let lastMessageTimestamp = archiveMessages[archiveMessages.len - 1].timestamp + var extraMessages: seq[WakuMessage] = @[] + for i in 0.. currentStoreLen, "This test requires a store with more than (DefaultPageSize) messages" + let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5 + + let lastMessageTimestamp = archiveMessages[archiveMessages.len - 1].timestamp + var extraMessages: seq[WakuMessage] = @[] + for i in 0..