Skip to content

Commit

Permalink
finish source doc
Browse files Browse the repository at this point in the history
finish source doc
  • Loading branch information
xxzuo committed Jan 18, 2024
1 parent 91d3fbe commit 504a455
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 1 deletion.
12 changes: 11 additions & 1 deletion source/_posts/flume/Flume Sinks.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
---
title: Flume-Sinks文档(未开始)
title: Flume-Sinks文档(进行中)
author: xxzuo
tags:
- flume文档
categories:
- flume
date: 2023-07-13 22:21:31
---

### Flume Sinks

#### HDFS Sink

该 Sink 将事件写入 Hadoop 分布式文件系统 (HDFS)。 它目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据经过的时间或数据大小或事件数量定期滚动文件(关闭当前文件并创建新文件)。 它还按时间戳或事件起源的机器等属性对数据进行存储/分区。 HDFS 目录路径可能包含格式化转义序列,这些序列将被 HDFS 接收器替换,以生成用于存储事件的目录/文件名。 使用此Sink 需要安装 hadoop,以便 Flume 可以使用 Hadoop jar 与 HDFS 集群进行通信。 请注意,需要支持sync() 调用的Hadoop版本。

以下是支持的转义序列:


224 changes: 224 additions & 0 deletions source/_posts/flume/Flume Sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -768,3 +768,227 @@ a1.sources.r1.channels = c1

该源基于 Jetty 9.4,并提供设置其他 Jetty 特定参数的能力,这些参数将直接传递到 Jetty 组件。

|属性名称|默认值|描述|
|---|---|---|
|**type**||组件类型名称,需要为http|
|**port**||源应绑定到的端口|
|bind|0.0.0.0|要监听的主机名或 IP 地址|
|handler|org.apache.flume.source.http.JSONHandler|handler 类的 FQCN|
|handler.*||handler的配置参数|
|selector.type|replicating|replicating 或 multiplexing|
|selector.*||依赖于selector.type值|
|interceptors||空格分隔的拦截器列表|
|interceptors.*|||
|ssl|false|将属性设置为 true 以启用 SSL。HTTP Source 不支持 SSLv3。|
|exclude-protocols|SSLv3|要排除的 SSL/TLS 协议的以空格分隔的列表。除了指定的协议之外,还将始终排除 SSLv3。|
|include-protocols||要包含的 SSL/TLS 协议的以空格分隔的列表。启用的协议将是不含排除协议的包含协议。如果 include-protocols 为空,它将包含每个支持的协议。|
|exclude-cipher-suites||要排除的密码套件的以空格分隔的列表。|
|include-cipher-suites||要包含的密码组合的以空格分隔的列表。启用的密码组合将是不含排除的密码组合的包含密码组合。|
|keystore||密钥库的位置,包括密钥库文件名。如果启用 SSL 但未在此指定 keystore,则将使用全局 keystore(如果已定义,否则配置错误)。|
|keystore-password||密钥库密码。如果启用 SSL 但未在此指定 keystore 密码,则将使用全局 keystore 密码(如果已定义,否则配置错误)。|
|keystore-type|JKS|密钥库类型。可以是 “JKS” 或 “PKCS12”。|
|QueuedThreadPool.*||将在 org.eclipse.jetty.util.thread.QueuedThreadPool 上设置的 Jetty 特定设置。注意:仅在设置了该类的至少一个属性时才会使用 QueuedThreadPool。|
|HttpConfiguration.*||将在 org.eclipse.jetty.server.HttpConfiguration 上设置的 Jetty 特定设置|
|SslContextFactory.*||将在 org.eclipse.jetty.util.ssl.SslContextFactory(仅在 _ssl_ 设置为 true 时适用)上设置的 Jetty 特定设置|
|ServerConnector.*||将在 org.eclipse.jetty.server.ServerConnector 上设置的 Jetty 特定设置|


已弃用的属性

|Property Name|Default|Description|
|---|---|---|
|keystorePassword||使用 _keystore-password_. 已弃用的值将被新值覆盖。 |
|excludeProtocols|SSLv3|使用 _exclude-protocols_. 已弃用的值将被新值覆盖。 |
|enableSSL|false|使用 _ssl_. 已弃用的值将被新值覆盖。 |

注意: Jetty 特定的设置是使用上面列出的对象上的 setter 方法来设置的。 有关完整详细信息,请参阅这些类([QueuedThreadPool](https://www.eclipse.org/jetty/javadoc/jetty-9/org/eclipse/jetty/util/thread/QueuedThreadPool.html)[HttpConfiguration](https://eclipse.dev/jetty/javadoc/jetty-9/org/eclipse/jetty/server/HttpConfiguration.html)[SslContextFactory](https://eclipse.dev/jetty/javadoc/jetty-9/org/eclipse/jetty/util/ssl/SslContextFactory.html)[ServerConnector](https://eclipse.dev/jetty/javadoc/jetty-9/org/eclipse/jetty/server/ServerConnector.html))的 Javadoc。

当使用 Jetty 特定的设置时,上面的命名属性将优先(例如,excludeProtocols 将优先于 SslContextFactory.ExcludeProtocols)。 所有属性的首字母均为小写。

名为 a1 的代理的 http 源示例:

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
```


##### JSONHandler

开箱即用地提供了一个处理程序,它可以处理以 JSON 格式表示的事件,并支持 UTF-8、UTF-16 和 UTF-32 字符集。 该处理程序接受一组事件(即使只有一个事件,该事件也必须以数组形式发送),并根据请求中指定的编码将它们转换为 Flume 事件。 如果未指定编码,则假定为 UTF-8。 JSON 处理程序支持 UTF-8、UTF-16 和 UTF-32。 事件表示如下。

```json
[{
"headers" : {
"timestamp" : "434324343",
"host" : "random_host.example.com"
},
"body" : "random_body"
},
{
"headers" : {
"namenode" : "namenode.example.com",
"datanode" : "random_datanode.example.com"
},
"body" : "really_random_body"
}]
```

要设置字符集,请求的内容类型必须指定为 `application/json;charset=UTF-8`(根据需要将 UTF-8 替换为 UTF-16 或 UTF-32)。

以此处理程序期望的格式创建事件的一种方法是使用 Flume SDK 中提供的 JSONEvent 并使用 Google Gson 通过 Gson#fromJson(Object, Type) 方法创建 JSON 字符串。 作为事件列表的该方法的第二个参数传递的类型标记可以通过以下方式创建:

```java
Type type = new TypeToken<List<JSONEvent>>() {}.getType();
```



##### BlobHandler

默认情况下,HTTPSource 将 JSON 输入拆分为 Flume 事件。 作为替代方案,BlobHandler 是 HTTPSource 的处理程序,它返回一个事件,其中包含请求参数以及随此请求上传的二进制大对象 (BLOB)。 例如 PDF 或 JPG 文件。 请注意,此方法不适合非常大的对象,因为它将整个 BLOB 缓冲在 RAM 中。

|属性名 |默认值 |Description|
|---|---|---|
|**handler**||此类的 FQCN:`org.apache.flume.sink.solr.morphline.BlobHandler` |
|handler.maxBlobLength|100000000|针对给定请求读取和缓冲的最大字节数 |


#### Stress Source

StressSource 是一个内部负载生成源实现,对于压力测试非常有用。 它允许用户配置带有空标头的事件有效负载的大小。 用户可以配置要发送的事件总数以及要发送的成功事件的最大数量。

必需的属性以**粗体**显示。

|属性名称|默认值|描述|
|---|---|---|
|**type**||组件类型名称,需要为 org.apache.flume.source.StressSource|
|size|500|每个事件的有效负载大小。单位:**byte**|
|maxTotalEvents|-1|要发送的最大事件数|
|maxSuccessfulEvents|-1|已成功发送的最大事件数|
|batchSize|1|一次批处理中要发送的事件数|
|maxEventsPerSecond|0|如果设置为大于零的整数,则对源施加速率限制器。|

名为 a1 的代理示例:

```properties
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
```


#### Legacy Sources

Legacy Sources 允许 Flume 1.x agent接收来自 Flume 0.9.4 agent 的事件。 它接受 Flume 0.9.4 格式的事件,将它们转换为 Flume 1.0 格式,并将它们存储在连接的通道中。 0.9.4 事件属性(例如时间戳、pri、主机、nanos 等)将转换为 1.x 事件标头属性。 Legacy Sources 支持 Avro 和 Thrift RPC 连接。 要在两个 Flume 版本之间使用此桥梁,您需要使用 avroLegacy 或 thriftLegacy 源启动 Flume 1.x agent。 0.9.4 agent 应该让 agent Sink 指向 1.x 代理的主机/端口。

>说明: Flume 1.x 的可靠性语义与 Flume 0.9.x 的可靠性语义不同。 Legacy Sources 不支持 Flume 0.9.x agent 的 E2E 或 DFO 模式。 唯一支持的 0.9.x 模式是 best effort ,尽管 1.x 流的可靠性设置将适用于由 Legacy Sources 保存到 Flume 1.x 通道中的事件。
必需的属性以**粗体**显示。

##### Avro Legacy Source

| 属性名称 | 默认值 | 描述 |
| ---- | ---- | ---- |
| **channels** || |
| **type** || 组件的类型名称,需要是org.apache.flume.source.avroLegacy.AvroLegacySource |
| **host** || 要绑定的主机名或 IP 地址 |
| **port** || 要监听的端口号 |
| selector.type | | 复制或多路复用 |
| selector.* | replicating | 取决于selector.type的值 |
| interceptors || 空格分隔的拦截器列表 |
| interceptors.* | | |

名为 a1 的代理示例:

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
```


##### Thrift Legacy Source

|属性名称 |默认值 |描述 |
|---|---|---|
|**channels**|||
|**type**||组件的类型名称,需要是 org.apache.flume.source.thriftLegacy.ThriftLegacySource |
|**host**||要绑定的主机名或 IP 地址 |
|**port**||要监听的端口号 |
|selector.type||复制或多路复用 |
|selector.*|replicating|取决于selector.type的值 |
|interceptors||空格分隔的拦截器列表 |
|interceptors.*|||

名为 a1 的代理示例:

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
```


#### Custom Source

Custom Source 是您自己的 Source 接口的实现。 启动 Flume 代理时,Custom Source 的类及其依赖项必须包含在代理的类路径中。 自定义源的类型是其 FQCN。

| 属性名称 | 默认值 | 描述 |
| ---- | ---- | ---- |
| **channels** || |
| **type** || 组件的类型名称,需要是 FQCN |
| selector.type | | 复制或多路复用 |
| selector.* | replicating | 取决于selector.type的值 |
| interceptors || 空格分隔的拦截器列表 |
| interceptors.* | | |

名为 a1 的代理示例:
```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
```



#### Scribe Source

Scribe 是另一种类型的摄取系统。 为了采用现有的Scribe摄取系统,Flume应该使用基于Thrift且兼容传输协议的ScribeSource。 如需部署 Scribe,请遵循 Facebook 的指南。 必需的属性以**粗体**显示。

|属性名称 |默认值 |描述 |
|---|---|---|
|**type**||组件的类型名称,需要是 `org.apache.flume.source.scribe.ScribeSource` |
|port|1499|Scribe 应连接的端口 |
|maxReadBufferBytes|16384000|Thrift 默认帧缓冲区大小 |
|workerThreads|5|在 Thrift 中处理线程数 |
|selector.type|||
|selector.*|||

名为 a1 的代理示例:

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1
```

0 comments on commit 504a455

Please sign in to comment.