Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade go version to 1.22.2; franz-go module from v1.10.4 to 1… #698

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

zhengbucuo
Copy link

feat: upgrade go version to 1.22.2; franz-go module from v1.10.4 to 1.18.1, add support for kafka3.x (Kafka >= 0.8.0 through v3.8+)

loggie 当前引用的franz-go版本是 v1.10.4,Sink 把日志输出至kafka 3.7,在打开自动创建topic情况下,依旧报UNKNOWN_TOPIC_OR_PARTITION错误。

附加Sink crd:

apiVersion: loggie.io/v1beta1
kind: LogConfig
metadata:
  name: xxx
  namespace: xxx
spec:
  pipeline:
    interceptorRef: default
    sinkRef: default
    sources: |
      - type: file
        name: mylog
        multi:
          active: true
          pattern: '^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\]'
        paths:
        - stdout
  selector:
    labelSelector:
      app: xxx
    type: pod
---
apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: default
spec:
  sink: |
    type: franzKafka
    brokers: ["kafka.opensearch.svc.cluster.local:9092"]
    topic: "logs-${fields.namespace}-${fields.containername}"
    sasl:
      enabled: true
      mechanism: PLAIN
      username: xxx
      password: xxx
---
apiVersion: loggie.io/v1beta1
kind: Interceptor
metadata:
  name: default
spec:
  interceptors: |
    - type: rateLimit
      qps: 90000

附加报错日志:

{"level":"info","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:70","message":"producing to a new topic for the first time, fetching metadata to learn its partitions[\"topic\",\"logs-platform-merchant-data-api-test\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"producing to a new topic for the first time, fetching metadata to learn its partitions[\"topic\",\"logs-platform-merchant-data-api-test\"]"}
{"level":"info","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:70","message":"immediate metadata update triggered[\"why\",\"forced load because we are producing to a new topic for the first time\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"immediate metadata update triggered[\"why\",\"forced load because we are producing to a new topic for the first time\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"opening connection to broker[\"addr\",\"kafka.opensearch:9092\",\"broker\",\"seed 0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"opening connection to broker[\"addr\",\"kafka.opensearch:9092\",\"broker\",\"seed 0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"connection opened to broker[\"addr\",\"kafka.opensearch:9092\",\"broker\",\"seed 0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"connection opened to broker[\"addr\",\"kafka.opensearch:9092\",\"broker\",\"seed 0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"issuing api versions request[\"broker\",\"seed 0\",\"version\",3]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"issuing api versions request[\"broker\",\"seed 0\",\"version\",3]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"wrote ApiVersions v3[\"broker\",\"seed 0\",\"bytes_written\",31,\"write_wait\",25795,\"time_to_write\",47455,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"wrote ApiVersions v3[\"broker\",\"seed 0\",\"bytes_written\",31,\"write_wait\",25795,\"time_to_write\",47455,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"read ApiVersions v3[\"broker\",\"seed 0\",\"bytes_read\",475,\"read_wait\",47706,\"time_to_read\",1760265,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"read ApiVersions v3[\"broker\",\"seed 0\",\"bytes_read\",475,\"read_wait\",47706,\"time_to_read\",1760265,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"issuing SASLHandshakeRequest[\"broker\",\"seed 0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"issuing SASLHandshakeRequest[\"broker\",\"seed 0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"wrote SASLHandshake v1[\"broker\",\"seed 0\",\"bytes_written\",24,\"write_wait\",7969,\"time_to_write\",41673,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"wrote SASLHandshake v1[\"broker\",\"seed 0\",\"bytes_written\",24,\"write_wait\",7969,\"time_to_write\",41673,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"read SASLHandshake v1[\"broker\",\"seed 0\",\"bytes_read\",21,\"read_wait\",33763,\"time_to_read\",1711582,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"read SASLHandshake v1[\"broker\",\"seed 0\",\"bytes_read\",21,\"read_wait\",33763,\"time_to_read\",1711582,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"beginning sasl authentication[\"broker\",\"seed 0\",\"addr\",\"kafka.opensearch:9092\",\"mechanism\",\"PLAIN\",\"authenticate\",true]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"beginning sasl authentication[\"broker\",\"seed 0\",\"addr\",\"kafka.opensearch:9092\",\"mechanism\",\"PLAIN\",\"authenticate\",true]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"issuing SASLAuthenticate[\"broker\",\"seed 0\",\"version\",2,\"step\",0]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"issuing SASLAuthenticate[\"broker\",\"seed 0\",\"version\",2,\"step\",0]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"wrote SASLAuthenticate v2[\"broker\",\"seed 0\",\"bytes_written\",37,\"write_wait\",25560,\"time_to_write\",52920,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"wrote SASLAuthenticate v2[\"broker\",\"seed 0\",\"bytes_written\",37,\"write_wait\",25560,\"time_to_write\",52920,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"read SASLAuthenticate v2[\"broker\",\"seed 0\",\"bytes_read\",22,\"read_wait\",45372,\"time_to_read\",1698860,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"read SASLAuthenticate v2[\"broker\",\"seed 0\",\"bytes_read\",22,\"read_wait\",45372,\"time_to_read\",1698860,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"connection initialized successfully[\"addr\",\"kafka.opensearch:9092\",\"broker\",\"seed 0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"connection initialized successfully[\"addr\",\"kafka.opensearch:9092\",\"broker\",\"seed 0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"wrote Metadata v12[\"broker\",\"seed 0\",\"bytes_written\",76,\"write_wait\",12249252,\"time_to_write\",40403,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"wrote Metadata v12[\"broker\",\"seed 0\",\"bytes_written\",76,\"write_wait\",12249252,\"time_to_write\",40403,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"read Metadata v12[\"broker\",\"seed 0\",\"bytes_read\",357,\"read_wait\",93101,\"time_to_read\",1853819,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"read Metadata v12[\"broker\",\"seed 0\",\"bytes_read\",357,\"read_wait\",93101,\"time_to_read\",1853819,\"err\",null]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"immediate metadata update had inner errors, re-updating[\"errors\",\"UNKNOWN_TOPIC_OR_PARTITION{logs-platform-merchant-data-api-test}\",\"update_after\",250000000]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"immediate metadata update had inner errors, re-updating[\"errors\",\"UNKNOWN_TOPIC_OR_PARTITION{logs-platform-merchant-data-api-test}\",\"update_after\",250000000]"}
{"level":"info","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:70","message":"new topic metadata wait failed, retrying wait[\"topic\",\"logs-platform-merchant-data-api-test\",\"err\",{\"Message\":\"UNKNOWN_TOPIC_OR_PARTITION\",\"Code\":3,\"Retriable\":true,\"Description\":\"This server does not host this topic-partition.\"}]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"new topic metadata wait failed, retrying wait[\"topic\",\"logs-platform-merchant-data-api-test\",\"err\",{\"Message\":\"UNKNOWN_TOPIC_OR_PARTITION\",\"Code\":3,\"Retriable\":true,\"Description\":\"This server does not host this topic-partition.\"}]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:72","message":"opening connection to broker[\"addr\",\"kafka-controller-0.kafka-controller-headless.opensearch.svc.cluster.local:9092\",\"broker\",\"0\"]"}
{"level":"debug","time":"2025-03-06 19:14:22","caller":"/pkg/sink/franz/log.go:74","message":"opening connection to broker[\"addr\",\"kafka-controller-0.kafka-controller-headless.opensearch.svc.cluster.local:9092\",\"broker\",\"0\"]"}
fatal error: unexpected signal during runtime execution
[signal SIGSEGV: segmentation violation code=0x1 addr=0x47 pc=0x7f05b016b360]

runtime stack:
runtime.throw({0x2db56d4?, 0x7f05b003d6e0?})
	/usr/local/go/src/runtime/panic.go:992 +0x71
runtime.sigpanic()
	/usr/local/go/src/runtime/signal_unix.go:802 +0x389

goroutine 445 [syscall]:
runtime.cgocall(0x2384dd0, 0xc000513598)
	/usr/local/go/src/runtime/cgocall.go:157 +0x5c fp=0xc000513570 sp=0xc000513538 pc=0x405a9c
net._C2func_getaddrinfo(0xc001c1ac30, 0x0, 0xc001155860, 0xc001b9e8a8)
	_cgo_gotypes.go:94 +0x56 fp=0xc000513598 sp=0xc000513570 pc=0x577496
net.cgoLookupIPCNAME.func1({0xc001c1ac30, 0x1000000008cc8?, 0x0?}, 0xc001b64230?, 0x49e8cc0?)
	/usr/local/go/src/net/cgo_unix.go:160 +0x9f fp=0xc0005135f0 sp=0xc000513598 pc=0x5791bf
net.cgoLookupIPCNAME({0x2d5cf59, 0x3}, {0xc001b64230, 0x49})
	/usr/local/go/src/net/cgo_unix.go:160 +0x16d fp=0xc000513738 sp=0xc0005135f0 pc=0x578a2d
net.cgoIPLookup(0xc0005137d0?, {0x2d5cf59?, 0xc0000010e0?}, {0xc001b64230?, 0xc0008d0720?})
	/usr/local/go/src/net/cgo_unix.go:217 +0x3b fp=0xc0005137a8 sp=0xc000513738 pc=0x57927b
net.cgoLookupIP.func1()
	/usr/local/go/src/net/cgo_unix.go:227 +0x36 fp=0xc0005137e0 sp=0xc0005137a8 pc=0x5796b6
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:1571 +0x1 fp=0xc0005137e8 sp=0xc0005137e0 pc=0x46d9e1
created by net.cgoLookupIP
	/usr/local/go/src/net/cgo_unix.go:227 +0x12a

goroutine 1 [chan receive]:
main.main()
	/cmd/loggie/main.go:147 +0xab9

goroutine 4 [chan receive]:
github.com/panjf2000/ants/v2.(*Pool).purgePeriodically(0xc00034dc00)
	/vendor/github.com/panjf2000/ants/v2/pool.go:69 +0x8b
created by github.com/panjf2000/ants/v2.NewPool
	/vendor/github.com/panjf2000/ants/v2/pool.go:137 +0x34a

goroutine 474 [select]:
github.com/twmb/franz-go/pkg/kgo.(*Client).waitUnknownTopic(0xc001d2e000, {0x31f57f8, 0xc000058060}, {0xc001cf4cf0, 0x24}, 0xc001950ed0)
	/vendor/github.com/twmb/franz-go/pkg/kgo/producer.go:860 +0x2ef
created by github.com/twmb/franz-go/pkg/kgo.(*Client).addUnknownTopicRecord
	/vendor/github.com/twmb/franz-go/pkg/kgo/producer.go:827 +0x33c

….18.1, add support for kafka3.x (Kafka >= 0.8.0 through v3.8+)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant