Skip to content

Commit

Permalink
Merge pull request #258 from crimson-gao/consumer-v4
Browse files Browse the repository at this point in the history
Consumer support signature algorithm v4
  • Loading branch information
crimson-gao authored Feb 29, 2024
2 parents ef4ec3b + a248f2f commit 0ed38c6
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 26 deletions.
5 changes: 4 additions & 1 deletion consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type LogHubConfig struct {
//:param AutoCommitDisabled: whether to disable commit checkpoint automatically, default is false, means auto commit checkpoint
// Note that if you set autocommit to false, you must use InitConsumerWorkerWithCheckpointTracker instead of InitConsumerWorker
//:param AutoCommitIntervalInSec: default auto commit interval, default is 30

//:param AuthVersion: signature algorithm version, default is sls.AuthV1
//:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4
Endpoint string
AccessKeyID string
AccessKeySecret string
Expand All @@ -73,6 +74,8 @@ type LogHubConfig struct {
SecurityToken string
AutoCommitDisabled bool
AutoCommitIntervalInMS int64
AuthVersion sls.AuthVersionType
Region string
}

const (
Expand Down
26 changes: 17 additions & 9 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type ConsumerClient struct {
option LogHubConfig
client *sls.Client
client sls.ClientInterface
consumerGroup sls.ConsumerGroup
logger log.Logger
}
Expand All @@ -33,19 +33,27 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
if option.AutoCommitIntervalInMS == 0 {
option.AutoCommitIntervalInMS = 60 * 1000
}
client := &sls.Client{
Endpoint: option.Endpoint,
AccessKeyID: option.AccessKeyID,
AccessKeySecret: option.AccessKeySecret,
SecurityToken: option.SecurityToken,
UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName,
}
var client sls.ClientInterface
if option.CredentialsProvider != nil {
client = client.WithCredentialsProvider(option.CredentialsProvider)
client = sls.CreateNormalInterfaceV2(option.Endpoint, option.CredentialsProvider)
} else {
client = sls.CreateNormalInterface(option.Endpoint,
option.AccessKeyID,
option.AccessKeySecret,
option.SecurityToken)
}
client.SetUserAgent(option.ConsumerGroupName + "_" + option.ConsumerName)

if option.HTTPClient != nil {
client.SetHTTPClient(option.HTTPClient)
}
if option.AuthVersion != "" {
client.SetAuthVersion(option.AuthVersion)
}
if option.Region != "" {
client.SetRegion(option.Region)
}

consumerGroup := sls.ConsumerGroup{
ConsumerGroupName: option.ConsumerGroupName,
Timeout: option.HeartbeatTimeoutInSecond,
Expand Down
2 changes: 1 addition & 1 deletion consumer/consumer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestConsumerClient_createConsumerGroup(t *testing.T) {
}
}

func internalGetConsumerGroup(client *sls.Client, project, logstore, groupName string) (sls.ConsumerGroup, error) {
func internalGetConsumerGroup(client sls.ClientInterface, project, logstore, groupName string) (sls.ConsumerGroup, error) {
cgs, err := client.ListConsumerGroup(project, logstore)
if err != nil {
return sls.ConsumerGroup{}, err
Expand Down
14 changes: 8 additions & 6 deletions example/alert/alert_example.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package main

import (
sls "github.com/aliyun/aliyun-log-go-sdk"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
)

func main() {
Expand All @@ -12,11 +13,12 @@ func main() {
logstore := "002"
dashboardName := "dashboardtest"
alertName := "test-alert"
client := &sls.Client{
Endpoint: "cn-hangzhou.log.aliyuncs.com",
AccessKeyID: accessKeyID,
AccessKeySecret: accessKeySecret,
}
client := sls.CreateNormalInterface(
"cn-hangzhou.log.aliyuncs.com",
accessKeyID,
accessKeySecret,
"",
)
chart := sls.Chart{
Title: "chart-1234567",
Type: "table",
Expand Down
13 changes: 7 additions & 6 deletions example/consumer/reset_checkpoint_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker cons
return "", nil
}

func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, shardId int) error {
func updateCheckpoint(config consumerLibrary.LogHubConfig, client sls.ClientInterface, shardId int) error {
from := fmt.Sprintf("%d", time.Now().Unix())
cursor, err := client.GetCursor(config.Project, config.Logstore, shardId, from)
if err != nil {
Expand All @@ -67,11 +67,12 @@ func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, s
}

func UpdateConsumerGroupCheckPoint(config consumerLibrary.LogHubConfig) error {
client := &sls.Client{
Endpoint: config.Endpoint,
AccessKeyID: config.AccessKeyID,
AccessKeySecret: config.AccessKeySecret,
}
client := sls.CreateNormalInterface(
config.Endpoint,
config.AccessKeyID,
config.AccessKeySecret,
"",
)
shards, err := client.ListShards(config.Project, config.Logstore)
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions example/metric_agg/metric_agg_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package main
import (
"encoding/json"
"fmt"

sls "github.com/aliyun/aliyun-log-go-sdk"
)

func crud(client sls.Client, sourceProject string, aggRules *sls.MetricAggRules, testId string) {
func crud(client *sls.Client, sourceProject string, aggRules *sls.MetricAggRules, testId string) {
err := client.CreateMetricAggRules(sourceProject, aggRules)
if err != nil {
panic(err)
Expand Down Expand Up @@ -137,9 +138,9 @@ func main() {

testId := "metric_agg_rules1"
aggRules := sqlConfig(accessKeyID, accessKeySecret, testId)
crud(*client, sourceProject, aggRules, testId)
crud(client, sourceProject, aggRules, testId)

testId = "metric_agg_rules2"
aggRules = promqlConfig(accessKeyID, accessKeySecret, testId)
crud(*client, sourceProject, aggRules, testId)
crud(client, sourceProject, aggRules, testId)
}
44 changes: 44 additions & 0 deletions example/signv4/signv4.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package main

import (
"fmt"
"os"
"os/signal"
"syscall"

sls "github.com/aliyun/aliyun-log-go-sdk"
consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
"github.com/aliyun/aliyun-log-go-sdk/util"
"github.com/go-kit/kit/log/level"
)

func main() {
Expand All @@ -20,3 +27,40 @@ func main() {

client.GetProject("example-project") // call client API
}

func CreateSignV4Consumer() {
accessKeyId, accessKeySecret := "", "" // replace with your access key and secret
endpoint := "cn-hangzhou-intranet.log.aliyuncs.com" // replace with your endpoint
region, err := util.ParseRegion(endpoint) // parse region from endpoint
if err != nil {
panic(err)
}
option := consumerLibrary.LogHubConfig{
Endpoint: endpoint,
AccessKeyID: accessKeyId,
AccessKeySecret: accessKeySecret,
Project: "example-project",
Logstore: "example-logstore",
ConsumerGroupName: "example-consumer-group",
ConsumerName: "example-consumer-group-consumer-1",
CursorPosition: consumerLibrary.END_CURSOR,

AuthVersion: sls.AuthV4, // use signature v4
Region: region, // region must be set if using signature v4
}
// create consumer
consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
consumerWorker.Start()
if _, ok := <-ch; ok {
level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
consumerWorker.StopAndWait()
}
}

func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
fmt.Println(shardId, logGroupList)
checkpointTracker.SaveCheckPoint(true)
return "", nil
}

0 comments on commit 0ed38c6

Please sign in to comment.