logcli: Add parallel flags #8518

Merged
merged 15 commits into from
Feb 16, 2023
Add merge, and keep-parts flags
angaz committed Feb 16, 2023
commit 12a2bb0fc2c1fc9d49b2f4c59d43300fa9e288aa
51 changes: 50 additions & 1 deletion cmd/logcli/main.go
@@ -76,7 +76,54 @@ data points between the start and end query time. This output is used to
build graphs, similar to what is seen in the Grafana Explore graph view.
If you are querying metrics and just want the most recent data point
(like what is seen in the Grafana Explore table view), then you should use
the "instant-query" command instead.`)
the "instant-query" command instead.

Parallelization:

You can download logs in parallel; there are a few flags which control this
behaviour:

--parallel-duration
--parallel-max-workers
--part-file-prefix
--overwrite-completed-parts
--merge-part-files
--keep-part-files

Refer to the help of these specific flags to understand what each of them does.

Example:

logcli query
--timezone=UTC
--from="2021-01-19T10:00:00Z"
--to="2021-01-19T20:00:00Z"
--output=jsonl
--parallel-duration="15m"
--parallel-max-workers="10"
--part-file-prefix="/tmp/my_query"
--merge-part-files
'my-query'

This will start 10 workers, and each will download 15-minute
slices of the specified time range.

Each worker will save a "part" file to the location specified by the prefix.
Different prefixes can be used to run multiple queries at the same time.
The timestamps of the start and end of the part are in the file name.
While a part is being downloaded, the file carries an additional ".tmp"
extension; when the download is complete, the file is renamed to remove
this ".tmp" extension.
By default, if a completed part file is found, that part will not be downloaded
again. This can be overridden with the --overwrite-completed-parts flag.

If you do not specify the --merge-part-files flag, the part files will be
downloaded, logcli will exit, and you can process the files as you wish.
With the flag specified, the part files will be read in order and the output
printed to the terminal. Lines are printed as soon as the next part is
complete; you don't have to wait for all the parts to download before getting
output. --merge-part-files removes each part file once it has been read;
to keep the part files, add the --keep-part-files flag and they
will not be removed.`)
rangeQuery = newQuery(false, queryCmd)
tail = queryCmd.Flag("tail", "Tail the logs").Short('t').Default("false").Bool()
follow = queryCmd.Flag("follow", "Alias for --tail").Short('f').Default("false").Bool()
@@ -380,6 +427,8 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query {
cmd.Flag("parallel-max-workers", "Max number of workers to start up for parallel jobs. A value of 1 will not create any parallel workers.").Default("1").IntVar(&q.ParallelMaxWorkers)
cmd.Flag("part-file-prefix", "When set, each server response will be saved to a file with this prefix. Creates files in the format: 'prefix-unix_start-unix_end.part'. Intended to be used with the parallel-* flags so that you can combine the files to maintain ordering based on the filename. Default is to write to stdout.").StringVar(&q.PartFilePrefix)
cmd.Flag("overwrite-completed-parts", "Overwrites completed part files. This will download the range again, and replace the original completed part file. Default will skip a range if its part file is already downloaded.").Default("false").BoolVar(&q.OverwriteCompleted)
cmd.Flag("merge-part-files", "Reads the part files in order and writes the output to stdout. Original part files will be deleted with this option.").Default("false").BoolVar(&q.MergePartFiles)
cmd.Flag("keep-part-files", "Overrides the default behaviour of --merge-part-files which will delete the part files once all the files have been read. This option will keep the part files.").Default("false").BoolVar(&q.KeepPartFiles)
}

cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward)
16 changes: 13 additions & 3 deletions pkg/logcli/query/part_file.go
@@ -4,23 +4,24 @@ import (
"errors"
"fmt"
"os"
"sync"
)

// PartFile partially complete file.
// Expected usage:
// 1. Create the temp file: CreateTemp
// 2. Write the data to the file
// 3. When you're done, call Complete and the temp file will be renamed
// 3. When you're done, call Complete and the temp file will be closed and renamed
type PartFile struct {
fd *os.File
completeName string
fd *os.File
lock sync.Mutex
}

// NewPartFile creates a new partial file, setting the filename which will be used
// when the file is closed with the Complete function.
func NewPartFile(filename string) *PartFile {
return &PartFile{
fd: nil,
completeName: filename,
}
}
@@ -41,6 +42,9 @@ func (f *PartFile) Exists() (bool, error) {

// CreateTemp creates the temp file to store the data before Complete is called.
func (f *PartFile) CreateTemp() error {
f.lock.Lock()
defer f.lock.Unlock()

tmpName := f.completeName + ".tmp"

fd, err := os.Create(tmpName)
@@ -61,6 +65,9 @@ func (f *PartFile) Write(b []byte) (int, error) {
// Double close is handled gracefully without error so that Close can be deferred for errors,
// and is also called when Complete is called.
func (f *PartFile) Close() error {
f.lock.Lock()
defer f.lock.Unlock()

// Prevent double close
if f.fd == nil {
return nil
@@ -88,6 +95,9 @@ func (f *PartFile) Complete() error {
return fmt.Errorf("failed to close part file: %s: %s", tmpFileName, err)
}

f.lock.Lock()
defer f.lock.Unlock()

if err := os.Rename(tmpFileName, f.completeName); err != nil {
return fmt.Errorf("failed to rename part file: %s: %s", tmpFileName, err)
}
161 changes: 139 additions & 22 deletions pkg/logcli/query/query.go
@@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"sort"
@@ -63,10 +64,31 @@ type Query struct {
ColoredOutput bool
LocalConfig string
FetchSchemaFromStorage bool
ParallelDuration time.Duration
ParallelMaxWorkers int
PartFilePrefix string
OverwriteCompleted bool

// Parallelization parameters.

// The duration of each part/job.
ParallelDuration time.Duration

// Number of workers to start.
ParallelMaxWorkers int

// Prefix of the name for each part file.
// The idea for this is to allow the user to download many different queries at the same
// time, and/or give a directory for the part files to be placed.
PartFilePrefix string

// By default (false), if a part file has already finished downloading and another job with
// the same filename is run, the completed file is skipped. When true, each completed
// file is removed as a worker reaches that part, so the part is downloaded again.
OverwriteCompleted bool

// If true, the part files will be read in order, and the data will be output to stdout.
MergePartFiles bool

// If MergePartFiles is false, this parameter has no effect; part files will be kept.
// Otherwise, if this is true, the part files will not be deleted once they have been merged.
KeepPartFiles bool
}

// DoQuery executes the query and prints out the results
@@ -95,7 +117,6 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)

if partFile != nil {
defer partFile.Close()

out = out.WithWriter(partFile)
}

@@ -220,11 +241,104 @@ func (q *Query) createPartFile() (*PartFile, bool) {
return partFile, false
}

func (q *Query) DoQueryParallel(c client.Client, out output.LogOutput, statistics bool) {
// durationCeilDiv divides d by m, rounding up to the next whole multiple of m.
func durationCeilDiv(d, m time.Duration) int64 {
return int64((d + m - 1) / m)
}

// Returns the next job's start and end times.
func (q *Query) nextJob(start, end time.Time) (time.Time, time.Time) {
if q.Forward {
start = end
return start, minTime(start.Add(q.ParallelDuration), q.End)
}

end = start
return maxTime(end.Add(-q.ParallelDuration), q.Start), end
}

type parallelJob struct {
q Query
done chan struct{}
}

func newParallelJob(q Query) *parallelJob {
return &parallelJob{
q: q,
done: make(chan struct{}),
}
}

func (j *parallelJob) run(c client.Client, out output.LogOutput, statistics bool) {
j.q.DoQuery(c, out, statistics)
j.done <- struct{}{}
Contributor comment: @SN9NV this is the deadlock. An unbuffered channel will write into the target when read. That means it blocks here until the channel is read. However, we don't want to wait for that. See my fix #8553.

Contributor Author reply: I just came here to report the same thing, but you beat me to it. 😂 I was testing with a very large number of threads, so this was less noticeable. Smaller numbers make the problem much more visible. So yeah, good catch.
}
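Without presuming the exact shape of #8553, the non-blocking completion signal the thread is asking for can be sketched with `close`, which never blocks, unlike a send on an unbuffered channel (`runParts` is an illustrative stand-in, not the PR's code):

```go
package main

import "fmt"

// runParts starts n workers; each signals completion by closing its
// done channel. A close never blocks, whereas an unbuffered
// `done <- struct{}{}` would park the worker until the merger reads
// it, which is the hang described in the review thread. The caller
// collects results strictly in part order, as mergeJobs does.
func runParts(n int) []string {
	results := make([]string, n)
	done := make([]chan struct{}, n)

	for i := 0; i < n; i++ {
		done[i] = make(chan struct{})
		go func(i int) {
			defer close(done[i])
			results[i] = fmt.Sprintf("part %d", i)
		}(i)
	}

	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		<-done[i] // wait for part i before reading its result
		out = append(out, results[i])
	}
	return out
}

func main() {
	fmt.Println(runParts(3))
}
```

The channel close also establishes a happens-before edge, so the merger's read of `results[i]` after `<-done[i]` is race-free.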

func (q *Query) parallelJobs() (chan Query, []*parallelJob) {
nJobs := durationCeilDiv(q.End.Sub(q.Start), q.ParallelDuration)
jobsChan := make(chan Query, nJobs)
jobsArr := make([]*parallelJob, 0, nJobs)

// Normally `nextJob` will swap the start/end to get the next job. Here, we swap them
// on input so that we calculate the starting job instead of the next job.
start, end := q.nextJob(q.End, q.Start)

// Queue up jobs
for i := nJobs; i != 0; i-- {
rq := *q
rq.Start = start
rq.End = end

jobsChan <- rq
jobsArr = append(jobsArr, newParallelJob(rq))

start, end = q.nextJob(start, end)
}

close(jobsChan)

return jobsChan, jobsArr
}

// mergeJobs waits for each job to finish in order, reads its part file, and copies it to stdout.
func (q *Query) mergeJobs(jobs []*parallelJob) error {
if !q.MergePartFiles {
return nil
}

for _, job := range jobs {
// wait for the next job to finish
<-job.done

f, err := os.Open(job.q.outputFilename())
if err != nil {
return fmt.Errorf("open file error: %w", err)
}

_, err = io.Copy(os.Stdout, f)
f.Close()
if err != nil {
return fmt.Errorf("copying file error: %w", err)
}

if !q.KeepPartFiles {
err := os.Remove(job.q.outputFilename())
if err != nil {
return fmt.Errorf("removing file error: %w", err)
}
}
}

return nil
}

func (q *Query) startWorkers(
jobs chan Query,
c client.Client,
out output.LogOutput,
statistics bool,
) *sync.WaitGroup {
wg := sync.WaitGroup{}
jobs := make(chan Query)

// Start workers
for w := 0; w < q.ParallelMaxWorkers; w++ {
wg.Add(1)

@@ -236,25 +350,21 @@ func (q *Query) DoQueryParallel(c client.Client, out output.LogOutput, statistic
}()
}

start := q.Start
end := minTime(start.Add(q.ParallelDuration), q.End)
return &wg
}

// Queue up jobs
for {
rq := *q
rq.Start = start
rq.End = end
func (q *Query) DoQueryParallel(c client.Client, out output.LogOutput, statistics bool) {
if q.ParallelDuration < 1 {
log.Fatalf("Parallel duration has to be a positive value\n")
}

jobs <- rq
jobsChan, jobs := q.parallelJobs()

if end.Equal(q.End) {
break
}
wg := q.startWorkers(jobsChan, c, out, statistics)

start = end
end = minTime(start.Add(q.ParallelDuration), q.End)
if err := q.mergeJobs(jobs); err != nil {
log.Fatalf("Merging part files error: %s\n", err)
}
close(jobs)

wg.Wait()
}
@@ -266,6 +376,13 @@ func minTime(t1, t2 time.Time) time.Time {
return t2
}

func maxTime(t1, t2 time.Time) time.Time {
if t1.After(t2) {
return t1
}
return t2
}

func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
length := -1
var entry []*loghttp.Entry
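Taken together, `parallelJobs`, `startWorkers`, and `mergeJobs` follow a common Go shape: pre-load a buffered channel with every job, close it, drain it with a fixed pool of workers, and wait on a `sync.WaitGroup`. A self-contained sketch with a string standing in for a `Query` (`runPool` is a hypothetical name, not part of the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// runPool queues every job up front on a buffered channel, drains it
// with maxWorkers workers, and returns the number of jobs processed.
func runPool(jobs []string, maxWorkers int) int {
	// Buffered to hold all jobs, so queueing never blocks; closing
	// the channel lets workers exit once the queue is empty.
	jobsChan := make(chan string, len(jobs))
	for _, j := range jobs {
		jobsChan <- j
	}
	close(jobsChan)

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done int
	)

	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobsChan {
				// A real worker would run DoQuery here.
				mu.Lock()
				done++
				mu.Unlock()
			}
		}()
	}

	wg.Wait()
	return done
}

func main() {
	parts := []string{"00:00-00:15", "00:15-00:30", "00:30-00:45"}
	fmt.Println(runPool(parts, 2), "jobs completed")
}
```

Because the channel is buffered to the full job count and closed before the workers finish, no producer goroutine is needed and the `for range` loop in each worker terminates naturally.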
1 change: 1 addition & 0 deletions pkg/logcli/query/query_test.go
@@ -3,6 +3,7 @@ package query
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"reflect"