Skip to content

Commit

Permalink
* progress report on channels and threads
Browse files Browse the repository at this point in the history
* speed and average speed
* same output style for messages, channels, and threads
* tweak the tier-2 boost
* fetch channels with context to enable ^C termination
  • Loading branch information
rusq committed Feb 24, 2022
1 parent 9feb561 commit 95675ce
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 22 deletions.
17 changes: 15 additions & 2 deletions channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"runtime/trace"
"strings"
"text/tabwriter"
"time"

"github.com/rusq/dlog"
"github.com/slack-go/slack"
)

Expand All @@ -32,25 +34,36 @@ func (sd *SlackDumper) getChannels(ctx context.Context, chanTypes []string) (Cha

params := &slack.GetConversationsParameters{Types: chanTypes, Limit: sd.options.ChannelsPerReq}
allChannels := make([]slack.Channel, 0, 50)
for {
fetchStart := time.Now()
for i := 1; ; i++ {
var (
chans []slack.Channel
nextcur string
)
reqStart := time.Now()
if err := withRetry(ctx, limiter, sd.options.Tier3Retries, func() error {
var err error
trace.WithRegion(ctx, "GetConversations", func() {
chans, nextcur, err = sd.client.GetConversations(params)
chans, nextcur, err = sd.client.GetConversationsContext(ctx, params)
})
return err

}); err != nil {
return nil, err
}
allChannels = append(allChannels, chans...)

dlog.Printf("channels request #%5d, fetched: %4d, total: %8d (speed: %6.2f/sec, avg: %6.2f/sec)\n",
i, len(chans), len(allChannels),
float64(len(chans))/float64(time.Since(reqStart).Seconds()),
float64(len(allChannels))/float64(time.Since(fetchStart).Seconds()),
)

if nextcur == "" {
dlog.Printf("channels fetch complete, total: %d channels", len(allChannels))
break
}

params.Cursor = nextcur
limiter.Wait(ctx)
}
Expand Down
4 changes: 2 additions & 2 deletions channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestSlackDumper_getChannels(t *testing.T) {
allChanTypes,
},
func(mc *mockClienter) {
mc.EXPECT().GetConversations(&slack.GetConversationsParameters{
mc.EXPECT().GetConversationsContext(context.Background(), &slack.GetConversationsParameters{
Types: allChanTypes,
}).Return(Channels{
slack.Channel{GroupConversation: slack.GroupConversation{
Expand All @@ -58,7 +58,7 @@ func TestSlackDumper_getChannels(t *testing.T) {
allChanTypes,
},
func(mc *mockClienter) {
mc.EXPECT().GetConversations(&slack.GetConversationsParameters{
mc.EXPECT().GetConversationsContext(context.Background(), &slack.GetConversationsParameters{
Types: allChanTypes,
}).Return(
nil,
Expand Down
12 changes: 6 additions & 6 deletions clienter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion cmd/slackdump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"path/filepath"
"runtime/trace"
"syscall"

"github.com/rusq/slackdump"
"github.com/rusq/slackdump/internal/app"
Expand Down Expand Up @@ -56,7 +57,7 @@ func main() {
dlog.SetDebug(true)
}

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

if err := run(ctx, params); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/joho/godotenv v1.4.0
github.com/pkg/errors v0.9.1
github.com/rusq/dlog v1.3.3
github.com/slack-go/slack v0.9.5
github.com/slack-go/slack v0.10.2
github.com/stretchr/testify v1.4.0
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
)
Expand Down
2 changes: 1 addition & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (app *App) Run(ctx context.Context) error {
if err != nil {
return err
}
dlog.Printf("job finished, dumped %d channels", n)
dlog.Printf("job finished, dumped %d item(s)", n)
}
return nil
}
Expand Down
26 changes: 21 additions & 5 deletions messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,16 @@ func (sd *SlackDumper) dumpMessages(ctx context.Context, channelID string, oldes
}

var (
messages []Message
cursor string
messages []Message
cursor string
fetchStart = time.Now()
)
for i := 1; ; i++ {
var (
resp *slack.GetConversationHistoryResponse
params = sd.convHistoryParams(channelID, cursor, oldest, latest)
)
reqStart := time.Now()
if err := withRetry(ctx, convLimiter, sd.options.Tier3Retries, func() error {
var err error
trace.WithRegion(ctx, "GetConversationHistoryContext", func() {
Expand All @@ -165,10 +167,14 @@ func (sd *SlackDumper) dumpMessages(ctx context.Context, channelID string, oldes
sd.pipeFiles(filesC, chunk)
messages = append(messages, chunk...)

dlog.Printf("request #%5d, fetched: %4d, (with threads: %4d) total: %8d\n",
i, len(resp.Messages), threads, len(messages))
dlog.Printf("messages request #%5d, fetched: %4d (with threads: %4d), total: %8d (speed: %6.2f/sec, avg: %6.2f/sec)\n",
i, len(resp.Messages), threads, len(messages),
float64(len(resp.Messages))/float64(time.Since(reqStart).Seconds()),
float64(len(messages))/float64(time.Since(fetchStart).Seconds()),
)

if !resp.HasMore {
dlog.Printf("messages fetch complete, total: %d", len(messages))
break
}

Expand Down Expand Up @@ -351,12 +357,14 @@ func (sd *SlackDumper) dumpThread(ctx context.Context, l *rate.Limiter, channelI
var thread []Message

var cursor string
for {
fetchStart := time.Now()
for i := 1; ; i++ {
var (
msgs []slack.Message
hasmore bool
nextCursor string
)
reqStart := time.Now()
if err := withRetry(ctx, l, sd.options.Tier3Retries, func() error {
var err error
trace.WithRegion(ctx, "GetConversationRepliesContext", func() {
Expand All @@ -371,7 +379,15 @@ func (sd *SlackDumper) dumpThread(ctx context.Context, l *rate.Limiter, channelI
}

thread = append(thread, sd.convertMsgs(msgs)...)

dlog.Printf(" thread request #%5d, fetched: %4d, total: %8d (speed: %6.2f/sec, avg: %6.2f/sec)\n",
i, len(msgs), len(thread),
float64(len(msgs))/float64(time.Since(reqStart).Seconds()),
float64(len(thread))/float64(time.Since(fetchStart).Seconds()),
)

if !hasmore {
dlog.Printf(" thread fetch complete, total: %d", len(thread))
break
}
cursor = nextCursor
Expand Down
4 changes: 2 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ var DefOptions = Options{
DumpFiles: false,
Workers: defNumWorkers, // number of workers doing the file download
DownloadRetries: 3, // this shouldn't even happen, as we have no limiter on files download.
Tier2Boost: 0, // slack is being difficult, so no boost for tier 2.
Tier2Boost: 20, // seems to work fine with this boost
Tier2Burst: 1, // limiter will wait indefinitely if it is less than 1.
Tier2Retries: 20, // see #28, sometimes slack is being difficult
Tier3Boost: 120, // playing safe there, but generally value of 120 is fine.
Tier3Burst: 1, // safe value, who would ever want to modify it? I don't know.
Tier3Retries: 3, // on tier 3 this was never a problem, even with limiter-boost=120
ConversationsPerReq: 200, // this is the recommended value by Slack. But who listens to them anyway.
ChannelsPerReq: 500, // channels are tier2 rate limited. We need to get as much per request as possible.
ChannelsPerReq: 100, // channels are tier2 rate limited. Slack is greedy and never returns more than 100 per call.
UserCacheFilename: "users.json", // seems logical
MaxUserCacheAge: 4 * time.Hour, // quick math: that's 1/6th of a day, how's that, huh?
}
Expand Down
4 changes: 2 additions & 2 deletions slackdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type clienter interface {
GetConversationInfoContext(ctx context.Context, channelID string, includeLocale bool) (*slack.Channel, error)
GetConversationHistoryContext(ctx context.Context, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error)
GetConversationRepliesContext(ctx context.Context, params *slack.GetConversationRepliesParameters) (msgs []slack.Message, hasMore bool, nextCursor string, err error)
GetConversations(params *slack.GetConversationsParameters) (channels []slack.Channel, nextCursor string, err error)
GetConversationsContext(ctx context.Context, params *slack.GetConversationsParameters) (channels []slack.Channel, nextCursor string, err error)
GetFile(downloadURL string, writer io.Writer) error
GetUsers() ([]slack.User, error)
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func withRetry(ctx context.Context, l *rate.Limiter, maxAttempts int, fn func()

msg := fmt.Sprintf("got rate limited, sleeping %s", rle.RetryAfter)
trace.Log(ctx, "info", msg)
dlog.Debug(msg)
dlog.Print(msg)

time.Sleep(rle.RetryAfter)
}
Expand Down

0 comments on commit 95675ce

Please sign in to comment.