Skip to content

Commit

Permalink
feat(pubsub): add support for ingestion platform logging settings (#1…
Browse files Browse the repository at this point in the history
…0969)

* feat(pubsub): add support for ingestion platform logging settings

* remove kafka logging
  • Loading branch information
hongalex authored Oct 14, 2024
1 parent f0b05e2 commit c60241f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
40 changes: 39 additions & 1 deletion pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ type TopicConfigToUpdate struct {
// data source into this topic.
//
// When changing this value, the entire data source settings object must be applied,
// rather than just the differences.
// rather than just the differences. This includes the source and logging settings.
//
// Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
IngestionDataSourceSettings *IngestionDataSourceSettings
Expand Down Expand Up @@ -429,6 +429,8 @@ func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePo
// IngestionDataSourceSettings enables ingestion from a data source into this topic.
type IngestionDataSourceSettings struct {
Source IngestionDataSource

PlatformLogsSettings *PlatformLogsSettings
}

// IngestionDataSource is the kind of ingestion source to be used.
Expand Down Expand Up @@ -624,6 +626,13 @@ func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *In
MatchGlob: cs.GetMatchGlob(),
}
}

if pbs.PlatformLogsSettings != nil {
s.PlatformLogsSettings = &PlatformLogsSettings{
Severity: PlatformLogsSeverity(pbs.PlatformLogsSettings.Severity),
}
}

return s
}

Expand All @@ -636,6 +645,11 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
return nil
}
pbs := &pb.IngestionDataSourceSettings{}
if i.PlatformLogsSettings != nil {
pbs.PlatformLogsSettings = &pb.PlatformLogsSettings{
Severity: pb.PlatformLogsSettings_Severity(i.PlatformLogsSettings.Severity),
}
}
if out := i.Source; out != nil {
if k, ok := out.(*IngestionDataSourceAWSKinesis); ok {
pbs.Source = &pb.IngestionDataSourceSettings_AwsKinesis_{
Expand Down Expand Up @@ -694,6 +708,30 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
return pbs
}

// PlatformLogsSettings configures logging produced by Pub/Sub.
// Currently only valid on Cloud Storage ingestion topics.
type PlatformLogsSettings struct {
Severity PlatformLogsSeverity
}

// PlatformLogsSeverity are the severity levels of Platform Logs.
type PlatformLogsSeverity int32

const (
// PlatformLogsSeverityUnspecified is the default value. Logs level is unspecified. Logs will be disabled.
PlatformLogsSeverityUnspecified PlatformLogsSeverity = iota
// PlatformLogsSeverityDisabled means logs will be disabled.
PlatformLogsSeverityDisabled
// PlatformLogsSeverityDebug means debug logs and higher-severity logs will be written.
PlatformLogsSeverityDebug
// PlatformLogsSeverityInfo means info logs and higher-severity logs will be written.
PlatformLogsSeverityInfo
// PlatformLogsSeverityWarning means warning logs and higher-severity logs will be written.
PlatformLogsSeverityWarning
// PlatformLogsSeverityError means only error logs will be written.
PlatformLogsSeverityError
)

// Config returns the TopicConfig for the topic.
func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
Expand Down
6 changes: 6 additions & 0 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ func TestTopic_IngestionCloudStorage(t *testing.T) {
MinimumObjectCreateTime: time.Now().Add(-time.Hour),
MatchGlob: "**.txt",
},
PlatformLogsSettings: &PlatformLogsSettings{
Severity: PlatformLogsSeverityDisabled,
},
},
}

Expand All @@ -204,6 +207,9 @@ func TestTopic_IngestionCloudStorage(t *testing.T) {
MinimumObjectCreateTime: time.Now().Add(-2 * time.Hour),
MatchGlob: "**.txt",
},
PlatformLogsSettings: &PlatformLogsSettings{
Severity: PlatformLogsSeverityError,
},
}
config2, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
if err != nil {
Expand Down

0 comments on commit c60241f

Please sign in to comment.