Skip to content

Commit

Permalink
[add] set http client interface for consumer and producer
Browse files Browse the repository at this point in the history
  • Loading branch information
shabicheng committed Apr 29, 2022
1 parent c8ff383 commit 5f85a3c
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
5 changes: 4 additions & 1 deletion consumer/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package consumerLibrary

import "net/http"

type LogHubConfig struct {
//:param Endpoint:
//:param AccessKeyID:
Expand Down Expand Up @@ -30,6 +32,7 @@ type LogHubConfig struct {
// is to retain all old log files (though MaxAge may still cause them to get
// deleted.)
//:param LogCompass: Compress determines if the rotated log files should be compressed using gzip.
//:param HTTPClient: custom http client for sending data to sls

Endpoint string
AccessKeyID string
Expand All @@ -50,6 +53,7 @@ type LogHubConfig struct {
LogMaxSize int
LogMaxBackups int
LogCompass bool
HTTPClient *http.Client
// SecurityToken string
}

Expand All @@ -65,4 +69,3 @@ const (
CONSUME_PROCESSING_DONE = "CONSUME_PROCESSING_DONE"
SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"
)

8 changes: 6 additions & 2 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package consumerLibrary

import (
"github.com/aliyun/aliyun-log-go-sdk"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"time"
)

type ConsumerClient struct {
Expand Down Expand Up @@ -32,6 +33,9 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
// SecurityToken: option.SecurityToken,
UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName,
}
if option.HTTPClient != nil {
client.SetHTTPClient(option.HTTPClient)
}
consumerGroup := sls.ConsumerGroup{
option.ConsumerGroupName,
option.HeartbeatIntervalInSecond * 3,
Expand Down
3 changes: 3 additions & 0 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func InitProducer(producerConfig *ProducerConfig) *Producer {
client = stsClient
}
}
if producerConfig.HTTPClient != nil {
client.SetHTTPClient(producerConfig.HTTPClient)
}
finalProducerConfig := validateProducerConfig(producerConfig)
retryQueue := initRetryQueue()
errorStatusMap := func() map[int]*string {
Expand Down
8 changes: 6 additions & 2 deletions producer/producer_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package producer

import "time"
import (
"net/http"
"time"
)

const Delimiter = "|"

Expand All @@ -27,8 +30,9 @@ type ProducerConfig struct {
AccessKeyID string
AccessKeySecret string
NoRetryStatusCodeList []int
UpdateStsToken func() (accessKeyID, accessKeySecret, securityToken string, expireTime time.Time, err error)
UpdateStsToken func() (accessKeyID, accessKeySecret, securityToken string, expireTime time.Time, err error)
StsTokenShutDown chan struct{}
HTTPClient *http.Client
}

func GetDefaultProducerConfig() *ProducerConfig {
Expand Down

0 comments on commit 5f85a3c

Please sign in to comment.