Skip to content

Logstash Ingest Pipelines - Nginx integration produces field error.message #58

Closed
@kaisecheng

Description

@kaisecheng

Version
logstash-filter-elastic_integration 0.0.1
Stack (Elasticsearch, agent, fleet) 8.7.0

Description
Fleet managed agent installs a policy with Nginx integration and config to output data to Logstash.
elastic_agent config with enrich => none
elastic_integration executes ingest pipeline but produces events with field error.message

    "error": {
      "message": "field [@timestamp] doesn't exist"
    }

Config

input {
    elastic_agent {
        port => 5044
        enrich => none
        ssl => true
        ssl_certificate_authorities => REDACT
        ssl_certificate => REDACT
        ssl_key => REDACT
        ssl_verify_mode => "force_peer"
    }
}
filter {
    elastic_integration {
        cloud_id => REDACT
        api_key => REDACT
        remove_field => ["_version"]
    }
}
output {
    elasticsearch {
        cloud_auth => REDACT
        cloud_id => REDACT
    }
}

Nginx Log

172.17.0.1 - - [27/Apr/2023:19:52:29 +0000] "GET / HTTP/1.1" 200 615 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36" "-"
172.17.0.1 - - [27/Apr/2023:19:52:30 +0000] "GET /favicon.ico HTTP/1.1" 404 555 "http://localhost:8080/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36" "-"
172.17.0.1 - - [27/Apr/2023:19:56:21 +0000] "GET /nginx_status HTTP/1.1" 404 153 "-" "Elastic-Metricbeat/8.7.0 (darwin; amd64; a8dbc6c06381f4fe33a5dc23906d63c04c9e2444; 2023-03-23 00:44:48 +0000 UTC)" "-"
Ingest pipeline logs-nginx.access-1.11.0 processors
[
  {
    "pipeline": {
      "if": "ctx.message.startsWith('{')",
      "name": "logs-nginx.access-1.11.0-third-party"
    }
  },
  {
    "set": {
      "field": "event.ingested",
      "value": "{{_ingest.timestamp}}"
    }
  },
  {
    "set": {
      "field": "ecs.version",
      "value": "8.5.1"
    }
  },
  {
    "rename": {
      "field": "message",
      "target_field": "event.original",
      "ignore_missing": true
    }
  },
  {
    "grok": {
      "field": "event.original",
      "patterns": [
        "(%{NGINX_HOST} )?\"?(?:%{NGINX_ADDRESS_LIST:nginx.access.remote_ip_list}|%{NOTSPACE:source.address}) - (-|%{DATA:user.name}) \\[%{HTTPDATE:nginx.access.time}\\] \"%{DATA:nginx.access.info}\" %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long} \"(-|%{DATA:http.request.referrer})\" \"(-|%{DATA:user_agent.original})\""
      ],
      "pattern_definitions": {
        "NGINX_HOST": "(?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})?",
        "NGINX_NOTSEPARATOR": "[^\t ,:]+",
        "NGINX_ADDRESS_LIST": "(?:%{IP}|%{WORD})(\"?,?\\s*(?:%{IP}|%{WORD}))*"
      },
      "ignore_missing": true
    }
  },
  {
    "grok": {
      "field": "nginx.access.info",
      "patterns": [
        "%{WORD:http.request.method} %{DATA:_tmp.url_orig} HTTP/%{NUMBER:http.version}",
        ""
      ],
      "ignore_missing": true
    }
  },
  {
    "uri_parts": {
      "field": "_tmp.url_orig",
      "ignore_failure": true
    }
  },
  {
    "set": {
      "field": "url.domain",
      "value": "{{destination.domain}}",
      "if": "ctx.url?.domain == null && ctx.destination?.domain != null"
    }
  },
  {
    "remove": {
      "field": [
        "nginx.access.info",
        "_tmp.url_orig"
      ],
      "ignore_missing": true
    }
  },
  {
    "split": {
      "field": "nginx.access.remote_ip_list",
      "separator": "\"?,?\\s+",
      "ignore_missing": true
    }
  },
  {
    "split": {
      "field": "nginx.access.origin",
      "separator": "\"?,?\\s+",
      "ignore_missing": true
    }
  },
  {
    "set": {
      "field": "source.address",
      "if": "ctx.source?.address == null",
      "value": ""
    }
  },
  {
    "script": {
      "if": "ctx.nginx?.access?.remote_ip_list != null && ctx.nginx.access.remote_ip_list.length > 0",
      "lang": "painless",
      "source": "boolean isPrivate(def dot, def ip) {\n  try {\n    StringTokenizer tok = new StringTokenizer(ip, dot);\n    int firstByte = Integer.parseInt(tok.nextToken());\n    int secondByte = Integer.parseInt(tok.nextToken());\n    if (firstByte == 10) {\n      return true;\n    }\n    if (firstByte == 192 && secondByte == 168) {\n      return true;\n    }\n    if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {\n      return true;\n    }\n    if (firstByte == 127) {\n      return true;\n    }\n    return false;\n  }\n  catch (Exception e) {\n    return false;\n  }\n} try {\n  ctx.source.address = null;\n  if (ctx.nginx.access.remote_ip_list == null) {\n    return;\n  }\n  def found = false;\n  for (def item : ctx.nginx.access.remote_ip_list) {\n    if (!isPrivate(params.dot, item)) {\n      ctx.source.address = item;\n      found = true;\n      break;\n    }\n  }\n  if (!found) {\n    ctx.source.address = ctx.nginx.access.remote_ip_list[0];\n  }\n} catch (Exception e) {\n  ctx.source.address = null;\n}",
      "params": {
        "dot": "."
      }
    }
  },
  {
    "remove": {
      "field": "source.address",
      "if": "ctx.source.address == null"
    }
  },
  {
    "grok": {
      "field": "source.address",
      "patterns": [
        "^%{IP:source.ip}$"
      ],
      "ignore_failure": true
    }
  },
  {
    "remove": {
      "field": "event.created",
      "ignore_missing": true,
      "ignore_failure": true
    }
  },
  {
    "rename": {
      "field": "@timestamp",
      "target_field": "event.created"
    }
  },
  {
    "convert": {
      "field": "destination.port",
      "type": "long",
      "if": "ctx.destination?.port != null"
    }
  },
  {
    "date": {
      "field": "nginx.access.time",
      "target_field": "@timestamp",
      "formats": [
        "dd/MMM/yyyy:H:m:s Z"
      ],
      "on_failure": [
        {
          "append": {
            "field": "error.message",
            "value": "{{ _ingest.on_failure_message }}"
          }
        }
      ]
    }
  },
  {
    "remove": {
      "field": "nginx.access.time"
    }
  },
  {
    "user_agent": {
      "field": "user_agent.original",
      "ignore_missing": true
    }
  },
  {
    "geoip": {
      "field": "source.ip",
      "target_field": "source.geo",
      "ignore_missing": true
    }
  },
  {
    "geoip": {
      "database_file": "GeoLite2-ASN.mmdb",
      "field": "source.ip",
      "target_field": "source.as",
      "properties": [
        "asn",
        "organization_name"
      ],
      "ignore_missing": true
    }
  },
  {
    "rename": {
      "field": "source.as.asn",
      "target_field": "source.as.number",
      "ignore_missing": true
    }
  },
  {
    "rename": {
      "field": "source.as.organization_name",
      "target_field": "source.as.organization.name",
      "ignore_missing": true
    }
  },
  {
    "set": {
      "field": "event.kind",
      "value": "event"
    }
  },
  {
    "append": {
      "field": "event.category",
      "value": "web"
    }
  },
  {
    "append": {
      "field": "event.type",
      "value": "access"
    }
  },
  {
    "set": {
      "field": "event.outcome",
      "value": "success",
      "if": "ctx?.http?.response?.status_code != null && ctx.http.response.status_code < 400"
    }
  },
  {
    "set": {
      "field": "event.outcome",
      "value": "failure",
      "if": "ctx?.http?.response?.status_code != null && ctx.http.response.status_code >= 400"
    }
  },
  {
    "append": {
      "field": "related.ip",
      "value": "{{source.ip}}",
      "if": "ctx?.source?.ip != null"
    }
  },
  {
    "append": {
      "field": "related.ip",
      "value": "{{destination.ip}}",
      "if": "ctx?.destination?.ip != null"
    }
  },
  {
    "append": {
      "field": "related.user",
      "value": "{{user.name}}",
      "if": "ctx?.user?.name != null"
    }
  },
  {
    "script": {
      "lang": "painless",
      "description": "This script processor iterates over the whole document to remove fields with null values.",
      "source": "void handleMap(Map map) {\n  for (def x : map.values()) {\n    if (x instanceof Map) {\n        handleMap(x);\n    } else if (x instanceof List) {\n        handleList(x);\n    }\n  }\n  map.values().removeIf(v -> v == null);\n}\nvoid handleList(List list) {\n  for (def x : list) {\n      if (x instanceof Map) {\n          handleMap(x);\n      } else if (x instanceof List) {\n          handleList(x);\n      }\n  }\n}\nhandleMap(ctx);\n"
    }
  },
  {
    "remove": {
      "field": "event.original",
      "if": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
      "ignore_failure": true,
      "ignore_missing": true
    }
  },
  {
    "pipeline": {
      "name": "logs-nginx.access@custom",
      "ignore_missing_pipeline": true
    }
  }
]
Event in Elasticsearch
{
  "_index": ".ds-logs-nginx.access-default-2023.04.28-000001",
  "_id": "LnOzyIcBFbk5iRTEPF7P",
  "_score": 1,
  "_source": {
    "agent": {
      "name": "macbook.local",
      "id": "b07a757b-cd28-43c8-ac18-22e15495351a",
      "type": "filebeat",
      "ephemeral_id": "e1a1a572-7500-4ac7-984b-bead5068c7b0",
      "version": "8.7.0"
    },
    "nginx": {
      "access": {
        "time": "28/Apr/2023:16:30:01 +0000",
        "remote_ip_list": [
          "172.17.0.1"
        ]
      }
    },
    "log": {
      "file": {
        "path": "/Users/kcheng/docker/nginx/log/access.log"
      },
      "offset": 225990
    },
    "elastic_agent": {
      "id": "b07a757b-cd28-43c8-ac18-22e15495351a",
      "version": "8.7.0",
      "snapshot": false
    },
    "source": {
      "address": "172.17.0.1",
      "ip": "172.17.0.1"
    },
    "error": {
      "message": "field [@timestamp] doesn't exist"
    },
    "url": {
      "path": "/nginx_status",
      "original": "/nginx_status",
      "scheme": null,
      "domain": null
    },
    "tags": [
      "nginx-access"
    ],
    "input": {
      "type": "log"
    },
    "@timestamp": "2023-04-28T16:30:01.301Z",
    "_tmp": {},
    "ecs": {
      "version": "8.5.1"
    },
    "data_stream": {
      "namespace": "default",
      "type": "logs",
      "dataset": "nginx.access"
    },
    "host": {
      "hostname": "macbook.local",
      "os": {
        "build": "22E261",
        "kernel": "22.4.0",
        "name": "macOS",
        "family": "darwin",
        "type": "macos",
        "version": "13.3.1",
        "platform": "darwin"
      },
      "ip": [
        "fe80::aede:48ff:fe00:1122",
        "fe80::1c11:8dad:673d:a337",
        "fdde:4d75:7647:4aed:c53:2ec3:6a77:396a",
        "192.168.1.70",
        "fe80::e448:47ff:feca:53da",
        "fe80::e448:47ff:feca:53da",
        "fe80::fe41:bd14:a8c9:3128",
        "fe80::7176:c858:cb28:3216",
        "fe80::ce81:b1c:bd2c:69e"
      ],
      "name": "macbook.local",
      "id": "E02831AD-4852-509D-9092-E65FB85EDFA7",
      "mac": [
        "3C-22-FB-D4-1C-44",
        "3E-22-FB-D4-1C-44",
        "AC-DE-48-00-11-22",
        "D2-29-FC-62-8C-00",
        "D2-29-FC-62-8C-01",
        "D2-29-FC-62-8C-04",
        "D2-29-FC-62-8C-05",
        "E6-48-47-CA-53-DA"
      ],
      "architecture": "x86_64"
    },
    "@version": "1",
    "http": {
      "request": {
        "method": "GET"
      },
      "response": {
        "status_code": 404,
        "body": {
          "bytes": 153
        }
      },
      "version": "1.1"
    },
    "event": {
      "agent_id_status": "auth_metadata_missing",
      "ingested": "2023-04-28T16:30:02Z",
      "original": "172.17.0.1 - - [28/Apr/2023:16:30:01 +0000] \"GET /nginx_status HTTP/1.1\" 404 153 \"-\" \"Elastic-Metricbeat/8.7.0 (darwin; amd64; a8dbc6c06381f4fe33a5dc23906d63c04c9e2444; 2023-03-23 00:44:48 +0000 UTC)\" \"-\"",
      "timezone": "+01:00",
      "dataset": "nginx.access"
    },
    "user_agent": {
      "original": "Elastic-Metricbeat/8.7.0 (darwin; amd64; a8dbc6c06381f4fe33a5dc23906d63c04c9e2444; 2023-03-23 00:44:48 +0000 UTC)"
    }
  }
}

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions