Skip to content

Commit

Permalink
Switch to SQLite for activity tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
DaneEveritt committed Jul 10, 2022
1 parent e1e7916 commit 7bd11c1
Show file tree
Hide file tree
Showing 16 changed files with 264 additions and 740 deletions.
7 changes: 6 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/pterodactyl/wings/internal/cron"
"github.com/pterodactyl/wings/internal/sqlite"
log2 "log"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -131,6 +132,10 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
}),
)

if err := sqlite.Initialize(cmd.Context()); err != nil {
log.WithField("error", err).Fatal("failed to initialize database")
}

manager, err := server.NewManager(cmd.Context(), pclient)
if err != nil {
log.WithField("error", err).Fatal("failed to load server configurations")
Expand Down Expand Up @@ -260,7 +265,7 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
}
}()

if s, err := cron.Scheduler(manager); err != nil {
if s, err := cron.Scheduler(cmd.Context(), manager); err != nil {
log.WithField("error", err).Fatal("failed to initialize cron system")
} else {
log.WithField("subsystem", "cron").Info("starting cron processes")
Expand Down
25 changes: 18 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ require (
gopkg.in/yaml.v2 v2.4.0
)

require github.com/goccy/go-json v0.9.6
require (
github.com/go-co-op/gocron v1.15.0
github.com/goccy/go-json v0.9.6
github.com/klauspost/compress v1.15.1
modernc.org/sqlite v1.17.3
)

require golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e // indirect

Expand All @@ -54,7 +59,6 @@ require (
github.com/Microsoft/hcsshim v0.9.2 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/containerd/containerd v1.6.2 // indirect
github.com/containerd/fifo v1.0.0 // indirect
Expand All @@ -66,7 +70,6 @@ require (
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gammazero/deque v0.1.1 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-co-op/gocron v1.15.0 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.1 // indirect
Expand All @@ -77,7 +80,6 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/magefile/mage v1.13.0 // indirect
Expand All @@ -98,24 +100,33 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/xujiajun/nutsdb v0.9.0 // indirect
github.com/xujiajun/utils v0.0.0-20190123093513-8bf096c4f53b // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
golang.org/x/tools v0.1.1 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb // indirect
google.golang.org/grpc v1.45.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/uint128 v1.1.1 // indirect
modernc.org/cc/v3 v3.36.0 // indirect
modernc.org/ccgo/v3 v3.16.6 // indirect
modernc.org/libc v1.16.7 // indirect
modernc.org/mathutil v1.4.1 // indirect
modernc.org/memory v1.1.1 // indirect
modernc.org/opt v0.1.1 // indirect
modernc.org/strutil v1.1.1 // indirect
modernc.org/token v1.0.0 // indirect
)
342 changes: 44 additions & 298 deletions go.sum

Large diffs are not rendered by default.

156 changes: 78 additions & 78 deletions internal/cron/activity_cron.go
Original file line number Diff line number Diff line change
@@ -1,102 +1,102 @@
package cron

import (
"bytes"
"context"
"database/sql"
"emperror.dev/errors"
"github.com/apex/log"
"github.com/goccy/go-json"
"github.com/pterodactyl/wings/internal/database"
"encoding/gob"
"github.com/pterodactyl/wings/internal/sqlite"
"github.com/pterodactyl/wings/server"
"github.com/pterodactyl/wings/system"
"github.com/xujiajun/nutsdb"
"strings"
)

var key = []byte("events")
var activityCron system.AtomicBool
type activityCron struct {
mu *system.AtomicBool
manager *server.Manager
max int64
}

const queryRegularActivity = `
SELECT id, event, user_uuid, server_uuid, metadata, ip, timestamp FROM activity_logs
WHERE event NOT LIKE 'server:sftp.%'
ORDER BY timestamp
LIMIT ?
`

type QueriedActivity struct {
id int
b []byte
server.Activity
}

func processActivityLogs(m *server.Manager, c int64) error {
// Parse parses the internal query results into the QueriedActivity type and then properly
// sets the Metadata onto it. This also sets the ID that was returned to ensure we're able
// to then delete all of the matching rows in the database after we're done.
func (qa *QueriedActivity) Parse(r *sql.Rows) error {
if err := r.Scan(&qa.id, &qa.Event, &qa.User, &qa.Server, &qa.b, &qa.IP, &qa.Timestamp); err != nil {
return errors.Wrap(err, "cron: failed to parse activity log")
}
if err := gob.NewDecoder(bytes.NewBuffer(qa.b)).Decode(&qa.Metadata); err != nil {
return errors.WithStack(err)
}
return nil
}

// Run executes the cronjob and ensures we fetch and send all of the stored activity to the
// Panel instance. Once activity is sent it is deleted from the local database instance. Any
// SFTP specific events are not handled in this cron, they're handled seperately to account
// for de-duplication and event merging.
func (ac *activityCron) Run(ctx context.Context) error {
// Don't execute this cron if there is currently one running. Once this task is completed
// go ahead and mark it as no longer running.
if !activityCron.SwapIf(true) {
log.WithField("subsystem", "cron").WithField("cron", "activity_logs").Warn("cron: process overlap detected, skipping this run")
return nil
if !ac.mu.SwapIf(true) {
return errors.WithStack(ErrCronRunning)
}
defer activityCron.Store(false)
defer ac.mu.Store(false)

var list [][]byte
err := database.DB().View(func(tx *nutsdb.Tx) error {
// Grab the oldest 100 activity events that have been logged and send them back to the
// Panel for processing. Once completed, delete those events from the database and then
// release the lock on this process.
end := int(c)
if s, err := tx.LSize(database.ServerActivityBucket, key); err != nil {
if errors.Is(err, nutsdb.ErrBucket) {
return nil
}
return errors.WithStackIf(err)
} else if s < end || s == 0 {
if s == 0 {
return nil
}
end = s
}
l, err := tx.LRange(database.ServerActivityBucket, key, 0, end)
if err != nil {
// This error is returned when the bucket doesn't exist, which is likely on the
// first invocations of Wings since we haven't yet logged any data. There is nothing
// that needs to be done if this error occurs.
if errors.Is(err, nutsdb.ErrBucket) {
return nil
}
return errors.WithStackIf(err)
}
list = l
return nil
})

if err != nil || len(list) == 0 {
return errors.WithStackIf(err)
rows, err := sqlite.Instance().QueryContext(ctx, queryRegularActivity, ac.max)
if err != nil {
return errors.Wrap(err, "cron: failed to query activity logs")
}
defer rows.Close()

var processed []json.RawMessage
for _, l := range list {
var v json.RawMessage
if err := json.Unmarshal(l, &v); err != nil {
log.WithField("error", errors.WithStack(err)).Warn("failed to parse activity event json, skipping entry")
continue
var logs []server.Activity
var ids []int
for rows.Next() {
var qa QueriedActivity
if err := qa.Parse(rows); err != nil {
return err
}
processed = append(processed, v)
ids = append(ids, qa.id)
logs = append(logs, qa.Activity)
}

if err := m.Client().SendActivityLogs(context.Background(), processed); err != nil {
if err := rows.Err(); err != nil {
return errors.WithStack(err)
}
if len(logs) == 0 {
return nil
}
if err := ac.manager.Client().SendActivityLogs(context.Background(), logs); err != nil {
return errors.WrapIf(err, "cron: failed to send activity events to Panel")
}

return database.DB().Update(func(tx *nutsdb.Tx) error {
if m, err := tx.LSize(database.ServerActivityBucket, key); err != nil {
return errors.WithStack(err)
} else if m > len(list) {
// As long as there are more elements than we have in the length of our list
// we can just use the existing `LTrim` functionality of nutsdb. This will remove
// all of the values we've already pulled and sent to the API.
return errors.WithStack(tx.LTrim(database.ServerActivityBucket, key, len(list), -1))
} else {
i := 0
// This is the only way I can figure out to actually empty the items out of the list
// because you cannot use `LTrim` (or I cannot for the life of me figure out how) to
// trim the slice down to 0 items without it triggering an internal logic error. Perhaps
// in a future release they'll have a function to do this (based on my skimming of issues
// on GitHub that I cannot read due to translation barriers).
for {
if i >= m {
break
}
if _, err := tx.LPop(database.ServerActivityBucket, key); err != nil {
return errors.WithStack(err)
}
i++
}
if tx, err := sqlite.Instance().Begin(); err != nil {
return err
} else {
t := make([]string, len(ids))
params := make([]interface{}, len(ids))
for i := 0; i < len(ids); i++ {
t[i] = "?"
params[i] = ids[i]
}
return nil
})
q := strings.Join(t, ",")
_, err := tx.Exec(`DELETE FROM activity_logs WHERE id IN(`+q+`)`, params...)
if err != nil {
return errors.Combine(errors.WithStack(err), tx.Rollback())
}
return errors.WithStack(tx.Commit())
}
}
25 changes: 13 additions & 12 deletions internal/cron/cron.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cron

import (
"context"
"emperror.dev/errors"
"github.com/apex/log"
"github.com/go-co-op/gocron"
Expand All @@ -17,7 +18,7 @@ var o system.AtomicBool
// Scheduler configures the internal cronjob system for Wings and returns the scheduler
// instance to the caller. This should only be called once per application lifecycle, additional
// calls will result in an error being returned.
func Scheduler(m *server.Manager) (*gocron.Scheduler, error) {
func Scheduler(ctx context.Context, m *server.Manager) (*gocron.Scheduler, error) {
if !o.SwapIf(true) {
return nil, errors.New("cron: cannot call scheduler more than once in application lifecycle")
}
Expand All @@ -26,20 +27,20 @@ func Scheduler(m *server.Manager) (*gocron.Scheduler, error) {
return nil, errors.Wrap(err, "cron: failed to parse configured system timezone")
}

s := gocron.NewScheduler(l)
_, _ = s.Tag("activity").Every(int(config.Get().System.ActivitySendInterval)).Seconds().Do(func() {
if err := processActivityLogs(m, config.Get().System.ActivitySendCount); err != nil {
log.WithField("error", err).Error("cron: failed to process activity events")
}
})
activity := activityCron{
mu: system.NewAtomicBool(false),
manager: m,
max: config.Get().System.ActivitySendCount,
}

_, _ = s.Tag("sftp").Every(20).Seconds().Do(func() {
runner := sftpEventProcessor{mu: system.NewAtomicBool(false), manager: m}
if err := runner.Run(); err != nil {
s := gocron.NewScheduler(l)
// int(config.Get().System.ActivitySendInterval)
_, _ = s.Tag("activity").Every(5).Seconds().Do(func() {
if err := activity.Run(ctx); err != nil {
if errors.Is(err, ErrCronRunning) {
log.WithField("cron", "sftp_events").Warn("cron: job already running, skipping...")
log.WithField("cron", "activity").Warn("cron: process is already running, skipping...")
} else {
log.WithField("error", err).Error("cron: failed to process sftp events")
log.WithField("error", err).Error("cron: failed to process activity events")
}
}
})
Expand Down
Loading

0 comments on commit 7bd11c1

Please sign in to comment.