diff --git a/consumer/config.go b/consumer/config.go index 1e66471d..8a27cc99 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -1,5 +1,7 @@ package consumerLibrary +import "net/http" + type LogHubConfig struct { //:param Endpoint: //:param AccessKeyID: @@ -30,6 +32,7 @@ type LogHubConfig struct { // is to retain all old log files (though MaxAge may still cause them to get // deleted.) //:param LogCompass: Compress determines if the rotated log files should be compressed using gzip. + //:param HTTPClient: custom http client for sending data to sls Endpoint string AccessKeyID string @@ -50,6 +53,7 @@ type LogHubConfig struct { LogMaxSize int LogMaxBackups int LogCompass bool + HTTPClient *http.Client // SecurityToken string } @@ -65,4 +69,3 @@ const ( CONSUME_PROCESSING_DONE = "CONSUME_PROCESSING_DONE" SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE" ) - diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index f201682a..5f00678c 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -1,10 +1,11 @@ package consumerLibrary import ( - "github.com/aliyun/aliyun-log-go-sdk" + "time" + + sls "github.com/aliyun/aliyun-log-go-sdk" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "time" ) type ConsumerClient struct { @@ -32,6 +33,9 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient // SecurityToken: option.SecurityToken, UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName, } + if option.HTTPClient != nil { + client.SetHTTPClient(option.HTTPClient) + } consumerGroup := sls.ConsumerGroup{ option.ConsumerGroupName, option.HeartbeatIntervalInSecond * 3, diff --git a/producer/producer.go b/producer/producer.go index a6f5f00b..27cc28c6 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -41,6 +41,9 @@ func InitProducer(producerConfig *ProducerConfig) *Producer { client = stsClient } } + if producerConfig.HTTPClient != nil { + client.SetHTTPClient(producerConfig.HTTPClient) + } finalProducerConfig := validateProducerConfig(producerConfig) retryQueue := initRetryQueue() errorStatusMap := func() map[int]*string { diff --git a/producer/producer_config.go b/producer/producer_config.go index db780f6f..acc4a366 100644 --- a/producer/producer_config.go +++ b/producer/producer_config.go @@ -1,6 +1,9 @@ package producer -import "time" +import ( + "net/http" + "time" +) const Delimiter = "|" @@ -27,8 +30,9 @@ type ProducerConfig struct { AccessKeyID string AccessKeySecret string NoRetryStatusCodeList []int - UpdateStsToken func() (accessKeyID, accessKeySecret, securityToken string, expireTime time.Time, err error) + UpdateStsToken func() (accessKeyID, accessKeySecret, securityToken string, expireTime time.Time, err error) StsTokenShutDown chan struct{} + HTTPClient *http.Client } func GetDefaultProducerConfig() *ProducerConfig {