From dbb26f92021e0a1048967baae5328a45ff896c09 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Thu, 28 Sep 2023 08:57:02 +0900 Subject: [PATCH 01/10] logging connection informations --- handlers.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/handlers.go b/handlers.go index e73efc1..737edb3 100644 --- a/handlers.go +++ b/handlers.go @@ -56,6 +56,8 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ticker := time.NewTicker(pingPeriod) stop := make(chan struct{}) + s.Log.Infof("connected from %s", conn.RemoteAddr().String()) + // NIP-42 challenge challenge := make([]byte, 8) rand.Read(challenge) @@ -85,6 +87,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { removeListener(ws) } s.clientsMu.Unlock() + s.Log.Infof("diconnected from %s", conn.RemoteAddr().String()) }() conn.SetReadLimit(maxMessageSize) @@ -387,6 +390,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { s.Log.Errorf("error writing ping: %v; closing websocket", err) return } + s.Log.Infof("pinging for %s", conn.RemoteAddr().String()) case <-stop: return } From 18c8a56135d36743b33c210c59d46d798070b94e Mon Sep 17 00:00:00 2001 From: mattn Date: Thu, 28 Sep 2023 09:56:57 +0900 Subject: [PATCH 02/10] Update handlers.go --- handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handlers.go b/handlers.go index 737edb3..f205ef8 100644 --- a/handlers.go +++ b/handlers.go @@ -87,7 +87,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { removeListener(ws) } s.clientsMu.Unlock() - s.Log.Infof("diconnected from %s", conn.RemoteAddr().String()) + s.Log.Infof("disconnected from %s", conn.RemoteAddr().String()) }() conn.SetReadLimit(maxMessageSize) From 1740e67512ce5ff509c198c229b0825a131b5eb9 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Thu, 28 Sep 2023 10:39:22 +0900 Subject: [PATCH 03/10] read stop channel --- handlers.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/handlers.go b/handlers.go index 737edb3..1ea8f45 100644 --- a/handlers.go +++ b/handlers.go @@ -87,7 +87,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { removeListener(ws) } s.clientsMu.Unlock() - s.Log.Infof("diconnected from %s", conn.RemoteAddr().String()) + s.Log.Infof("disconnected from %s", conn.RemoteAddr().String()) }() conn.SetReadLimit(maxMessageSize) @@ -379,6 +379,8 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { defer func() { ticker.Stop() conn.Close() + for := range stop { + } }() for { From 90a9680b7d8d0f930165e7abbab612b3dfa4bfbe Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Thu, 28 Sep 2023 10:42:15 +0900 Subject: [PATCH 04/10] fix --- handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handlers.go b/handlers.go index 1ea8f45..74f6064 100644 --- a/handlers.go +++ b/handlers.go @@ -379,7 +379,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { defer func() { ticker.Stop() conn.Close() - for := range stop { + for range stop { } }() From 87f4a8e443d1ab4680bd9ea20d1d6f1184f4a9da Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Sat, 30 Sep 2023 23:45:16 +0900 Subject: [PATCH 05/10] extra option to set MaxOpenConns/MaxIdleConns for sqlite3 storage --- storage/sqlite3/init.go | 4 ++-- storage/sqlite3/sqlite3.go | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/storage/sqlite3/init.go b/storage/sqlite3/init.go index cb7fe0d..a517345 100644 --- a/storage/sqlite3/init.go +++ b/storage/sqlite3/init.go @@ -15,8 +15,8 @@ func (b *SQLite3Backend) Init() error { return err } - // sqlx default is 0 (unlimited), while sqlite3 by default accepts up to 100 connections - db.SetMaxOpenConns(80) + db.SetMaxOpenConns(b.MaxOpenConns) + db.SetMaxIdleConns(b.MaxIdleConns) db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper) b.DB = db diff --git a/storage/sqlite3/sqlite3.go b/storage/sqlite3/sqlite3.go index d0d6e03..d150e48 100644 --- a/storage/sqlite3/sqlite3.go +++ b/storage/sqlite3/sqlite3.go @@ -6,5 +6,7 @@ import ( type SQLite3Backend struct { *sqlx.DB - DatabaseURL string + DatabaseURL string + MaxOpenConns int + MaxIdleConns int } From 2477dc40027bd770e99c9a1eb212435485639925 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Tue, 3 Oct 2023 22:19:56 +0900 Subject: [PATCH 06/10] index --- storage/sqlite3/init.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/storage/sqlite3/init.go b/storage/sqlite3/init.go index a517345..3810553 100644 --- a/storage/sqlite3/init.go +++ b/storage/sqlite3/init.go @@ -9,6 +9,20 @@ import ( var _ relayer.Storage = (*SQLite3Backend)(nil) +var ddls = []string{ + `CREATE TABLE IF NOT EXISTS event ( + id text NOT NULL, + pubkey text NOT NULL, + created_at integer NOT NULL, + kind integer NOT NULL, + tags jsonb NOT NULL, + content text NOT NULL, + sig text NOT NULL);`, + `CREATE INDEX IF NOT EXISTS idx_event_id on event(id)`, + `CREATE INDEX IF NOT EXISTS idx_event_kind on event(kind)`, + `CREATE INDEX IF NOT EXISTS idx_event_kind_tags on event(kind, tags)`, +} + func (b *SQLite3Backend) Init() error { db, err := sqlx.Connect("sqlite3", b.DatabaseURL) if err != nil { @@ -21,16 +35,11 @@ func (b *SQLite3Backend) Init() error { db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper) b.DB = db - _, err = b.DB.Exec(` -CREATE TABLE IF NOT EXISTS event ( - id text NOT NULL, - pubkey text NOT NULL, - created_at integer NOT NULL, - kind integer NOT NULL, - tags jsonb NOT NULL, - content text NOT NULL, - sig text NOT NULL -); - `) - return err + for _, ddl := range ddls { + _, err = b.DB.Exec(ddl) + if err != nil { + return err + } + } + return nil } From 4e412d32a21781f590c9735be05879742191a6b0 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Sat, 30 Sep 2023 23:44:44 +0900 Subject: [PATCH 07/10] use WriteControl --- handlers.go | 523 +++++++++++++++++++++++++++------------------------- 1 file changed, 271 insertions(+), 252 deletions(-) diff --git a/handlers.go b/handlers.go index 74f6064..47104f5 100644 --- a/handlers.go +++ b/handlers.go @@ -40,10 +40,275 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } +func challenge(conn *websocket.Conn) *WebSocket { + // NIP-42 challenge + challenge := make([]byte, 8) + rand.Read(challenge) + + return &WebSocket{ + conn: conn, + challenge: hex.EncodeToString(challenge), + } +} + +func (s *Server) doEvent(ctx context.Context, ws *WebSocket, request []json.RawMessage, store Storage) string { + advancedDeleter, _ := store.(AdvancedDeleter) + + // it's a new event + var evt nostr.Event + if err := json.Unmarshal(request[1], &evt); err != nil { + return "failed to decode event: " + err.Error() + } + + // check serialization + serialized := evt.Serialize() + + // assign ID + hash := sha256.Sum256(serialized) + evt.ID = hex.EncodeToString(hash[:]) + + // check signature (requires the ID to be set) + if ok, err := evt.CheckSignature(); err != nil { + reason := "error: failed to verify signature" + ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason}) + return "" + } else if !ok { + reason := "invalid: signature is invalid" + ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason}) + return "" + } + + if evt.Kind == 5 { + // event deletion -- nip09 + for _, tag := range evt.Tags { + if len(tag) >= 2 && tag[0] == "e" { + if advancedDeleter != nil { + advancedDeleter.BeforeDelete(ctx, tag[1], evt.PubKey) + } + + if err := store.DeleteEvent(ctx, tag[1], evt.PubKey); err != nil { + reason := fmt.Sprintf("error: %s", err.Error()) + ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason}) + return "" + } + + if advancedDeleter != nil { + advancedDeleter.AfterDelete(tag[1], evt.PubKey) + } + } + } + return "" + } + + ok, message := AddEvent(ctx, s.relay, &evt) + var reason *string + if message != "" { + reason = &message + } + ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: ok, Reason: reason}) + return "" +} + +func (s *Server) doCount(ctx context.Context, ws *WebSocket, request []json.RawMessage, store Storage) string { + counter, ok := store.(EventCounter) + if !ok { + return "restricted: this relay does not support NIP-45" + } + + var id string + json.Unmarshal(request[1], &id) + if id == "" { + return "COUNT has no " + } + + total := int64(0) + filters := make(nostr.Filters, len(request)-2) + for i, filterReq := range request[2:] { + if err := json.Unmarshal(filterReq, &filters[i]); err != nil { + return "failed to decode filter" + } + + filter := &filters[i] + + // prevent kind-4 events from being returned to unauthed users, + // only when authentication is a thing + if _, ok := s.relay.(Auther); ok { + if slices.Contains(filter.Kinds, 4) { + senders := filter.Authors + receivers, _ := filter.Tags["p"] + switch { + case ws.authed == "": + // not authenticated + return "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?" + case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.authed): + // allowed filter: ws.authed is sole sender (filter specifies one or all receivers) + case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.authed): + // allowed filter: ws.authed is sole receiver (filter specifies one or all senders) + default: + // restricted filter: do not return any events, + // even if other elements in filters array were not restricted). + // client should know better. + return "restricted: authenticated user does not have authorization for requested filters." + } + } + } + + count, err := counter.CountEvents(ctx, filter) + if err != nil { + s.Log.Errorf("store: %v", err) + continue + } + total += count + } + + ws.WriteJSON([]interface{}{"COUNT", id, map[string]int64{"count": total}}) + return "" +} + +func (s *Server) doReq(ctx context.Context, ws *WebSocket, request []json.RawMessage, store Storage) string { + var id string + json.Unmarshal(request[1], &id) + if id == "" { + return "REQ has no " + } + + filters := make(nostr.Filters, len(request)-2) + for i, filterReq := range request[2:] { + if err := json.Unmarshal( + filterReq, + &filters[i], + ); err != nil { + return "failed to decode filter" + } + + filter := &filters[i] + + // prevent kind-4 events from being returned to unauthed users, + // only when authentication is a thing + if _, ok := s.relay.(Auther); ok { + if slices.Contains(filter.Kinds, 4) { + senders := filter.Authors + receivers, _ := filter.Tags["p"] + switch { + case ws.authed == "": + // not authenticated + return "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?" + case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.authed): + // allowed filter: ws.authed is sole sender (filter specifies one or all receivers) + case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.authed): + // allowed filter: ws.authed is sole receiver (filter specifies one or all senders) + default: + // restricted filter: do not return any events, + // even if other elements in filters array were not restricted). + // client should know better. + return "restricted: authenticated user does not have authorization for requested filters." + } + } + } + + events, err := store.QueryEvents(ctx, filter) + if err != nil { + s.Log.Errorf("store: %v", err) + continue + } + + // ensures the client won't be bombarded with events in case Storage doesn't do limits right + if filter.Limit == 0 { + filter.Limit = 9999999999 + } + i := 0 + for event := range events { + ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}) + i++ + if i > filter.Limit { + break + } + } + + // exhaust the channel (in case we broke out of it early) so it is closed by the storage + for range events { + } + } + + ws.WriteJSON(nostr.EOSEEnvelope(id)) + setListener(id, ws, filters) + return "" +} + +func (s *Server) doClose(ctx context.Context, ws *WebSocket, request []json.RawMessage, store Storage) string { + var id string + json.Unmarshal(request[1], &id) + if id == "" { + return "CLOSE has no " + } + + removeListenerId(ws, id) + return "" +} + +func (s *Server) doAuth(ctx context.Context, ws *WebSocket, request []json.RawMessage, store Storage) string { + if auther, ok := s.relay.(Auther); ok { + var evt nostr.Event + if err := json.Unmarshal(request[1], &evt); err != nil { + return "failed to decode auth event: " + err.Error() + } + if pubkey, ok := nip42.ValidateAuthEvent(&evt, ws.challenge, auther.ServiceURL()); ok { + ws.authed = pubkey + ctx = context.WithValue(ctx, AUTH_CONTEXT_KEY, pubkey) + ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: true}) + } else { + reason := "error: failed to authenticate" + ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason}) + } + } + return "" +} + +func (s *Server) handleMessage(ctx context.Context, ws *WebSocket, message []byte, store Storage) { + var notice string + defer func() { + if notice != "" { + ws.WriteJSON(nostr.NoticeEnvelope(notice)) + } + }() + + var request []json.RawMessage + if err := json.Unmarshal(message, &request); err != nil { + // stop silently + return + } + + if len(request) < 2 { + notice = "request has less than 2 parameters" + return + } + + var typ string + json.Unmarshal(request[0], &typ) + + switch typ { + case "EVENT": + notice = s.doEvent(ctx, ws, request, store) + case "COUNT": + notice = s.doCount(ctx, ws, request, store) + case "REQ": + notice = s.doReq(ctx, ws, request, store) + case "CLOSE": + notice = s.doClose(ctx, ws, request, store) + case "AUTH": + notice = s.doAuth(ctx, ws, request, store) + default: + if cwh, ok := s.relay.(CustomWebSocketHandler); ok { + cwh.HandleUnknownType(ws, typ, request) + } else { + notice = "unknown message type " + typ + } + } +} + func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ctx := r.Context() store := s.relay.Storage(ctx) - advancedDeleter, _ := store.(AdvancedDeleter) conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -58,14 +323,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { s.Log.Infof("connected from %s", conn.RemoteAddr().String()) - // NIP-42 challenge - challenge := make([]byte, 8) - rand.Read(challenge) - - ws := &WebSocket{ - conn: conn, - challenge: hex.EncodeToString(challenge), - } + ws := challenge(conn) if s.options.perConnectionLimiter != nil { ws.limiter = rate.NewLimiter( @@ -88,6 +346,8 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { } s.clientsMu.Unlock() s.Log.Infof("disconnected from %s", conn.RemoteAddr().String()) + + ctx.Done() }() conn.SetReadLimit(maxMessageSize) @@ -130,247 +390,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { continue } - go func(message []byte) { - ctx = context.TODO() - - var notice string - defer func() { - if notice != "" { - ws.WriteJSON(nostr.NoticeEnvelope(notice)) - } - }() - - var request []json.RawMessage - if err := json.Unmarshal(message, &request); err != nil { - // stop silently - return - } - - if len(request) < 2 { - notice = "request has less than 2 parameters" - return - } - - var typ string - json.Unmarshal(request[0], &typ) - - switch typ { - case "EVENT": - // it's a new event - var evt nostr.Event - if err := json.Unmarshal(request[1], &evt); err != nil { - notice = "failed to decode event: " + err.Error() - return - } - - // check serialization - serialized := evt.Serialize() - - // assign ID - hash := sha256.Sum256(serialized) - evt.ID = hex.EncodeToString(hash[:]) - - // check signature (requires the ID to be set) - if ok, err := evt.CheckSignature(); err != nil { - reason := "error: failed to verify signature" - ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason}) - return - } else if !ok { - reason := "invalid: signature is invalid" - ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason}) - return - } - - if evt.Kind == 5 { - // event deletion -- nip09 - for _, tag := range evt.Tags { - if len(tag) >= 2 && tag[0] == "e" { - if advancedDeleter != nil { - advancedDeleter.BeforeDelete(ctx, tag[1], evt.PubKey) - } - - if err := store.DeleteEvent(ctx, tag[1], evt.PubKey); err != nil { - reason := fmt.Sprintf("error: %s", err.Error()) - ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason}) - return - } - - if advancedDeleter != nil { - advancedDeleter.AfterDelete(tag[1], evt.PubKey) - } - } - } - return - } - - ok, message := AddEvent(ctx, s.relay, &evt) - var reason *string - if message != "" { - reason = &message - } - ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: ok, Reason: reason}) - case "COUNT": - counter, ok := store.(EventCounter) - if !ok { - notice = "restricted: this relay does not support NIP-45" - return - } - - var id string - json.Unmarshal(request[1], &id) - if id == "" { - notice = "COUNT has no " - return - } - - total := int64(0) - filters := make(nostr.Filters, len(request)-2) - for i, filterReq := range request[2:] { - if err := json.Unmarshal(filterReq, &filters[i]); err != nil { - notice = "failed to decode filter" - return - } - - filter := &filters[i] - - // prevent kind-4 events from being returned to unauthed users, - // only when authentication is a thing - if _, ok := s.relay.(Auther); ok { - if slices.Contains(filter.Kinds, 4) { - senders := filter.Authors - receivers, _ := filter.Tags["p"] - switch { - case ws.authed == "": - // not authenticated - notice = "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?" - return - case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.authed): - // allowed filter: ws.authed is sole sender (filter specifies one or all receivers) - case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.authed): - // allowed filter: ws.authed is sole receiver (filter specifies one or all senders) - default: - // restricted filter: do not return any events, - // even if other elements in filters array were not restricted). - // client should know better. - notice = "restricted: authenticated user does not have authorization for requested filters." - return - } - } - } - - count, err := counter.CountEvents(ctx, filter) - if err != nil { - s.Log.Errorf("store: %v", err) - continue - } - total += count - } - - ws.WriteJSON([]interface{}{"COUNT", id, map[string]int64{"count": total}}) - case "REQ": - var id string - json.Unmarshal(request[1], &id) - if id == "" { - notice = "REQ has no " - return - } - - filters := make(nostr.Filters, len(request)-2) - for i, filterReq := range request[2:] { - if err := json.Unmarshal( - filterReq, - &filters[i], - ); err != nil { - notice = "failed to decode filter" - return - } - - filter := &filters[i] - - // prevent kind-4 events from being returned to unauthed users, - // only when authentication is a thing - if _, ok := s.relay.(Auther); ok { - if slices.Contains(filter.Kinds, 4) { - senders := filter.Authors - receivers, _ := filter.Tags["p"] - switch { - case ws.authed == "": - // not authenticated - notice = "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?" - return - case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.authed): - // allowed filter: ws.authed is sole sender (filter specifies one or all receivers) - case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.authed): - // allowed filter: ws.authed is sole receiver (filter specifies one or all senders) - default: - // restricted filter: do not return any events, - // even if other elements in filters array were not restricted). - // client should know better. - notice = "restricted: authenticated user does not have authorization for requested filters." - return - } - } - } - - events, err := store.QueryEvents(ctx, filter) - if err != nil { - s.Log.Errorf("store: %v", err) - continue - } - - // ensures the client won't be bombarded with events in case Storage doesn't do limits right - if filter.Limit == 0 { - filter.Limit = 9999999999 - } - i := 0 - for event := range events { - ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}) - i++ - if i > filter.Limit { - break - } - } - - // exhaust the channel (in case we broke out of it early) so it is closed by the storage - for range events { - } - } - - ws.WriteJSON(nostr.EOSEEnvelope(id)) - setListener(id, ws, filters) - case "CLOSE": - var id string - json.Unmarshal(request[1], &id) - if id == "" { - notice = "CLOSE has no " - return - } - - removeListenerId(ws, id) - case "AUTH": - if auther, ok := s.relay.(Auther); ok { - var evt nostr.Event - if err := json.Unmarshal(request[1], &evt); err != nil { - notice = "failed to decode auth event: " + err.Error() - return - } - if pubkey, ok := nip42.ValidateAuthEvent(&evt, ws.challenge, auther.ServiceURL()); ok { - ws.authed = pubkey - ctx = context.WithValue(ctx, AUTH_CONTEXT_KEY, pubkey) - ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: true}) - } else { - reason := "error: failed to authenticate" - ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason}) - } - } - default: - if cwh, ok := s.relay.(CustomWebSocketHandler); ok { - cwh.HandleUnknownType(ws, typ, request) - } else { - notice = "unknown message type " + typ - } - } - }(message) + go s.handleMessage(ctx, ws, message, store) } }() @@ -386,8 +406,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { for { select { case <-ticker.C: - conn.SetWriteDeadline(time.Now().Add(writeWait)) - err := ws.WriteMessage(websocket.PingMessage, nil) + err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait)) if err != nil { s.Log.Errorf("error writing ping: %v; closing websocket", err) return From 4782a43a73430d1abc7532721b0283b780c08de1 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Fri, 6 Oct 2023 08:32:24 +0900 Subject: [PATCH 08/10] create index --- storage/sqlite3/init.go | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/storage/sqlite3/init.go b/storage/sqlite3/init.go index 3810553..9b49556 100644 --- a/storage/sqlite3/init.go +++ b/storage/sqlite3/init.go @@ -18,9 +18,31 @@ var ddls = []string{ tags jsonb NOT NULL, content text NOT NULL, sig text NOT NULL);`, - `CREATE INDEX IF NOT EXISTS idx_event_id on event(id)`, - `CREATE INDEX IF NOT EXISTS idx_event_kind on event(kind)`, - `CREATE INDEX IF NOT EXISTS idx_event_kind_tags on event(kind, tags)`, + `CREATE UNIQUE INDEX IF NOT EXISTS ididx ON event(id)`, + `CREATE INDEX IF NOT EXISTS pubkeyprefix ON event(pubkey)`, + `CREATE INDEX IF NOT EXISTS timeidx ON event(created_at DESC)`, + `CREATE INDEX IF NOT EXISTS kindidx ON event(kind)`, +} + +func fixup(db *sqlx.DB) { + row, err := db.Query(`SELECT id, rowid FROM event GROUP BY id HAVING COUNT(id) > 1`) + if err == nil { + for row.Next() { + var id, rowid string + err = row.Scan(&id, &rowid) + if err != nil { + continue + } + result, err := db.Exec(`DELETE FROM event WHERE id = ? AND rowid != ?`, id, rowid) + if err != nil { + continue + } + num, _ := result.RowsAffected() + println(id, rowid, num) + } + row.Close() + println("DONE") + } } func (b *SQLite3Backend) Init() error { @@ -28,6 +50,7 @@ func (b *SQLite3Backend) Init() error { if err != nil { return err } + fixup(db) db.SetMaxOpenConns(b.MaxOpenConns) db.SetMaxIdleConns(b.MaxIdleConns) From 6b33ff0774cd08b2d36121828db031d70f69d1ad Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Fri, 6 Oct 2023 12:06:37 +0900 Subject: [PATCH 09/10] fix context. this must be another context --- handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handlers.go b/handlers.go index 47104f5..f676c15 100644 --- a/handlers.go +++ b/handlers.go @@ -390,7 +390,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { continue } - go s.handleMessage(ctx, ws, message, store) + go s.handleMessage(context.TODO(), ws, message, store) } }() From 88aa2fd51142a04541d97b0dd86bb59299e4a41b Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Fri, 6 Oct 2023 12:41:04 +0900 Subject: [PATCH 10/10] delete fixup --- storage/sqlite3/init.go | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/storage/sqlite3/init.go b/storage/sqlite3/init.go index 9b49556..b5c104f 100644 --- a/storage/sqlite3/init.go +++ b/storage/sqlite3/init.go @@ -24,33 +24,11 @@ var ddls = []string{ `CREATE INDEX IF NOT EXISTS kindidx ON event(kind)`, } -func fixup(db *sqlx.DB) { - row, err := db.Query(`SELECT id, rowid FROM event GROUP BY id HAVING COUNT(id) > 1`) - if err == nil { - for row.Next() { - var id, rowid string - err = row.Scan(&id, &rowid) - if err != nil { - continue - } - result, err := db.Exec(`DELETE FROM event WHERE id = ? AND rowid != ?`, id, rowid) - if err != nil { - continue - } - num, _ := result.RowsAffected() - println(id, rowid, num) - } - row.Close() - println("DONE") - } -} - func (b *SQLite3Backend) Init() error { db, err := sqlx.Connect("sqlite3", b.DatabaseURL) if err != nil { return err } - fixup(db) db.SetMaxOpenConns(b.MaxOpenConns) db.SetMaxIdleConns(b.MaxIdleConns)