
lambda-promtail: Add ability to ingest logs from S3 #5065

Merged: 26 commits, Feb 4, 2022
Changes from 1 commit

Commits (26)
fd80dd0
Add ability to ingest logs from S3 on lambda-promtail
AndreZiviani Jan 6, 2022
a2fec89
fix ci
AndreZiviani Jan 6, 2022
774c5e8
fix typo
AndreZiviani Jan 6, 2022
611d604
bump golang and alpine version
AndreZiviani Jan 6, 2022
dc1b849
update changelog
AndreZiviani Jan 7, 2022
b42243e
add s3 permissions on terraform
AndreZiviani Jan 7, 2022
0ab4647
use for_each instead of count
AndreZiviani Jan 7, 2022
394b6c7
fix typo
AndreZiviani Jan 7, 2022
74f88ac
improve function naming
AndreZiviani Jan 13, 2022
ea56137
add documentation and an example of a s3 file path
AndreZiviani Jan 24, 2022
8331cc1
refact logic to identify event type
AndreZiviani Jan 24, 2022
373eb18
add missing iam permission to allow lambda to run inside a vpc
AndreZiviani Jan 24, 2022
e57b6f6
fix typo
AndreZiviani Jan 24, 2022
bdfda42
allow lambda to access only specified s3 buckets
AndreZiviani Jan 26, 2022
5e75cea
configure a default log retention policy on log group
AndreZiviani Jan 26, 2022
df9eceb
add missing depends_on to make sure iam role is created before lambda…
AndreZiviani Jan 26, 2022
5cfd06a
update docs
AndreZiviani Jan 31, 2022
6e5d6a3
fix label naming convention
AndreZiviani Jan 31, 2022
3b025ff
fix merge conflicts
AndreZiviani Jan 31, 2022
227f77c
fix merge conflict
AndreZiviani Jan 31, 2022
b99d800
use new backoff lib and update dependencies
AndreZiviani Jan 31, 2022
2c241b7
add option to limit batch size
AndreZiviani Feb 3, 2022
b443ebc
cache s3 client
AndreZiviani Feb 3, 2022
7d3c4f8
update docs and terraform
AndreZiviani Feb 3, 2022
370f116
address some feedback on PR
AndreZiviani Feb 4, 2022
b31063a
fix typo
AndreZiviani Feb 4, 2022
add option to limit batch size
AndreZiviani committed Feb 3, 2022
commit 2c241b76cf85dd44b0b9a9e323b4ede764d9086f
4 changes: 2 additions & 2 deletions tools/lambda-promtail/lambda-promtail/cw.go
@@ -28,7 +28,7 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent)
for _, event := range data.LogEvents {
timestamp := time.UnixMilli(event.Timestamp)

- b.add(entry{labels, logproto.Entry{
+ b.add(ctx, entry{labels, logproto.Entry{
Line: event.Message,
Timestamp: timestamp,
}})
@@ -38,7 +38,7 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent)
}

func processCWEvent(ctx context.Context, ev *events.CloudwatchLogsEvent) error {
- batch := newBatch()
+ batch, _ := newBatch(ctx)
Member:

We should not swallow this error. Perhaps newBatch shouldn't accept entries and then newBatch won't need to return an error.

Contributor Author:

Since the batch will always be empty here, it should never fail; that's why I ignored it.
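An editor's sketch of the reviewer's suggestion: a newBatch that accepts no entries can never fail and so needs no error return. The Entry and Stream types below are simplified stand-ins for the PR's logproto types, not the committed code:

```go
package main

import "fmt"

// Simplified stand-ins for the PR's logproto types (hypothetical).
type Entry struct{ Line string }
type Stream struct {
	Labels  string
	Entries []Entry
}

type batch struct {
	streams map[string]*Stream
	size    int
}

// newBatch takes no entries, so there is nothing that can fail and no
// error to swallow at call sites like processCWEvent/processS3Event.
func newBatch() *batch {
	return &batch{streams: map[string]*Stream{}}
}

func main() {
	b := newBatch()
	fmt.Println(len(b.streams)) // 0
}
```

Callers would then add entries afterwards and handle add's error directly.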


err := parseCWEvent(ctx, batch, ev)
if err != nil {
9 changes: 9 additions & 0 deletions tools/lambda-promtail/lambda-promtail/main.go
@@ -7,6 +7,7 @@ import (
"fmt"
"net/url"
"os"
+ "strconv"
"strings"

"github.com/aws/aws-lambda-go/events"
@@ -24,6 +25,7 @@
writeAddress *url.URL
username, password string
keepStream bool
+ batchSize int
)

func init() {
@@ -54,6 +56,13 @@ func init() {
keepStream = true
}
fmt.Println("keep stream: ", keepStream)

+ batch := os.Getenv("BATCH_SIZE")
+ if batch != "" {
+     batchSize, _ = strconv.Atoi(batch)
+ } else {
+     batchSize = 131072 // 128kb
+ }
}

func checkEventType(ev map[string]interface{}) (interface{}, error) {
41 changes: 33 additions & 8 deletions tools/lambda-promtail/lambda-promtail/promtail.go
@@ -36,31 +36,44 @@ type entry struct {

type batch struct {
streams map[string]*logproto.Stream
+ size int
}

- func newBatch(entries ...entry) *batch {
+ func newBatch(ctx context.Context, entries ...entry) (*batch, error) {
b := &batch{
streams: map[string]*logproto.Stream{},
}

for _, entry := range entries {
- b.add(entry)
+ err := b.add(ctx, entry)
+ return b, err
}

- return b
+ return b, nil
}

- func (b *batch) add(e entry) {
+ func (b *batch) add(ctx context.Context, e entry) error {
labels := labelsMapToString(e.labels, reservedLabelTenantID)
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, e.entry)
- return
+
+ b.size += stream.Size()
Member:

This Size() method doesn't actually measure what we're expecting (it's part of the generated protobuf). Instead, I'd keep track of each line's byte length as entries are added, like so:
https://github.com/grafana/loki/blob/main/clients/pkg/promtail/client/batch.go#L44-L45

Contributor Author:

I see, will fix that
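For context, the promtail client the reviewer links tracks a running total of line byte lengths rather than calling the generated protobuf Size(). A minimal sketch of that accounting, with simplified types (not the PR's code):

```go
package main

import "fmt"

type entry struct {
	labels string
	line   string
}

type batch struct {
	streams map[string][]string
	size    int // running total of line bytes, per the reviewer's suggestion
}

func (b *batch) add(e entry) {
	b.streams[e.labels] = append(b.streams[e.labels], e.line)
	// Count the line's byte length rather than the protobuf Size().
	b.size += len(e.line)
}

func main() {
	b := &batch{streams: map[string][]string{}}
	b.add(entry{labels: `{app="x"}`, line: "hello"})
	b.add(entry{labels: `{app="x"}`, line: "world!"})
	fmt.Println(b.size) // 11
}
```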


+ } else {
+
+ 	b.streams[labels] = &logproto.Stream{
+ 		Labels:  labels,
+ 		Entries: []logproto.Entry{e.entry},
+ 	}
+
+ 	b.size += b.streams[labels].Size()
Contributor:

nit again: we could simplify to something like this

stream, ok := b.streams[labels]
if !ok {
    b.streams[labels] = &logproto.Stream{
        Labels: labels,
    }
    stream = b.streams[labels]
}
stream.Entries = append(stream.Entries, e.entry)
b.size += stream.Size()

}

- b.streams[labels] = &logproto.Stream{
- 	Labels:  labels,
- 	Entries: []logproto.Entry{e.entry},
+ if b.size > batchSize {
+ 	return b.flushBatch(ctx)
  }
+
+ return nil
}

func labelsMapToString(ls model.LabelSet, without ...model.LabelName) string {
@@ -103,6 +116,18 @@ func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
return &req, entriesCount
}

+ func (b *batch) flushBatch(ctx context.Context) error {
+ 	fmt.Println("flusing batch")
+ 	err := sendToPromtail(ctx, b)
+ 	if err != nil {
+ 		return err
+ 	}
+
+ 	b.streams = make(map[string]*logproto.Stream)
+
+ 	return nil
+ }

func sendToPromtail(ctx context.Context, b *batch) error {
buf, _, err := b.encode()
if err != nil {
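Taken together, the diff above makes add flush whenever the accumulated size crosses batchSize. A self-contained sketch of that pattern (the send callback stands in for sendToPromtail; unlike the diff, this sketch also resets size on flush):

```go
package main

import "fmt"

type batch struct {
	lines []string
	size  int
}

const batchSize = 16 // threshold in bytes; the PR defaults to 131072

// add appends a line and flushes once the accumulated size passes the threshold.
func (b *batch) add(line string, send func([]string) error) error {
	b.lines = append(b.lines, line)
	b.size += len(line)
	if b.size > batchSize {
		return b.flush(send)
	}
	return nil
}

// flush sends the buffered lines and resets the batch, mirroring flushBatch.
func (b *batch) flush(send func([]string) error) error {
	if err := send(b.lines); err != nil {
		return err
	}
	b.lines = nil
	b.size = 0
	return nil
}

func main() {
	sent := 0
	send := func(ls []string) error { sent += len(ls); return nil }
	b := &batch{}
	b.add("0123456789", send) // 10 bytes, under threshold
	b.add("0123456789", send) // 20 bytes, triggers flush
	fmt.Println(sent, b.size) // 2 0
}
```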
8 changes: 4 additions & 4 deletions tools/lambda-promtail/lambda-promtail/s3.go
@@ -51,7 +51,7 @@ func getS3Object(ctx context.Context, labels map[string]string) (io.ReadCloser,
return obj.Body, nil
}

- func parseS3Log(b *batch, labels map[string]string, obj io.ReadCloser) error {
+ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser) error {
gzreader, err := gzip.NewReader(obj)
if err != nil {
return err
@@ -75,7 +75,7 @@ func parseS3Log(b *batch, labels map[string]string, obj io.ReadCloser) error {
return err
}

- b.add(entry{ls, logproto.Entry{
+ b.add(ctx, entry{ls, logproto.Entry{
Line: log_line,
Timestamp: timestamp,
}})
@@ -106,7 +106,7 @@ func getLabels(record events.S3EventRecord) (map[string]string, error) {

func processS3Event(ctx context.Context, ev *events.S3Event) error {

- batch := newBatch()
+ batch, _ := newBatch(ctx)

for _, record := range ev.Records {
labels, err := getLabels(record)
@@ -119,7 +119,7 @@
return err
}

- err = parseS3Log(batch, labels, obj)
+ err = parseS3Log(ctx, batch, labels, obj)
if err != nil {
return err
}