Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: add mcache #46

Merged
merged 6 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package app

import (
"context"
"encoding/json"
"errors"
"github.com/webhookx-io/webhookx/admin"
"github.com/webhookx-io/webhookx/admin/api"
"github.com/webhookx-io/webhookx/config"
"github.com/webhookx-io/webhookx/db"
"github.com/webhookx-io/webhookx/dispatcher"
"github.com/webhookx-io/webhookx/eventbus"
"github.com/webhookx-io/webhookx/mcache"
"github.com/webhookx-io/webhookx/pkg/cache"
"github.com/webhookx-io/webhookx/pkg/log"
"github.com/webhookx-io/webhookx/pkg/taskqueue"
Expand All @@ -15,6 +19,7 @@
"github.com/webhookx-io/webhookx/worker/deliverer"
"go.uber.org/zap"
"sync"
"time"
)

var (
Expand All @@ -35,6 +40,7 @@
queue taskqueue.TaskQueue
dispatcher *dispatcher.Dispatcher
cache cache.Cache
bus eventbus.Bus

admin *admin.Admin
gateway *proxy.Gateway
Expand Down Expand Up @@ -65,24 +71,43 @@
zap.ReplaceGlobals(log)
app.log = zap.S()

// cache
client := cfg.RedisConfig.GetClient()
app.cache = cache.NewRedisCache(client)

mcache.Set(mcache.NewMCache(&mcache.Options{
L1Size: 1000,
L1TTL: time.Second * 10,
L2: app.cache,
}))

app.bus = eventbus.NewDatabaseEventBus(cfg.DatabaseConfig.GetDSN(), app.log)
app.bus.Subscribe(eventbus.EventInvalidation, func(data []byte) {
maps := make(map[string]interface{})
if err := json.Unmarshal(data, &maps); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

declare subscribe handlers somewhere else, keep app init code clean

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. Updated.

return

Check warning on line 88 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L86-L88

Added lines #L86 - L88 were not covered by tests
}
if cacheKey, ok := maps["cache_key"]; ok {
err := mcache.Invalidate(context.TODO(), cacheKey.(string))
if err != nil {
app.log.Errorf("failed to invalidate cache: key=%s %v", cacheKey, err)

Check warning on line 93 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L90-L93

Added lines #L90 - L93 were not covered by tests
}
}
})

// db
db, err := db.NewDB(&cfg.DatabaseConfig)
if err != nil {
return err
}
app.db = db

client := cfg.RedisConfig.GetClient()

// queue
queue := taskqueue.NewRedisQueue(taskqueue.RedisTaskQueueOptions{
Client: client,
}, app.log)
app.queue = queue

// cache
app.cache = cache.NewRedisCache(client)

app.dispatcher = dispatcher.NewDispatcher(log.Sugar(), queue, db)

// worker
Expand Down Expand Up @@ -122,6 +147,9 @@
return ErrApplicationStarted
}

if err := app.bus.Start(); err != nil {
return err

Check warning on line 151 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L151

Added line #L151 was not covered by tests
}
if app.admin != nil {
app.admin.Start()
}
Expand Down Expand Up @@ -156,6 +184,7 @@
app.log.Infof("stopped")
}()

_ = app.bus.Stop()
// TODO: timeout
if app.admin != nil {
app.admin.Stop()
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"encoding/json"
"github.com/creasty/defaults"
uuid "github.com/satori/go.uuid"
"github.com/webhookx-io/webhookx/pkg/envconfig"
"gopkg.in/yaml.v3"
"os"
Expand All @@ -11,6 +12,7 @@ import (
var (
VERSION = "dev"
COMMIT = "unknown"
NODE = uuid.NewV4().String()
)

var cfg Config
Expand Down
23 changes: 13 additions & 10 deletions config/database.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package config

import (
"database/sql"
"fmt"
)

type DatabaseConfig struct {
DSN string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DSN string
dsn string

already have GetDSN() method, should keep DSN private.

Copy link
Member Author

@vm-001 vm-001 Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to add a new configuration field for the database section. For example:

database:
  dsn: postgres://username:password@127.0.0.1:5432/webhookx?sslmode=disable
  ...

The dsn will have the highest priority compared the the rest of all (username, password, etc)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted. (will be done on another PR)

Host string `yaml:"host" default:"localhost"`
Port uint32 `yaml:"port" default:"5432"`
Username string `yaml:"username" default:"webhookx"`
Password string `yaml:"password" default:""`
Database string `yaml:"database" default:"webhookx"`
}

func (cfg DatabaseConfig) GetDB() (*sql.DB, error) {
dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=disable",
cfg.Username,
cfg.Password,
cfg.Host,
cfg.Port,
cfg.Database,
)
return sql.Open("postgres", dsn)
func (cfg DatabaseConfig) GetDSN() string {
dsn := cfg.DSN
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dsn := cfg.DSN
dsn := cfg.dsn

if dsn == "" {
dsn = fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=disable",
cfg.Username,
cfg.Password,
cfg.Host,
cfg.Port,
cfg.Database,
)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
cfg.dsn = dsn
}

return dsn
}

func (cfg DatabaseConfig) Validate() error {
Expand Down
24 changes: 24 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package constants

import (
"github.com/webhookx-io/webhookx/config"
"strings"
"time"
)

Expand All @@ -26,6 +27,29 @@ const (
RequeueInterval = time.Second * 60
)

type CacheKey string

func (c CacheKey) Build(id string) string {
var sb strings.Builder
sb.WriteString(Namespace)
sb.WriteString(":")
sb.WriteString(string(c))
sb.WriteString(":")
sb.WriteString(id)
return sb.String()
}

const (
Namespace string = "webhookx"
EventCacheKey CacheKey = "events"
EndpointCacheKey CacheKey = "endpoints"
SourceCacheKey CacheKey = "sources"
WorkspaceCacheKey CacheKey = "workspaces"
AttemptCacheKey CacheKey = "attempts"
PluginCacheKey CacheKey = "plugins"
AttemptDetailCacheKey CacheKey = "attempt_details"
)

var (
DefaultResponseHeaders = map[string]string{
"Content-Type": "application/json",
Expand Down
10 changes: 9 additions & 1 deletion db/dao/attempt_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/webhookx-io/webhookx/constants"
"github.com/webhookx-io/webhookx/db/entities"
"github.com/webhookx-io/webhookx/pkg/types"
)
Expand All @@ -22,8 +23,15 @@ type AttemptResult struct {
}

func NewAttemptDao(db *sqlx.DB, workspace bool) AttemptDAO {
opts := Options{
Table: "attempts",
EntityName: "Attempt",
Workspace: workspace,
CachePropagate: false,
CacheKey: constants.AttemptCacheKey,
}
return &attemptDao{
DAO: NewDAO[entities.Attempt]("attempts", db, workspace),
DAO: NewDAO[entities.Attempt](db, opts),
}
}

Expand Down
10 changes: 9 additions & 1 deletion db/dao/attempt_detail_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dao

import (
"context"
"github.com/webhookx-io/webhookx/constants"
"time"

"github.com/jmoiron/sqlx"
Expand All @@ -13,8 +14,15 @@ type attemptDetailDao struct {
}

func NewAttemptDetailDao(db *sqlx.DB, workspace bool) AttemptDetailDAO {
opts := Options{
Table: "attempt_details",
EntityName: "AttemptDetail",
Workspace: workspace,
CachePropagate: false,
CacheKey: constants.AttemptDetailCacheKey,
}
return &attemptDetailDao{
DAO: NewDAO[entities.AttemptDetail]("attempt_details", db, workspace),
DAO: NewDAO[entities.AttemptDetail](db, opts),
}
}

Expand Down
Loading
Loading