Support dead letter queue on sinks #1772

Open
Jeffail opened this issue Feb 11, 2020 · 21 comments
Labels
domain: reliability Anything related to Vector's reliability domain: sinks Anything related to Vector's sinks meta: idea Anything in the idea phase. Needs further discussion and consensus before work can begin. needs: approval Needs review & approval before work can begin. needs: more demand Needs more demand before work can begin, +1 or comment to support. needs: requirements Needs a list of requirements before work can begin

Comments

@Jeffail
Contributor

Jeffail commented Feb 11, 2020

In the context of an event processor, a dead letter queue can mean a number of things. We can already support content-based DLQs by using transforms to route events to a secondary sink when they aren't suitable for our primary sink.
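
For reference, a minimal sketch of that content-based approach, assuming today's route transform and made-up component names and condition:

[transforms.splitter]
  inputs = [ "somewhere" ]
  type = "route"
  # events matching no named route come out of the "_unmatched" output
  route.unsuitable = '.format != "expected"'

[sinks.primary]
  inputs = [ "splitter._unmatched" ]
  type = "something"

[sinks.content_dlq]
  inputs = [ "splitter.unsuitable" ]
  type = "something_else"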

However, it would also be nice to support dead letter queuing by chaining sinks as fallback options, where failed sends can be routed under certain circumstances (rather than retrying in an infinite loop, or being dropped completely).

It'd be cool to be able to do something like this:

[sinks.foo]
  inputs = [ "somewhere" ]
  type = "something"
  [sinks.foo.dlq]
    after_retries = 3

[sinks.bar]
  inputs = [ "foo.dlq" ]
  type = "something_else"

And since we're just producing output from sinks here, there's no reason we can't add transforms in there as well:

[sinks.foo]
  inputs = [ "somewhere" ]
  type = "something"
  [sinks.foo.dlq]
    after_retries = 3

[transforms.baz]
  inputs = [ "foo.dlq" ]
  type = "clean_up"

[sinks.bar]
  inputs = [ "baz" ]
  type = "something_else"
@Jeffail Jeffail added domain: sinks Anything related to Vector's sinks needs: approval Needs review & approval before work can begin. needs: requirements Needs a list of requirements before work can begin labels Feb 11, 2020
@binarylogic
Contributor

I support this. I'm curious if this could be architected in a way that wouldn't require a ton of manual individual sink work?

@binarylogic binarylogic added domain: reliability Anything related to Vector's reliability meta: idea Anything in the idea phase. Needs further discussion and consensus before work can begin. needs: more demand Needs more demand before work can begin, +1 or comment to support. labels Aug 7, 2020
@jszwedko
Member

Noting here that it'd be useful to "replay" messages from the DLQs easily.

@binarylogic binarylogic changed the title Support dead letter queue sinks Support dead letter queue sources Sep 16, 2020
@jszwedko
Member

jszwedko commented Aug 12, 2021

Use case from Discord: https://discord.com/channels/742820443487993987/746070591097798688/875360138612064286

Hi guys,
I'm having some errors on a Vector instance that is responsible for reading messages from Kafka and sinking them to Elasticsearch.
There are two kinds of messages:
ERROR sink{component_kind="sink" component_name=elasticsearch_sink_out component_type=elasticsearch}:request{request_id=2929383}: vector::sinks::util::retries: Not retriable; dropping the request. reason="error type: mapper_parsing_exception, reason: object mapping for [deactivated_id] tried to parse field [null] as object, but found a concrete value"

ERROR sink{component_kind="sink" component_name=elasticsearch_sink_out component_type=elasticsearch}:request{request_id=2929418}: vector::sinks::util::retries: Not retriable; dropping the request. reason="error type: mapper_parsing_exception, reason: failed to parse field [price_effective_date_active] of type [date]"


However, as I use one sink for multiple topics, I'm unable to tell which topic the error relates to, since those fields are present in multiple topics.
Is there any possibility to customize the logs to include more information?
I tried debug logs, but didn't get much out of them.
If I could have the source topic and an offset, or at least the target index (one topic goes to one index), that would be really helpful.
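
(As an aside on that last ask: assuming a recent Vector version, the kafka source annotates each event with the source topic, partition, and offset by default, so at minimum the topic can be templated into the target index. A rough sketch with made-up broker, topics, and endpoint:)

[sources.from_kafka]
  type = "kafka"
  bootstrap_servers = "kafka:9092"
  group_id = "vector"
  topics = [ "topic_a", "topic_b" ]

[sinks.elasticsearch_sink_out]
  type = "elasticsearch"
  inputs = [ "from_kafka" ]
  endpoints = [ "https://es.example.com:9200" ]
  [sinks.elasticsearch_sink_out.bulk]
    # "topic" is added to each event by the kafka source, so it can drive the index name
    index = "{{ topic }}-%F"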

@shamil

shamil commented Feb 7, 2022

Any news regarding this? We desperately need this DLQ feature. Logstash's DLQ helps us a lot, and it's the only feature keeping us from migrating.

@jszwedko
Member

jszwedko commented Feb 7, 2022

Hi @shamil ! It's still something we very much want to do, but hasn't been scheduled yet.

@shamil

shamil commented Feb 7, 2022

Thanks @jszwedko I will wait ;)

@ottramst

Would like this feature as well :)
We're working in an environment where we capture multiple different log formats and try to send them to Elasticsearch. Needless to say, this feature would help us out a lot.

We were previously using Logstash's Dead Letter Queue for this: we read the failed documents straight out of the Dead Letter Queue with a dedicated plugin, did some transforms, and indexed them back (in a different format) into Elasticsearch for further investigation.

@rlazimi-dev

Just bumping - the need for this feature came up for me again.

@rlazimi-dev

I think it's important that the DLQ contain actionable messages, where an actionable message looks like: {error msg, payload that caused the error}. Through such actionable messages, we can figure out what failed and also determine exactly why it failed.

If it is too much work to architect a DLQ that chains off of sinks, then I think for the sake of delivering such functionality, it is worth extending a feature that you guys already have architected.

The feature I'm referring to is the internal_logs source.

Proposed (simple) solution:
Considering the internal_logs source already produces an event per error, recording both the error and the sink it came from, all that's needed to improve the existing error handling is to include another field containing the original payload that caused the error.

Example
To show the capabilities of the current internal_logs implementation, below is an example of the fields it captures. A comment denotes the change I think would be helpful.

{
    "response": "Response { status: 400, version: HTTP/1.1, headers: {\"server\": \"openresty/1.15.8.1\ ... cut for brevity}",
    // "payload": {...}, //<------ payload should contain the original message that was in the component that the error came from
    "metadata": {
      "level": "ERROR",
      "kind": "event",
      "target": "vector::sinks::util::sink",
      "module_path": "vector::sinks::util::sink"
    },
    "pid": 1,
    "timestamp": "2022-06-13T15:37:11.637097037Z",
    "source_type": "internal_logs",
    "host": "vector-deployment-edge-container-id",
    "message": "Response failed."
  }
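
As a point of reference, internal_logs can already be wired to a secondary sink today; the filter condition and file path below are made up, and the field names are assumed from the example event above:

[sources.vector_internal]
  type = "internal_logs"

[transforms.sink_errors_only]
  type = "filter"
  inputs = [ "vector_internal" ]
  condition = '.metadata.level == "ERROR" && contains(to_string(.metadata.target) ?? "", "sinks")'

[sinks.sink_error_archive]
  type = "file"
  inputs = [ "sink_errors_only" ]
  path = "/var/log/vector/sink-errors.log"
  [sinks.sink_error_archive.encoding]
    codec = "json"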

Note: I'm aware that this implementation does not really produce a DLQ, and that it does not technically satisfy the OP's description, but I believe many people would be satisfied with this for now, because it at least drastically helps in diagnosing sink errors. Thumbs up if you think this is "good enough" for now, or thumbs down if I'm misguided. I just want something to be done rather than waiting too long for the perfect solution.

@jszwedko @binarylogic what do you think? I haven't read the code, but I would guess that it's straightforward to implement, considering you already seem to have the infrastructure set up for internal logs to pick up sink errors.

Thank you for the read!

@jszwedko
Member

Thanks for the thoughts @rlazimi-dev ! I think this is something we might pick up in Q3.

@awangc

awangc commented Sep 27, 2022

Any updates on this? It's something that could be really useful.

@syntastical

I just effectively ran into this issue too. I'm using the HTTP sink and I'd really like to have access to the output produced from the HTTP request for use in other transforms and sinks.

@kushalhalder

Folks, any updates on this? We cannot use this for any stable pipeline if a DLQ is not supported. Or is there an alternative DLQ strategy?

@kushalhalder

For example: https://github.com/vectordotdev/vector-test-harness/blob/master/cases/tcp_to_http_performance/ansible/config_files/vector.toml
Even in this case we are testing for speed, but what about retries, stability, and fault tolerance?

@jszwedko
Member

No updates yet, unfortunately. Sinks do retry requests for transient failures.
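
For anyone who needs to tune that behavior in the meantime, the retry knobs are configurable per sink under request; a rough sketch with made-up values, assuming a recent Vector version:

[sinks.es]
  type = "elasticsearch"
  inputs = [ "app_logs" ]
  endpoints = [ "https://es.example.com:9200" ]
  [sinks.es.request]
    retry_attempts = 5
    retry_initial_backoff_secs = 1
    retry_max_duration_secs = 30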

@tastyfrankfurt

Have to say this is a pretty big gap, as some of those logs could be critical and need recovering, or you could be put in a position where you end up duplicating logs through the stack just to recover a couple. I think this is a must-have feature for sinks, especially given something similar has been implemented in VRL.

Maybe a simpler way for the Elasticsearch sink would be to add an additional configuration item specifying a DLQ index, so Vector can output to that index if the current one fails all retries.
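
A hypothetical sketch of that idea follows; the dlq_index key does not exist today, and the rest uses made-up names:

[sinks.elastic]
  type = "elasticsearch"
  inputs = [ "app_logs" ]
  endpoints = [ "https://es.example.com:9200" ]

  # hypothetical option, not implemented: once all retries are exhausted,
  # write the rejected event to this index instead of dropping it
  # dlq_index = "logs-dlq"

  [sinks.elastic.bulk]
    index = "logs-%F"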

@ravenjm

ravenjm commented Sep 27, 2023

I think it's not about retries, especially when we're talking about Elasticsearch.
The most common issue is a mapping mismatch, for example:

{
     "field1": "this is a string"
}

{
     "field1": 12345
}

@knightprog

I have the exact same case as @ravenjm: using the Elasticsearch sink, logs rejected because of a format mismatch are just dropped.
It would be really interesting to be able to route them to another sink.

@roniez

roniez commented Apr 23, 2024

I have run into the same issue, and it would be really nice to send failed messages to another sink, for example back to a dedicated Kafka DLQ topic or to a file on disk.

@marcus-crane

marcus-crane commented May 2, 2024

One particular use case I have is around using Vector for delivering logs. We persist some logs in Kafka, which then go through a processing pipeline, but occasionally someone will send through a log line that is bigger than the maximum Kafka payload size (that we probably have as a default).

In these cases, it would be nice to have a DLQ action. Probably not a literal queue, but, say, rerouting an event like that to an S3 bucket. Ironically, the standard pattern of publishing to a dead letter Kafka topic wouldn't work, because it too wouldn't be sized large enough, plus these events are rare enough that ordering is not that useful a property.
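
One shape this can take today, without a true post-failure DLQ, is pre-emptively splitting oversized events off with the route transform; a rough sketch with made-up names and a made-up 1 MB threshold:

[transforms.size_split]
  type = "route"
  inputs = [ "app_logs" ]
  # events over ~1 MB go to the "oversized" output, the rest to "_unmatched"
  route.oversized = 'length(encode_json(.)) > 1000000'

[sinks.oversized_s3]
  type = "aws_s3"
  inputs = [ "size_split.oversized" ]
  bucket = "my-oversized-events"
  region = "us-east-1"
  [sinks.oversized_s3.encoding]
    codec = "json"

[sinks.logs_kafka]
  type = "kafka"
  inputs = [ "size_split._unmatched" ]
  bootstrap_servers = "kafka:9092"
  topic = "logs"
  [sinks.logs_kafka.encoding]
    codec = "json"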

@DimDroll

DimDroll commented May 9, 2024

someone will send through a log line that is bigger than the maximum Kafka payload size (that we probably have as a default).

@marcus-crane for such cases we developed a number of remaps that handle "dead letter" cases.

The first one checks the event's size and then aborts processing:

[transforms.defaultChecks]
  type            = "remap"
  inputs          = ["parseLogs"]
  drop_on_abort   = true
  drop_on_error   = true
  reroute_dropped = true

  source = '''
# Log events to console:
  if ${TOML_DEBUG} { log("PRE-defaultChecks: " + encode_json(.), level: "info", rate_limit_secs: 60) }

####################################################################
####################### WHEN EVENT IS > 3MB ########################
####################################################################

# initialize a possibly empty error object
  if is_object(.error) {
    .error = object!(.error)
  } else {
    .error = {}
  }

# if the root object's byte size is bigger than Kafka's max.message.bytes (3MB),
# we should invalidate it for later inspection
  if length(encode_json(.)) > 3000000 {
    .error = merge(.error, {"type": "pipeline", "message": "MessageSizeTooLarge"})
    abort
  }

  . = compact(.)

# Log events to console:
  if ${TOML_DEBUG} { log("POST-defaultChecks: " + encode_json(.), level: "info", rate_limit_secs: 60) }
'''

Next, ALL messages aborted in the previous remaps go through a catch-all remap that tags them properly. "route" is an internal field name we use for dynamic event routing:

[transforms.catchAbortAndError]
  type   = "remap"
  inputs = ["*.dropped"]

  source = '''
# Log events to console:
  if ${TOML_DEBUG} { log("PRE-catchAbortAndError: " + encode_json(.), level: "info", rate_limit_secs: 60) }
  if ${TOML_DEBUG} { log("METADATA: " + encode_json(%), level: "info", rate_limit_secs: 60) }

####################################################################
###################### WHEN EVENT HAS ERRORS #######################
####################################################################

# metadata contains abort! error information
  .event.metadata    = encode_json(%)

  .route.type      = "all"
  .route.dataset   = "catch"
  .route.namespace = "all"

# Log events to console:
  if ${TOML_DEBUG} { log("POST-catchAbortAndError: " + encode_json(.), level: "info", rate_limit_secs: 60) }
'''

And last, all our Kafka sinks are configured to support large messages with:

[sinks.tranformedKafka]
  type              = "kafka"
  inputs            = ["catchAbortAndError", "defaultChecks"]
  bootstrap_servers = "${KAFKA_BROKERS}"
  topic             = "{{`{{ .route.type }}`}}-{{`{{ .route.dataset }}`}}-{{`{{ .route.namespace }}`}}"
  compression       = "none"
  healthcheck       = true

  [sinks.tranformedKafka.encoding]
    codec = "json"

  [sinks.tranformedKafka.tls]
    enabled  = true
    ca_file  = "/etc/kafka-cluster-ca/ca.crt"
    crt_file = "/etc/${VECTOR_USER}-user-certs/user.crt"
    key_file = "/etc/${VECTOR_USER}-user-certs/user.key"

  # handleDropped produced events more than 100MB
  [sinks.tranformedKafka.librdkafka_options]
    "message.max.bytes" = "100000000" # 100MB

This of course can't handle cases where Elasticsearch rejects a message due to mapping conflicts, which is a shame, but that can theoretically be worked around by having strict schema verification in Vector; that is still on our to-do list. Hopefully this RFC will be finished and merged before that :)
#14708
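
For what it's worth, a rough sketch of what that strict schema check could look like as one more remap in front of the Elasticsearch-bound sink; the field name, coercion, and wiring are made up for illustration, and the dropped output would flow into the existing catchAbortAndError via "*.dropped":

[transforms.schemaChecks]
  type            = "remap"
  inputs          = ["defaultChecks"]
  drop_on_abort   = true
  reroute_dropped = true

  source = '''
# coerce fields that commonly trigger Elasticsearch mapping conflicts;
# abort (and reroute via *.dropped) when an event can't be normalized
  .field1, err = to_string(.field1)
  if err != null {
    .error = merge(object(.error) ?? {}, {"type": "pipeline", "message": "SchemaMismatch"})
    abort
  }
'''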

@jszwedko jszwedko changed the title Support dead letter queue sources Support dead letter queue on sinks Sep 6, 2024