Skip to content

Commit

Permalink
flume doc update
Browse files Browse the repository at this point in the history
flume doc update
  • Loading branch information
xxzuo committed Jan 15, 2024
1 parent 99a194e commit 91d3fbe
Showing 1 changed file with 244 additions and 0 deletions.
244 changes: 244 additions & 0 deletions source/_posts/flume/Flume Sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -523,4 +523,248 @@ a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

**Kerberos and Kafka Source:**

要将 Kafka Source 与受 Kerberos 保护的 Kafka 集群一起使用,请为消费者设置上面提到的 Consumer.security.protocol 属性。 与 Kafka 代理一起使用的 Kerberos 密钥表和主体在 JAAS 文件的“KafkaClient”部分中指定。 “客户端”部分描述了 Zookeeper 连接(如果需要)。 有关 JAAS 文件内容的信息,请参阅 [Kafka 文档](https://kafka.apache.org/documentation.html#security_sasl_clientconfig)。 该 JAAS 文件的位置以及可选的系统范围 kerberos 配置可以通过 Flume-env.sh 中的 JAVA_OPTS 指定:
```shell
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
```

使用 SASL_PLAINTEXT 的安全配置示例:

```properties
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
```

使用 SASL_SSL 的安全配置示例:
```properties
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
```

示例 JAAS 文件。 有关其内容的参考,请参阅 [SASL 配置的 Kafka 文档](https://kafka.apache.org/documentation/#security_sasl_clientconfig)中所需身份验证机制 (GSSAPI/PLAIN) 的客户端配置部分。 由于Kafka Source也可能连接到Zookeeper进行偏移迁移,因此本示例中还添加了“Client”部分。 除非您需要偏移迁移,或者您需要此部分用于其他安全组件,否则不需要此部分。 另请确保 Flume 进程的操作系统用户具有 jaas 和 keytab 文件的读取权限。

```
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
```


#### NetCat TCP Source

类似 netcat 的源,监听给定端口并将每一行文本转换为一个事件。 其作用类似于
`nc -k -l [host] [port]`
换句话说,它打开指定的端口并监听数据。 期望提供的数据是换行分隔的文本。 每行文本都会转化为 Flume 事件并通过连接的通道发送。
必需的属性以粗体显示

|属性名称|默认值|描述|
|---|---|---|
|**channels**|||
|**type**||组件类型名称,需要为 netcat|
|**bind**||要绑定的主机名或 IP 地址|
|**port**||要绑定的端口号|
|max-line-length|512|每条事件体的最大行长度(以字节为单位)|
|ack-every-event|true|对收到的每条事件响应“OK”|
|selector.type|replicating|replicating 或 multiplexing|
|selector.*||依赖于 selector.type 值|
|interceptors||空格分隔的拦截器列表|
|interceptors.*|||

名为 a1 的代理示例:

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
```


#### NetCat UDP Source

根据原始 Netcat (TCP) 源,该源侦听给定端口并将每行文本转换为事件并通过连接的通道发送。 行为就像 `nc -u -k -l [host] [port]`
必需的属性以粗体显示

| 属性名称 | 默认值 | 描述 |
| ---- | ---- | ---- |
| **channels** || |
| **type** || 组件类型名称,需要为 `netcatudp` |
| **bind** || 要绑定的主机名或 IP 地址 |
| **port** || 要绑定的端口号 |
| remoteAddressHeader || |
| selector.type | replicating | replicating 或 multiplexing |
| selector.* | | 依赖于 selector.type 值 |
| interceptors || 空格分隔的拦截器列表 |
| interceptors.* | | |

名为 a1 的代理示例:
```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
```

#### Sequence Generator Source

一个简单的序列生成器,它使用从 0 开始、以 1 递增并在totalEvents 处停止的计数器连续生成事件。 当无法将事件发送到通道时重试。 主要用于测试。 在重试期间,它会保持重试消息的正文与之前相同,以便在目的地进行重复数据删除之后,唯一事件的数量预计等于指定的totalEvents。 必需的属性以粗体显示。

|属性名称|默认值|描述|
|---|---|---|
|**channels**|||
|**type**||组件类型名称,需要为 seq|
|selector.type||replicating 或 multiplexing|
|selector.*|replicating|取决于 selector.type 值|
|interceptors||空格分隔的拦截器列表|
|interceptors.*|||
|batchSize|1|每次请求循环尝试处理的事件数。|
|totalEvents|Long.MAX_VALUE|源发送的唯一事件数。|

名为 a1 的代理示例:
```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
```

#### Syslog Sources

读取系统日志数据并生成 Flume 事件。 UDP 源将整个消息视为单个事件。 TCP 源为每个由换行符 (‘n’) 分隔的字符串创建一个新事件。
必需的属性以粗体显示。
##### Syslog TCP Source

原始的、经过验证的 syslog TCP 源。

|属性名称|默认值|描述|
|---|---|---|
|**channels**|||
|**type**||组件类型名称,需要为 syslogtcp|
|**host**||要绑定的主机名或 IP 地址|
|**port**||要绑定的端口号|
|eventSize|2500|单个事件行的最大大小(以字节为单位)|
|keepFields|none|将此设置为“all”将在事件正文中保留优先级、时间戳和主机名。也允许以空格分隔的字段列表。当前,可以包含以下字段:priority、version、timestamp、hostname。值“true”和“false”已弃用,改用“all”和“none”。 |
|clientIPHeader||如果指定,客户端的 IP 地址将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器根据客户端的 IP 地址自定义路由逻辑。不要在这里使用标准的 Syslog 标头名称(如 _host_),因为事件标头在那种情况下会被覆盖。|
|clientHostnameHeader||如果指定,客户端的主机名将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器根据客户端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在这里使用标准的 Syslog 标头名称(如 _host_),因为事件标头在那种情况下会被覆盖。|
|selector.type||replicating 或 multiplexing|
|selector.*|replicating|取决于 selector.type 值|
|interceptors||空格分隔的拦截器列表|
|interceptors.*|||
|ssl|false|将此设置为 true 以启用 SSL 加密。如果启用 SSL,还必须通过组件级参数(参见下文)或全局 SSL 参数(参见 [SSL/TLS support](https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#ssl-tls-support)) 部分)指定“keystore”和“keystore-password”。 |
|keystore||这是 Java 密钥库文件的路径。如果在这里未指定,则将使用全局密钥库(如果已定义,否则将产生配置错误)。|
|keystore-password|| Java 密钥库的密码。如果在此未指定,则将使用全局密钥库密码(如果已定义,否则会产生配置错误)。|
|keystore-type|JKS| Java 密钥库的类型。可以是 “JKS” 或 “PKCS12”。如果在此处未指定,则将使用全局密钥库类型(如果已定义,否则默认为 JKS)。|
|exclude-protocols|SSLv3|要排除的 SSL/TLS 协议的以空格分隔的列表。除了指定的协议之外,还将始终排除 SSLv3。|
|include-protocols||要包括的 SSL/TLS 协议的以空格分隔的列表。启用的协议将是不含排除协议的包含协议。如果 include-protocols 为空,它将包含每个支持的协议。|
|exclude-cipher-suites||要排除的密码套件的以空格分隔的列表。|
|include-cipher-suites||要包含的密码组合的以空格分隔的列表。启用的密码组合将是不含排除的密码组合的包含密码组合。如果 include-cipher-suites 为空,它将包含每个支持的密码组合。|

例如,名为 a1 的代理的 syslog TCP source:
```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
```


##### Multiport Syslog TCP Source

这是 Syslog TCP 源的更新、更快、支持多端口的版本。 请注意,端口配置设置已替换端口。 多端口功能意味着它可以高效地同时侦听多个端口。 该源使用 Apache Mina 库来执行此操作。 提供对 RFC-3164 和许多常见 RFC-5424 格式消息的支持。 还提供配置每个端口使用的字符集的功能。

|属性名称|默认值|描述|
|---|---|---|
|**channels**|||
|**type**||组件类型名称,需要为 multiport_syslogtcp|
|**host**||要绑定的主机名或 IP 地址|
|**ports**||要绑定的一个或多个端口的空格分隔列表|
|eventSize|2500|单个事件行的最大大小(以字节为单位)|
|keepFields|none|将此设置为“all”将在事件正文中保留优先级、时间戳和主机名。也允许以空格分隔的字段列表。 当前可以包含以下字段:priority、version、timestamp、hostname。值“true”和“false”已弃用,改用“all”和“none”。|
|portHeader||如果指定,端口号将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器根据传入端口自定义路由逻辑。|
|clientIPHeader||如果指定,客户端的IP地址将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器根据客户端的IP地址自定义路由逻辑。不要在这里使用标准的Syslog标头名称(如 \_host\_) 因为事件标头在那种情况下会被覆盖。 |
|clientHostnameHeader||如果指定,客户端的主机名将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器根据客户端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在这里使用标准的Syslog标头名称(如 \_host\_)因为事件标头在那种情况下会被覆盖。 |
|charset.default|UTF-8|解析syslog事件为字符串时使用的默认字符集。|
|charset.port.\<port> ||可以对每个端口配置字符集。|
|batchSize|100|每次请求循环尝试处理的最大事件数。使用默认值通常就好。|
|readBufferSize|1024|内部Mina读取缓冲区的大小。提供性能优化。使用默认值通常就好。|
|numProcessors|(auto-detected)|用于处理消息的系统上可用的处理器数。默认情况下,使用Java运行时API自动检测CPU数。Mina将为每个检测到的CPU生成2个请求处理线程,这通常是合理的。|
|selector.type|replicating|replicating、multiplexing或custom|
|selector.*||依赖于selector.type的值|
|interceptors||空格分隔的拦截器列表。|
|interceptors.*|||
|ssl|false|将此设置为true可启用SSL加密。 如果启用SSL,还必须通过组件级参数(参见下文)或全局SSL参数(请参阅[SSL/TLS支持](https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#ssl-tls-support)部分)指定“keystore”和“keystore-password”。|
|keystore||这是Java密钥库文件的路径。如果在这里未指定,则将使用全局密钥库(如果已定义,否则将产生配置错误)。|
|keystore-password||Java密钥库的密码。如果在此未指定,则将使用全局密钥库密码(如果已定义,否则会产生配置错误)。|
|keystore-type|JKS|Java密钥库的类型。可以是“JKS”或“PKCS12”。如果在此处未指定,则将使用全局密钥库类型(如果已定义,否则默认为JKS)。|
|exclude-protocols|SSLv3|要排除的SSL/TLS协议的以空格分隔的列表。除了指定的协议之外,还将始终排除SSLv3。|
|include-protocols||要包含的SSL/TLS协议的以空格分隔的列表。启用的协议将是不含排除协议的包含协议。如果include-protocols为空,它将包含每个支持的协议。|
|exclude-cipher-suites||要排除的密码套件的以空格分隔的列表。|
|include-cipher-suites||要包含的密码组合的以空格分隔的列表。启用的密码组合将是不含排除的密码组合的包含密码组合。如果include-cipher-suites为空,它将包含每个支持的密码组合。|

##### Syslog UDP Source

|属性名称|默认值|描述|
|---|---|---|
|**channels**|||
|**type**||组件类型名称,需要为 syslogudp|
|**host**||要绑定的主机名或 IP 地址|
|**port**||要绑定的端口号|
|keepFields|false|将此设置为 true 将在事件正文中保留优先级、时间戳和主机名。|
|clientIPHeader||如果指定,客户端的 IP 地址将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器根据客户端的 IP 地址自定义路由逻辑。不要在这里使用标准的 Syslog 标头名称(如 \_host\_),因为事件标头在那种情况下会被覆盖。 |
|clientHostnameHeader||如果指定,客户端的主机名将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器根据客户端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在这里使用标准的 Syslog 标头名称(如 \_host\_),因为事件标头在那种情况下会被覆盖。 |
|selector.type||replicating 或 multiplexing|
|selector.*|replicating|取决于 selector.type 值|
|interceptors||空格分隔的拦截器列表|
|interceptors.*|||

例如,名为 a1 的代理的 syslog UDP source:
```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
```


#### HTTP Source

通过 HTTP POST 和 GET 接受 Flume 事件的源。 GET 只能用于实验。 HTTP 请求通过可插入“handler”转换为 Flume 事件,该处理程序必须实现 HTTPSourceHandler 接口。 该处理程序接受 HttpServletRequest 并返回 Flume 事件列表。 从一个 Http 请求处理的所有事件都会在一个事务中提交到通道,从而提高文件通道等通道的效率。 如果处理程序引发异常,该源将返回 HTTP 状态 400。如果通道已满,或者源无法将事件附加到通道,则源将返回 HTTP 503 - 暂时不可用状态。

在一个 post 请求中发送的所有事件都被视为一批并在一笔事务中插入到通道中。

该源基于 Jetty 9.4,并提供设置其他 Jetty 特定参数的能力,这些参数将直接传递到 Jetty 组件。

0 comments on commit 91d3fbe

Please sign in to comment.