Skip to content

Commit

Permalink
create stats meta partition one day in advance using the statistics p…
Browse files Browse the repository at this point in the history
…rocess
  • Loading branch information
peterbitfly committed Nov 4, 2022
1 parent 96a1c42 commit d710b96
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/explorer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func main() {
Name: cfg.Frontend.ReaderDatabase.Name,
Host: cfg.Frontend.ReaderDatabase.Host,
Port: cfg.Frontend.ReaderDatabase.Port,
}, cfg.Frontend.SessionSecret)
})
}()

wg.Add(1)
Expand Down
25 changes: 25 additions & 0 deletions cmd/statistics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ func main() {
defer db.ReaderDb.Close()
defer db.WriterDb.Close()

db.MustInitFrontendDB(&types.DatabaseConfig{
Username: cfg.Frontend.WriterDatabase.Username,
Password: cfg.Frontend.WriterDatabase.Password,
Name: cfg.Frontend.WriterDatabase.Name,
Host: cfg.Frontend.WriterDatabase.Host,
Port: cfg.Frontend.WriterDatabase.Port,
}, &types.DatabaseConfig{
Username: cfg.Frontend.ReaderDatabase.Username,
Password: cfg.Frontend.ReaderDatabase.Password,
Name: cfg.Frontend.ReaderDatabase.Name,
Host: cfg.Frontend.ReaderDatabase.Host,
Port: cfg.Frontend.ReaderDatabase.Port,
})
defer db.FrontendReaderDB.Close()
defer db.FrontendWriterDB.Close()

db.InitBigtable(cfg.Bigtable.Project, cfg.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID))

if *statisticsDaysToExport != "" {
Expand Down Expand Up @@ -101,6 +117,15 @@ func main() {

func statisticsLoop() {
for {

// create stats parition on users table
now := time.Now()
nowTs := now.Unix()
var day int = int(nowTs / 86400)

db.CreateNewStatsMetaPartition(day)
db.CreateNewStatsMetaPartition(day + 1)

latestEpoch, err := db.GetLatestEpoch()
if err != nil {
logrus.Errorf("error retreiving latest epoch from the db: %v", err)
Expand Down
44 changes: 24 additions & 20 deletions db/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var FrontendReaderDB *sqlx.DB
var FrontendWriterDB *sqlx.DB

func MustInitFrontendDB(writer *types.DatabaseConfig, reader *types.DatabaseConfig, sessionSecret string) {
func MustInitFrontendDB(writer *types.DatabaseConfig, reader *types.DatabaseConfig) {
FrontendWriterDB, FrontendReaderDB = mustInitDB(writer, reader)
}

Expand Down Expand Up @@ -618,37 +618,41 @@ func InsertStatsMeta(tx *sql.Tx, userID uint64, data *types.StatsMeta) (uint64,
return id, err
}

func CreateNewStatsMetaPartition() error {

now := time.Now()
nowTs := now.Unix()
var day int = int(nowTs / 86400)

func CreateNewStatsMetaPartition(day int) error {
partitionName := "stats_meta_" + strconv.Itoa(day)
logger.Info("creating new partition table " + partitionName)

tx, err := FrontendWriterDB.Begin()
tx, err := FrontendWriterDB.Beginx()
if err != nil {
logger.Errorf("error starting tx for creating partition %v", err)
return err
}
defer tx.Rollback()

_, err = tx.Exec("CREATE TABLE " + partitionName + " PARTITION OF stats_meta_p FOR VALUES IN (" + strconv.Itoa(day) + ")")
var exists bool
err = tx.Get(&exists, `SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = $1);`, partitionName)
if err != nil {
logger.Errorf("error creating partition %v", err)
return err
}
_, err = tx.Exec("CREATE UNIQUE INDEX " + partitionName + "_user_id_created_trunc_process_machine_key ON public." + partitionName + " USING btree (user_id, created_trunc, process, machine)")
if err != nil {
logger.Errorf("error creating index %v", err)
logger.Errorf("error checking if partition %s already exists: %v", partitionName, err)
return err
}

_, err = tx.Exec("CREATE INDEX idx_" + partitionName + "_user_machine ON " + partitionName + " (user_id, machine)")
if err != nil {
logger.Errorf("error creating index %v", err)
return err
if !exists {
logger.Infof("creating partition %s", partitionName)
_, err = tx.Exec("CREATE TABLE " + partitionName + " PARTITION OF stats_meta_p FOR VALUES IN (" + strconv.Itoa(day) + ")")
if err != nil {
logger.Errorf("error creating partition %v", err)
return err
}
_, err = tx.Exec("CREATE UNIQUE INDEX " + partitionName + "_user_id_created_trunc_process_machine_key ON public." + partitionName + " USING btree (user_id, created_trunc, process, machine)")
if err != nil {
logger.Errorf("error creating index %v", err)
return err
}

_, err = tx.Exec("CREATE INDEX idx_" + partitionName + "_user_machine ON " + partitionName + " (user_id, machine)")
if err != nil {
logger.Errorf("error creating index %v", err)
return err
}
}

err = tx.Commit()
Expand Down
6 changes: 5 additions & 1 deletion handlers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,11 @@ func insertStats(userData *types.UserWithPremium, machine string, body *map[stri
if strings.Contains(err.Error(), "no partition of relation") {
tx.Rollback()

db.CreateNewStatsMetaPartition()
now := time.Now()
nowTs := now.Unix()
var day int = int(nowTs / 86400)
db.CreateNewStatsMetaPartition(day)

tx, err = db.NewTransaction()
if err != nil {
logger.Errorf("could not transact | %v", err)
Expand Down

0 comments on commit d710b96

Please sign in to comment.