Skip to content

Commit

Permalink
More progress towards zero downtime restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
jogramming committed Jul 13, 2018
1 parent 08426bb commit f42e076
Show file tree
Hide file tree
Showing 14 changed files with 208 additions and 74 deletions.
18 changes: 11 additions & 7 deletions bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,6 @@ func Run() {
go MemberFetcher.Run()
go mergedMessageSender()

// Initialize all plugins
for _, plugin := range common.Plugins {
if initBot, ok := plugin.(BotInitHandler); ok {
initBot.BotInit()
}
}

masterAddr := os.Getenv("YAGPDB_MASTER_CONNECT_ADDR")
if masterAddr != "" {
stateLock.Lock()
Expand All @@ -138,6 +131,8 @@ func Run() {
state = StateRunningNoMaster
stateLock.Unlock()

InitPlugins()

log.Println("Running normally without a master")
go ShardManager.Start()
go MonitorLoading()
Expand Down Expand Up @@ -172,6 +167,15 @@ func MonitorLoading() {
}
}

func InitPlugins() {
// Initialize all plugins
for _, plugin := range common.Plugins {
if initBot, ok := plugin.(BotInitHandler); ok {
initBot.BotInit()
}
}
}

func BotStarted() {
for _, p := range common.Plugins {
starter, ok := p.(BotStartedHandler)
Expand Down
39 changes: 37 additions & 2 deletions bot/botrest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,31 @@ import (
"net/http/pprof"
"strconv"
"strings"
"sync"
"time"
)

var serverAddr = ":5002"

func StartServer() {
func RegisterPlugin() {
common.RegisterPlugin(&Plugin{})
}

var (
_ bot.BotInitHandler = (*Plugin)(nil)
_ bot.BotStopperHandler = (*Plugin)(nil)
)

type Plugin struct {
srv *http.Server
}

func (p *Plugin) Name() string {
return "botrest"
}

func (p *Plugin) BotInit() {

muxer := goji.NewMux()
muxer.Use(dropNonLocal)

Expand All @@ -38,7 +57,23 @@ func StartServer() {
muxer.HandleFunc(pat.Get("/debug2/pproff/symbol"), pprof.Symbol)
muxer.HandleFunc(pat.Get("/debug2/pproff/trace"), pprof.Trace)

http.ListenAndServe(serverAddr, muxer)
p.srv = &http.Server{
Addr: serverAddr,
Handler: muxer,
}

go func() {
logrus.Println("starting botrest on ", serverAddr)
err := p.srv.ListenAndServe()
if err != nil {
logrus.Println("Failed starting botrest http server")
}
}()
}

func (p *Plugin) StopBot(wg *sync.WaitGroup) {
p.srv.Shutdown(nil)
wg.Done()
}

func ServeJson(w http.ResponseWriter, r *http.Request, data interface{}) {
Expand Down
19 changes: 15 additions & 4 deletions bot/slaveimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ func (s *SlaveImpl) FullStart() {
logrus.Println("Starting full start")

stateLock.Lock()
if state != StateSoftStarting {
go ShardManager.Start()
}

stateTmp := state
state = StateRunningWithMaster
stateLock.Unlock()

if stateTmp != StateSoftStarting {
InitPlugins()
go ShardManager.Start()
}
}

func (s *SlaveImpl) SoftStart() {
Expand All @@ -36,6 +38,8 @@ func (s *SlaveImpl) SoftStart() {
state = StateSoftStarting
stateLock.Unlock()

InitPlugins()

go ShardManager.Start()
}

Expand All @@ -58,6 +62,9 @@ func (s *SlaveImpl) StartShardTransferFrom() int {

func (s *SlaveImpl) StartShardTransferTo(numShards int) {
ShardManager.SetNumShards(numShards)

InitPlugins()

err := ShardManager.Init()
if err != nil {
panic("Failed initializing shard manager: " + err.Error())
Expand All @@ -81,6 +88,8 @@ func (s *SlaveImpl) StopShard(shard int) (sessionID string, sequence int64) {

numShards := ShardManager.GetNumShards()

started := time.Now()

// Send the guilds on this shard
guildsToSend := make([]*dstate.GuildState, 0)
State.RLock()
Expand Down Expand Up @@ -113,6 +122,8 @@ func (s *SlaveImpl) StopShard(shard int) (sessionID string, sequence int64) {
State.Unlock()
}

logrus.Println("Took ", time.Since(started), " to transfer ", len(guildsToSend), "guildstates")

sessionID, sequence = ShardManager.Sessions[shard].GatewayManager.GetSessionInfo()
return
}
Expand Down
23 changes: 23 additions & 0 deletions cmd/yagmaster/listensignal_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package main

import (
"github.com/sirupsen/logrus"
"os"
"os/signal"
"syscall"
)

func ListenSignal() {

sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill, syscall.SIGUSR1)
for {
sign := <-sc
if sign == syscall.SIGUSR1 {
LaunchNewSlave()
} else {
logrus.Println("Got ", sign.String())
os.Exit(0)
}
}
}
4 changes: 4 additions & 0 deletions cmd/yagmaster/listensignal_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package main

func ListenSignal() {
}
7 changes: 7 additions & 0 deletions cmd/yagmaster/yagmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,12 @@ func main() {
ForceColors: true,
})

go ListenSignal()

master.Listen(os.Getenv("YAGPDB_MASTER_LISTEN_ADDR"))
}

func LaunchNewSlave() {
logrus.Println("Launching new slave")
go master.StartSlave()
}
82 changes: 42 additions & 40 deletions cmd/yagpdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@ import (
"github.com/jonas747/yagpdb/feeds"
"github.com/jonas747/yagpdb/web"
// Plugin imports
"github.com/jonas747/yagpdb/automod"
"github.com/jonas747/yagpdb/autorole"
"github.com/jonas747/yagpdb/aylien"
"github.com/jonas747/yagpdb/commands"
"github.com/jonas747/yagpdb/customcommands"
"github.com/jonas747/yagpdb/discordlogger"
"github.com/jonas747/yagpdb/logs"
"github.com/jonas747/yagpdb/moderation"
"github.com/jonas747/yagpdb/notifications"
"github.com/jonas747/yagpdb/reddit"
"github.com/jonas747/yagpdb/reminders"
"github.com/jonas747/yagpdb/reputation"
"github.com/jonas747/yagpdb/rolecommands"
"github.com/jonas747/yagpdb/serverstats"
"github.com/jonas747/yagpdb/soundboard"
"github.com/jonas747/yagpdb/stdcommands"
"github.com/jonas747/yagpdb/streaming"
"github.com/jonas747/yagpdb/youtube"
// "github.com/jonas747/yagpdb/automod"
// "github.com/jonas747/yagpdb/autorole"
// "github.com/jonas747/yagpdb/aylien"
// "github.com/jonas747/yagpdb/commands"
// "github.com/jonas747/yagpdb/customcommands"
// "github.com/jonas747/yagpdb/discordlogger"
// "github.com/jonas747/yagpdb/logs"
// "github.com/jonas747/yagpdb/moderation"
// "github.com/jonas747/yagpdb/notifications"
// "github.com/jonas747/yagpdb/reddit"
// "github.com/jonas747/yagpdb/reminders"
// "github.com/jonas747/yagpdb/reputation"
// "github.com/jonas747/yagpdb/rolecommands"
// "github.com/jonas747/yagpdb/serverstats"
// "github.com/jonas747/yagpdb/soundboard"
// "github.com/jonas747/yagpdb/stdcommands"
// "github.com/jonas747/yagpdb/streaming"
// "github.com/jonas747/yagpdb/youtube"
)

var (
Expand Down Expand Up @@ -76,6 +76,8 @@ func main() {
ForceColors: true,
})

AddSyslogHooks()

if os.Getenv("YAGPDB_SENTRY_DSN") != "" {
hook, err := logrus_sentry.NewSentryHook(os.Getenv("YAGPDB_SENTRY_DSN"), []log.Level{
log.PanicLevel,
Expand Down Expand Up @@ -119,32 +121,32 @@ func main() {
//BotSession.LogLevel = discordgo.LogInformational

// Setup plugins
discordlogger.Register()
commands.RegisterPlugin()
stdcommands.RegisterPlugin()
serverstats.RegisterPlugin()
notifications.RegisterPlugin()
customcommands.RegisterPlugin()
reddit.RegisterPlugin()
moderation.RegisterPlugin()
reputation.RegisterPlugin()
aylien.RegisterPlugin()
streaming.RegisterPlugin()
automod.RegisterPlugin()
logs.InitPlugin()
autorole.RegisterPlugin()
reminders.RegisterPlugin()
soundboard.RegisterPlugin()
youtube.RegisterPlugin()
rolecommands.RegisterPlugin()
// discordlogger.Register()
// commands.RegisterPlugin()
// stdcommands.RegisterPlugin()
// serverstats.RegisterPlugin()
// notifications.RegisterPlugin()
// customcommands.RegisterPlugin()
// reddit.RegisterPlugin()
// moderation.RegisterPlugin()
// reputation.RegisterPlugin()
// aylien.RegisterPlugin()
// streaming.RegisterPlugin()
// automod.RegisterPlugin()
// logs.InitPlugin()
// autorole.RegisterPlugin()
// reminders.RegisterPlugin()
// soundboard.RegisterPlugin()
// youtube.RegisterPlugin()
// rolecommands.RegisterPlugin()

if flagDryRun {
log.Println("This is a dry run, exiting")
return
}

// Setup plugins for bot, but run later if enabled
commands.InitCommands()
// commands.InitCommands()
mqueue.InitStores()

// RUN FORREST RUN
Expand All @@ -158,8 +160,8 @@ func main() {
}

if flagRunBot || flagRunEverything {
botrest.RegisterPlugin()
bot.Run()
go botrest.StartServer()
go mqueue.StartPolling()
}

Expand Down Expand Up @@ -205,8 +207,8 @@ func listenSignal() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

<-c
log.Info("SHUTTING DOWN...")
sig := <-c
log.Info("SHUTTING DOWN... ", sig.String())

shouldWait := false
var wg sync.WaitGroup
Expand Down
5 changes: 5 additions & 0 deletions cmd/yagpdb/yagpdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// +build !linux

package main

func AddSyslogHooks() {}
17 changes: 17 additions & 0 deletions cmd/yagpdb/yagpdb_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"github.com/sirupsen/logrus"
lsyslog "github.com/sirupsen/logrus/hooks/syslog"
"log/syslog"
)

func AddSyslogHooks() {
hook, err := lsyslog.NewSyslogHook("", "", syslog.LOG_INFO, "")

if err == nil {
logrus.AddHook(hook)
} else {
logrus.Println("failed initializing syslog hook")
}
}
8 changes: 4 additions & 4 deletions master/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func (c *Conn) Listen() {
}
}

c.MessageHandler(&Message{EvtID: id, Body: body})
c.MessageHandler(&Message{EvtID: EventType(id), Body: body})
}
}

// Send sends the specified message over the connection, marshaling the data using json
// this locks the writer
func (c *Conn) Send(evtID uint32, data interface{}) error {
func (c *Conn) Send(evtID EventType, data interface{}) error {
c.sendmu.Lock()
defer c.sendmu.Unlock()

Expand All @@ -85,12 +85,12 @@ func (c *Conn) Send(evtID uint32, data interface{}) error {

// SendNoLock sends the specified message over the connection, marshaling the data using json
// This does no locking and the caller is responsible for making sure its not called in multiple goroutines at the same time
func (c *Conn) SendNoLock(evtID uint32, data interface{}) error {
func (c *Conn) SendNoLock(evtID EventType, data interface{}) error {

var buf bytes.Buffer

tmpBuf := make([]byte, 4)
binary.LittleEndian.PutUint32(tmpBuf, evtID)
binary.LittleEndian.PutUint32(tmpBuf, uint32(evtID))
buf.Write(tmpBuf)

l := uint32(0)
Expand Down
Loading

0 comments on commit f42e076

Please sign in to comment.