NIP-114: ids_only filter #99

Open · wants to merge 4 commits into master
2 changes: 1 addition & 1 deletion README.md
@@ -146,7 +146,7 @@ This will download all missing events from the remote relay and insert them into

Instead of a "full DB" sync, you can also sync the result of a nostr filter (or multiple filters, use a JSON array of them):

-    ./strfry sync wss://relay.example.com '{"authors":["003b"]}'
+    ./strfry sync wss://relay.example.com --filter '{"authors":["003b"]}'

Warning: Syncing can consume a lot of memory and bandwidth if the DBs are highly divergent (for example if your local DB is empty and your filter matches many events).
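For the JSON-array form mentioned above, a hypothetical invocation combining two filters might look like this (the second filter's kind values are purely illustrative):

    ./strfry sync wss://relay.example.com --filter '[{"authors":["003b"]},{"kinds":[0,3]}]'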

20 changes: 20 additions & 0 deletions src/apps/relay/RelayIngester.cpp
@@ -43,6 +43,14 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad req: ") + e.what());
}
} else if (cmd == "GET") {
if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload;

try {
ingesterProcessGet(txn, decomp, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad get: ") + e.what());
}
} else if (cmd == "CLOSE") {
if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload;

@@ -124,6 +132,18 @@ void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao:
tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::NewSub{std::move(sub)}});
}

void RelayServer::ingesterProcessGet(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &arr) {
if (arr.get_array().size() != 2) throw herr("GET arr size != 2");

auto ev = lookupEventById(txn, from_hex(arr[1].get_string()));
if (!ev) {
sendNoticeError(connId, std::string("GET event not found"));
} else {
auto evJson = getEventJson(txn, decomp, ev->primaryKeyId);
sendEvent(connId, SubId("*"), evJson);
}
}
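Based on the handler above, a successful GET round trip would look roughly like the following; the event id is a truncated, illustrative value, and the reply is delivered through sendEvent with the literal subscription id "*". If the id is unknown, sendNoticeError emits a NOTICE carrying "GET event not found" instead.

    Client: ["GET", "3bf0c63f...61a9"]
    Relay:  ["EVENT", "*", { "id": "3bf0c63f...61a9", ... }]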

void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
if (arr.get_array().size() != 2) throw herr("arr too small/big");

7 changes: 6 additions & 1 deletion src/apps/relay/RelayReqMonitor.cpp
@@ -32,7 +32,12 @@ void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {

env.foreach_Event(txn, [&](auto &ev){
if (msg->sub.filterGroup.doesMatch(ev.flat_nested())) {
-   sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
+   if (msg->sub.filterGroup.ids_only()) {
+       auto id = to_hex(sv(ev.flat_nested()->id()));
+       sendHave(connId, msg->sub.subId, id);
+   } else {
+       sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
+   }
}

return true;
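The same id-only treatment applies on the live path: once a subscription has been handed to the monitor, each newly stored event that matches an ids_only filter group is announced with a bare HAVE frame rather than the full event JSON, e.g. (illustrative values):

    ["HAVE", "sub1", "21f1d6...9c0a"]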
12 changes: 9 additions & 3 deletions src/apps/relay/RelayReqWorker.cpp
@@ -6,11 +6,17 @@ void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
Decompressor decomp;
QueryScheduler queries;

-   queries.onEvent = [&](lmdb::txn &txn, const auto &sub, uint64_t levId, std::string_view eventPayload){
-       sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr));
+   queries.onEvent = [&](lmdb::txn &txn, const auto &sub, uint64_t levId, std::string_view eventPayload) {
+       if (sub.filterGroup.ids_only()) {
+           auto ev = lookupEventByLevId(txn, levId);
+           auto id = to_hex(sv(ev.flat_nested()->id()));
+           sendHave(sub.connId, sub.subId, id);
+       } else {
+           sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr));
+       }
};

-   queries.onComplete = [&](lmdb::txn &, Subscription &sub){
+   queries.onComplete = [&](lmdb::txn &, Subscription &sub) {
sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ "EOSE", sub.subId.str() })));
tpReqMonitor.dispatch(sub.connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(sub)}});
};
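Putting the stored-query path together with the EOSE handling above, a subscription that opts into id-only delivery would exchange frames roughly like this (subscription id, filter fields, and event ids are illustrative):

    Client: ["REQ", "sub1", {"kinds": [1], "limit": 2, "ids_only": true}]
    Relay:  ["HAVE", "sub1", "21f1d6...9c0a"]
    Relay:  ["HAVE", "sub1", "7e4b02...d514"]
    Relay:  ["EOSE", "sub1"]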
16 changes: 16 additions & 0 deletions src/apps/relay/RelayServer.h
@@ -169,6 +169,7 @@ struct RelayServer {
void runIngester(ThreadPool<MsgIngester>::Thread &thr);
void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output);
void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessGet(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &origJson);

@@ -211,6 +212,21 @@ struct RelayServer {
sendToConn(connId, std::move(reply));
}

void sendHave(uint64_t connId, const SubId &subId, const std::string_view eventId) {
auto subIdSv = subId.sv();

std::string reply;
reply.reserve(14 + subIdSv.size() + eventId.size());

reply += "[\"HAVE\",\"";
reply += subIdSv;
reply += "\",\"";
reply += eventId;
reply += "\"]";

sendToConn(connId, std::move(reply));
}
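The 14-byte constant in the reserve call accounts for the literal framing: the 9-character prefix ["HAVE"," , the 3-character separator "," , and the 2-character suffix "] — 14 bytes on top of the subscription id and the hex event id, so the reservation is exact and the string never reallocates while being built.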

void sendEventToBatch(RecipientList &&list, std::string &&evJson) {
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::SendEventToBatch{std::move(list), std::move(evJson)}});
hubTrigger->send();
2 changes: 1 addition & 1 deletion src/apps/relay/RelayWebsocket.cpp
@@ -48,7 +48,7 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
tempBuf.reserve(cfg().events__maxEventSize + MAX_SUBID_SIZE + 100);


-   tao::json::value supportedNips = tao::json::value::array({ 1, 2, 4, 9, 11, 20, 22, 28, 40, 70 });
+   tao::json::value supportedNips = tao::json::value::array({ 1, 2, 4, 9, 11, 20, 22, 28, 40, 70, 114 });

auto getServerInfoHttpResponse = [&supportedNips, ver = uint64_t(0), rendered = std::string("")]() mutable {
if (ver != cfg().version()) {
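With 114 appended to supportedNips, the relay information document rendered by getServerInfoHttpResponse would advertise the new capability roughly as follows (abridged sketch; field name per NIP-11):

    { ..., "supported_nips": [1, 2, 4, 9, 11, 20, 22, 28, 40, 70, 114], ... }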
11 changes: 11 additions & 0 deletions src/filters.h
@@ -116,6 +116,7 @@ struct NostrFilter {
uint64_t limit = MAX_U64;
bool neverMatch = false;
bool indexOnlyScans = false;
bool idsOnly = false;

explicit NostrFilter(const tao::json::value &filterObj, uint64_t maxFilterLimit) {
uint64_t numMajorFields = 0;
@@ -154,6 +155,8 @@ explicit NostrFilter(const tao::json::value &filterObj, uint64_t maxFilterLimit
until = v.get_unsigned();
} else if (k == "limit") {
limit = v.get_unsigned();
} else if (k == "ids_only") {
idsOnly = v.get_boolean();
} else {
throw herr("unrecognised filter item");
}
@@ -248,6 +251,14 @@ struct NostrFilterGroup {
return false;
}

bool ids_only() const {
for (const auto &f : filters) {
if (f.idsOnly) return true;
}

return false;
}
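Note that the flag is evaluated per filter group: if any filter in the REQ sets it, every match from the whole group is reported as a HAVE. For example, the second filter below (values illustrative) switches the entire subscription into id-only mode:

    ["REQ", "sub2", {"authors": ["003b"]}, {"kinds": [1], "ids_only": true}]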

size_t size() const {
return filters.size();
}