ConnectException: Cannot create mapping when using RegexRouter/TimestampRouter SMT #99
Closed
Description
Following config works fine
{
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"topics": "sqlite-foo",
"key.ignore": "true",
"type.name": "type.name=kafka-connect",
"connection.url": "http://localhost:9200"
},
"name": "es-sink-sqlite-foo"
}
But if I add org.apache.kafka.connect.transforms.TimestampRouter
SMT:
{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"topics": "sqlite-foo",
"key.ignore": "true",
"type.name": "type.name=kafka-connect",
"connection.url": "http://localhost:9200",
"transforms":"routeTS",
"transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter"
}
the Elasticsearch Sink fails:
[2017-07-12 16:06:16,869] ERROR Task es-sink-sqlite-foo-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"type.name=kafka-connect":{"properties":{"c1":{"type":"long"},"c2":{"type":"string"},"messagetopic":{"type":"string"},"messagesource":{"type":"string"}}}} -- {"root_cause":[{"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"sqlite-foo-20170712","index_uuid":"_na_","index":"sqlite-foo-20170712"}],"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"sqlite-foo-20170712","index_uuid":"_na_","index":"sqlite-foo-20170712"}
at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:65)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:201)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-07-12 16:06:16,871] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
Same problem exists with using RegexRouter
e.g.:
"transforms":"dropPrefix",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"sqlite-(.*)",
"transforms.dropPrefix.replacement":"$1"
It looks like the Sink correctly autocreates indices based on the inbound topic, but if the topic mutates with SMT, it doesn't.
Metadata
Assignees
Labels
No labels