Closed
Description
I am trying to connect Telegram to Kafka using the Telegram Source Connector. It works but the messages put onto the Kafka topic are the toString()
representation of the Telegram message, i.e. IncomingMessage{messageId=20, date=2020-05-05T07:53:42Z,...}, text='test 13',...}
. These are hard to parse from another application perspective and so I would like to transform them before they are put onto the Kafka topic.
I believe that using the transforms below should work but they do not have an effect on the messages put onto the Kafka topic.
transforms: xxx
transforms.xxx.type: org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value
target.type: String.class
I'm using Strimzi 0.17 with the KafkaConnector Operator managing the connectors. Here are the CRs I'm using the bring up the cluster and configure the connector
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: sjwoodman/camel-kafkaconnect
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
config:
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
externalConfiguration:
volumes:
- name: connector-config
secret:
secretName: telegram-credentials
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: telegram-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.camel.kafkaconnector.CamelSourceConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
transforms: xxx
transforms.xxx.type: org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value
target.type: String.class
camel.source.kafka.topic: telegram-topic
camel.source.url: telegram:bots
camel.component.telegram.authorizationToken: ${file:/opt/kafka/external-configuration/connector-config/telegram.properties:token}
Metadata
Metadata
Assignees
Labels
No labels