Skip to content

Commit

Permalink
botrest: botrest now uses service discovery, stores listen address fo…
Browse files Browse the repository at this point in the history
…r shards in redis
  • Loading branch information
jogramming committed Oct 23, 2018
1 parent 1b87fdb commit 7b3fc28
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 32 deletions.
2 changes: 1 addition & 1 deletion autorole/plugin_botrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ func botRestPostFullScan(guildID int64) error {
return ErrAlreadyProcessingFullGuild
}

err = botrest.Post(strconv.FormatInt(guildID, 10)+"/autorole/fullscan", nil, nil)
err = botrest.Post(bot.GuildShardID(guildID), strconv.FormatInt(guildID, 10)+"/autorole/fullscan", nil, nil)
return err
}
6 changes: 0 additions & 6 deletions bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,3 @@ func GuildCountsFunc() []int {

return result
}

// IsGuildOnCurrentProcess returns whether the guild is on one of the shards for this process
func IsGuildOnCurrentProcess(guildID int64) bool {
shardID := int((guildID >> 22) % int64(TotalShardCount))
return shardID >= RunShardOffset && shardID < RunShardOffset+ProcessShardCount
}
9 changes: 9 additions & 0 deletions bot/botrest/botrest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package botrest

import (
"strconv"
)

func RedisKeyShardAddressMapping(shardID int) string {
return "botrest_shard_mapping:" + strconv.Itoa(shardID)
}
55 changes: 42 additions & 13 deletions bot/botrest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"encoding/json"
"fmt"
"github.com/jonas747/discordgo"
"github.com/jonas747/yagpdb/bot"
"github.com/jonas747/yagpdb/common"
"github.com/mediocregopher/radix.v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"net/http"
Expand All @@ -15,10 +18,31 @@ import (
)

var (
ErrServerError = errors.New("reststate server is having issues")
ErrServerError = errors.New("botrest server is having issues")
ErrCantFindServer = errors.New("can't find botrest server for provided shard")
)

func Get(url string, dest interface{}) error {
func GetServerAddrForGuild(guildID int64) string {
shard := bot.GuildShardID(guildID)
return GetServerAddrForShard(shard)
}

func GetServerAddrForShard(shard int) string {
resp := ""
err := common.RedisPool.Do(radix.Cmd(&resp, "GET", RedisKeyShardAddressMapping(shard)))
if err != nil {
logrus.WithError(err).Error("[botrest] failed retrieving shard server addr")
}

return resp
}

func Get(shard int, url string, dest interface{}) error {
serverAddr := GetServerAddrForShard(shard)
if serverAddr == "" {
return ErrCantFindServer
}

resp, err := http.Get("http://" + serverAddr + "/" + url)
if err != nil {
return err
Expand All @@ -38,7 +62,12 @@ func Get(url string, dest interface{}) error {
return errors.WithMessage(json.NewDecoder(resp.Body).Decode(dest), "json.Decode")
}

func Post(url string, bodyData interface{}, dest interface{}) error {
func Post(shard int, url string, bodyData interface{}, dest interface{}) error {
serverAddr := GetServerAddrForShard(shard)
if serverAddr == "" {
return ErrCantFindServer
}

var bodyBuf bytes.Buffer
if bodyData != nil {
encoder := json.NewEncoder(&bodyBuf)
Expand Down Expand Up @@ -71,17 +100,17 @@ func Post(url string, bodyData interface{}, dest interface{}) error {
}

func GetGuild(guildID int64) (g *discordgo.Guild, err error) {
err = Get(discordgo.StrID(guildID)+"/guild", &g)
err = Get(bot.GuildShardID(guildID), discordgo.StrID(guildID)+"/guild", &g)
return
}

func GetBotMember(guildID int64) (m *discordgo.Member, err error) {
err = Get(discordgo.StrID(guildID)+"/botmember", &m)
err = Get(bot.GuildShardID(guildID), discordgo.StrID(guildID)+"/botmember", &m)
return
}

func GetOnlineCount(guildID int64) (c int64, err error) {
err = Get(discordgo.StrID(guildID)+"/onlinecount", &c)
err = Get(bot.GuildShardID(guildID), discordgo.StrID(guildID)+"/onlinecount", &c)
return
}

Expand All @@ -94,7 +123,7 @@ func GetMembers(guildID int64, members ...int64) (m []*discordgo.Member, err err
query := url.Values{"users": stringed}
encoded := query.Encode()

err = Get(discordgo.StrID(guildID)+"/members?"+encoded, &m)
err = Get(bot.GuildShardID(guildID), discordgo.StrID(guildID)+"/members?"+encoded, &m)
return
}

Expand All @@ -109,17 +138,17 @@ func GetMemberColors(guildID int64, members ...int64) (m map[string]int, err err
query := url.Values{"users": stringed}
encoded := query.Encode()

err = Get(discordgo.StrID(guildID)+"/membercolors?"+encoded, &m)
err = Get(bot.GuildShardID(guildID), discordgo.StrID(guildID)+"/membercolors?"+encoded, &m)
return
}

func GetChannelPermissions(guildID, channelID int64) (perms int64, err error) {
err = Get(discordgo.StrID(guildID)+"/channelperms/"+discordgo.StrID(channelID), &perms)
err = Get(bot.GuildShardID(guildID), discordgo.StrID(guildID)+"/channelperms/"+discordgo.StrID(channelID), &perms)
return
}

func GetShardStatuses() (st []*ShardStatus, err error) {
err = Get("gw_status", &st)
err = Get(0, "gw_status", &st)
return
}

Expand All @@ -129,7 +158,7 @@ func SendReconnectShard(shardID int, reidentify bool) (err error) {
queryParams = "?reidentify=1"
}

err = Post(fmt.Sprintf("shard/%d/reconnect"+queryParams, shardID), nil, nil)
err = Post(shardID, fmt.Sprintf("shard/%d/reconnect"+queryParams, shardID), nil, nil)
return
}

Expand All @@ -139,7 +168,7 @@ func SendReconnectAll(reidentify bool) (err error) {
queryParams = "?reidentify=1"
}

err = Post("shard/*/reconnect"+queryParams, nil, nil)
err = Post(0, "shard/*/reconnect"+queryParams, nil, nil)
return
}

Expand All @@ -154,7 +183,7 @@ func RunPinger() {
time.Sleep(time.Second)

var dest string
err := Get("ping", &dest)
err := Get(0, "ping", &dest)
if err != nil {
if !lastFailed {
logrus.Warn("Ping to bot failed: ", err)
Expand Down
75 changes: 67 additions & 8 deletions bot/botrest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jonas747/dutil"
"github.com/jonas747/yagpdb/bot"
"github.com/jonas747/yagpdb/common"
"github.com/mediocregopher/radix.v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"goji.io"
Expand All @@ -21,8 +22,6 @@ import (
"time"
)

var serverAddr = ":5002"

func RegisterPlugin() {
common.RegisterPlugin(&Plugin{})
}
Expand All @@ -37,7 +36,8 @@ type BotRestPlugin interface {
}

type Plugin struct {
srv *http.Server
srv *http.Server
srvMU sync.Mutex
}

func (p *Plugin) Name() string {
Expand Down Expand Up @@ -74,17 +74,76 @@ func (p *Plugin) BotInit() {
}

p.srv = &http.Server{
Addr: serverAddr,
Handler: muxer,
}

currentPort := 5010

go func() {
logrus.Println("starting botrest on ", serverAddr)
err := p.srv.ListenAndServe()
if err != nil {
logrus.WithError(err).Error("Failed starting botrest http server")
for {
address := ":" + strconv.Itoa(currentPort)

logrus.Println("[botrest] starting botrest on ", address)

p.srvMU.Lock()
p.srv.Addr = address
p.srvMU.Unlock()

err := p.srv.ListenAndServe()
if err != nil {
// Shutdown was called for graceful shutdown
if err == http.ErrServerClosed {
logrus.Info("[botrest] server closed, shutting down...")
return
}

// Retry with a higher port until we succeed
logrus.WithError(err).Error("[botrest] failed starting botrest http server on ", address, " trying again on next port")
currentPort++
time.Sleep(time.Millisecond)
continue
}

logrus.Println("[botrest] botrest returned without any error")
break
}
}()

// Wait for the server address to stop changing
go func() {
lastAddr := ""
lastChange := time.Now()
for {
p.srvMU.Lock()
addr := p.srv.Addr
p.srvMU.Unlock()

if lastAddr != addr {
lastAddr = addr
time.Sleep(time.Second)
lastChange = time.Now()
continue
}

if time.Since(lastChange) > time.Second*5 {
// found avaiable port
p.mapAddressToShards(lastAddr)
return
}

time.Sleep(time.Second)
}
}()
}

func (p *Plugin) mapAddressToShards(address string) {
logrus.Info("[botrest] mapping ", address, " to current process shards")
for i := bot.RunShardOffset; i < bot.ProcessShardCount; i++ {
err := common.RedisPool.Do(radix.Cmd(nil, "SET", RedisKeyShardAddressMapping(i), address))
if err != nil {
logrus.WithError(err).Error("[botrest] failed mapping botrest")
}
}
}

func (p *Plugin) StopBot(wg *sync.WaitGroup) {
Expand Down
12 changes: 12 additions & 0 deletions bot/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,15 @@ func SendMessageEmbedGS(gs *dstate.GuildState, channelID int64, msg *discordgo.M
permsOK = true
return
}

// IsGuildOnCurrentProcess returns whether the guild is on one of the shards for this process
func IsGuildOnCurrentProcess(guildID int64) bool {
shardID := int((guildID >> 22) % int64(TotalShardCount))
return shardID >= RunShardOffset && shardID < RunShardOffset+ProcessShardCount
}

// GuildShardID returns the shard id for the provided guild id
func GuildShardID(guildID int64) int {
shardID := int((guildID >> 22) % int64(TotalShardCount))
return shardID
}
10 changes: 6 additions & 4 deletions common/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pubsub
import (
"encoding/json"
"fmt"
"github.com/jonas747/yagpdb/bot"
"github.com/jonas747/yagpdb/common"
"github.com/mediocregopher/radix.v3"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -87,9 +88,6 @@ func PollEvents() {
handleEvent(string(msg.Message))
}
}

for {
}
}

func handleEvent(evt string) {
Expand All @@ -104,6 +102,11 @@ func handleEvent(evt string) {
name := split[1]
data := split[2]

parsedTarget, _ := strconv.ParseInt(target, 10, 64)
if !bot.IsGuildOnCurrentProcess(parsedTarget) {
return
}

t, ok := eventTypes[name]
if !ok && data != "" {
// No handler for this event
Expand All @@ -130,7 +133,6 @@ func handleEvent(evt string) {
Data: decoded,
}

parsedTarget, _ := strconv.ParseInt(target, 10, 64)
event.TargetGuildInt = parsedTarget

defer func() {
Expand Down

0 comments on commit 7b3fc28

Please sign in to comment.