Cannot apply transforms to Telegram Source Connector #188

Closed
@sjwoodman

I am trying to connect Telegram to Kafka using the Telegram Source Connector. It works, but the messages written to the Kafka topic are the toString() representation of the Telegram message, e.g. IncomingMessage{messageId=20, date=2020-05-05T07:53:42Z,...}, text='test 13',...}. These are hard to parse from another application, so I would like to transform them before they are written to the Kafka topic.

I believe the transforms below should work, but they have no effect on the messages written to the Kafka topic.

transforms: xxx
transforms.xxx.type: org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value
target.type: String.class

I'm using Strimzi 0.17 with the KafkaConnector Operator managing the connectors. Here are the CRs I'm using to bring up the cluster and configure the connector:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: sjwoodman/camel-kafkaconnect
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: connector-config
        secret:
          secretName: telegram-credentials
---
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: telegram-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    transforms: xxx
    transforms.xxx.type: org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value
    target.type: String.class
    camel.source.kafka.topic: telegram-topic
    camel.source.url: telegram:bots
    camel.component.telegram.authorizationToken: ${file:/opt/kafka/external-configuration/connector-config/telegram.properties:token}
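
One thing that may be worth checking: Kafka Connect passes a transform only the properties under its own transforms.<alias>. prefix, so a top-level target.type would not reach the transform registered under the alias xxx. A minimal sketch of the scoped form follows. Using the fully qualified class name java.lang.String in place of String.class is an assumption about what the option expects, not something confirmed in this issue.

```yaml
# Sketch only: the same transform settings, with the option scoped to the "xxx" alias.
# Assumption: CamelTypeConverterTransform reads an option named target.type and
# expects a fully qualified Java class name; neither is confirmed in this issue.
transforms: xxx
transforms.xxx.type: org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value
transforms.xxx.target.type: java.lang.String
```

These three keys would replace the transforms lines under spec.config in the KafkaConnector resource above; the rest of the resource stays unchanged.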
