介绍
概述
Apache Flume 是一个用来从非常多不同的源有效地收集。聚集和移动大量的日志数据到一个中心数据仓库的分布式的,可靠的和可用的系统。
Apache Flume 是 Apache 软件基金会的顶级项目。眼下有两个可获得的公布代码路线,0.9.x 版本号和 1.x 版本号。
本文档适用于 1.x 代码线。对于 0.9.x 代码线。请看 Flume 0.9.x 开发指南。
结构
数据流模型
一个 Event 是在 Flume 代理之间流动的数据单元。Event 从 Source 流动到 Channel 再到 Sink。并由一个 Event 接口的实现表示。
一个 Event 携带着一个有效负载(字节数组)和一个可选的头部(字符串属性)集合。一个 Flume 代理是一个进程(JVM),它能控制组件同意 Events 从一个外部源流向一个外部目的地。
一个 Source 消耗有特殊格式的 Events,而且那些 Events 通过像 Webserver 之类的外部源被传送到 Source。比如。一个 AvroSource 能够用来从 client 或从流中的其它的 Flume 代理接收 Avro Events。当一个源收到了一个 Event。它将它存入到一个或多个 Channel 中。Channel 採用被动存储的形式,Channel 会缓存该 Event 直到它被一个 Sink 处理。在 Flume 中。有一种 Channel 类型是 FileChannel,它使用本地文件系统作为它的备份仓库。一个 Sink 负责将 Event 从 Channel 中移除,并将它放到外部仓库中,比如 HDFS(这样的情况下使用的是 HDFSEventSink),或者将它放置到流中下一跳的 Source 中。在给定的代理中。Source 和 Sink 是异步执行的,由于 Events 会缓存在 Channel 中。
可靠性
一个 Event 被缓存在 Flume 代理的 Channel 中。然后就是 Sink 的任务来将 Event 传送到流中的下一个代理或者目标仓库(比如 HDFS)。Sink 仅仅有在 Event 存储到下一个代理的 Channel 或者目标仓库中,才会将 Event 从 Channel 中移除。这就是单跳消息传递语义怎样在 Flume 中提供端到端的流的可靠性。Flume 使用一个事务处理方法保证 Events 传输的可靠性。Sources 和 Sinks 在由 Channel 提供的事务中封装了 Events 的存储和检索。这保证了 Events 集合可靠地在流中点到点传输。在多跳流的样例中,前一跳的 Sink 和后一跳的 Source 都有各自的事务执行来保证数据被安全地存储在下一跳的 Channel 中。
构建 Flume
获取源码
使用 Git 检出代码。
获取 git 仓库根文件夹点击此处 https://git-wip-us.apache.org/repos/asf/flume.git。
Flume 1.x 的开发在 "trunk" 分支之下进行。所以能够使用以下的命令行:
- git clone https: //git-wip-us.apache.org/repos/asf/flume.git
编译 / 測试 Flume
Flume 是以 Maven 方式构建的。你能够使用标准的 Maven 命令编译 Flume:
1. 仅仅编译:mvn clean compile
2. 编译并执行单元測试:mvn clean test
3. 执行独立測试:mvn clean test –Dtest=<Test1><Test2>,…-DfailIfNoTests=false
4. 创建 tarball 包:mvn clean install
5. 创建 tarball 包(跳过单元測试):mvn clean install –DskipTests
请注意,Flume 的构建须要 GoogleProtocol Buffers 编译器在路径中。
你能够通过以下的介绍下载并安装它 https://developers.google.com/protocol-buffers/,。
开发自己定义组件
client
Client 在 Event 的起始点进行操作。并将他们传送到一个 Flume 代理上。Client 通常在它们处理的数据来自于的程序的进程空间内操作。
Flume 眼下支持 Avro,log4j,syslog 和 Http POST(使用一个 JSON)等方式从一个外部源数据传输。
除此之外,有一个 ExecSource 能够处理本地进程的输出作为给 Flume 的输入。
非常有可能有一个用例使得当前存在的选项都没有效。
在这样的情况下,你能够建立一个自己定义的机制发送数据给 Flume。
要实现这个有两个方法。
第一个方法是创建一个自己定义的 Client 来跟 Flume 已经存在的 Source 像 AvroSource 或者 SyslogTcpSource 通信。这里 Client 应该把它的数据转换成这些 Flume Source 能够理解的数据。
还有一个选择是写一个自己定义的 Flume Source,使用 IPC 或者 RPC 协议,直接和你已有的 client 程序通信,并把 client 的数据转换成 Flume Events 进行发送。注意全部存储在一个 Flume 代理的 Channel 中 events 必须以 Flume Events 的形式存在。
clientSDK
虽然 Flume 包括了多个内建的机制(比如 Sources)来接收数据,可是人们常常想要可以从一个自己定义的程序直接与 Flume 交互。Flume Client SDK 就是一个可以让应用程序使用 RPC 协议连接到 Flume 并给 Flume 的数据流发送数据的库。
RPCclient 接口
Flume RpcClient 接口的实现封装着 Flume 支持的 RPC 机制。
用户的程序能够简单地调用 Flume Client SDK 中的 append(Event) 或者 appendBatch(List<Event>) 来发送数据而不必操心底层信息交换的细节。直接实现 Event 接口,同一个方便的实现 SimpleEvent 类。或者通过使用 EventBuilder 的重载的静态辅助方法 wintBody()。用户能够提供须要的 Event 參数。
RPCclient——Avro 和 Thrift
在 Flume1.4.0 中,Avro 是默认的 RPC 协议。NettyAvroRpcClient 和 ThriftRpcClient 实现了 RpcClient 接口。client 须要目标 Flume 代理的主机地址和 port 号来创建这个对象,然后就能够使用 RpcClient 将数据发送给代理。以下的样例展示了在一个用户的数据生成程序中怎样使用 Flume Client SDK API:
- import org.apache.flume.Event;
- import org.apache.flume.EventDeliveryException;
- import org.apache.flume.api.RpcClient;
- import org.apache.flume.api.RpcClientFactory;
- The remote Flume agent needs to have an AvroSource(or aThriftSourceif you are using a
- Thrift client) listening on some port. Below is an example Flumeagent configuration that's
- import org.apache.flume.event.EventBuilder;
- import java.nio.charset.Charset;
- public class MyApp {
- public static void main(String[] args) {
- MyRpcClientFacade client = new MyRpcClientFacade();
- // Initialize client with the remote Flume agent's host and port
- client.init("host.example.org", 41414);
- // Send 10 events to the remote Flume agent. That agent should be
- // configured to listen with an AvroSource.
- String sampleData = "Hello Flume!";
- for (int i = 0; i < 10; i++) {
- client.sendDataToFlume(sampleData);
- }
- client.cleanUp();
- }
- }
- class MyRpcClientFacade {
- private RpcClient client;
- private String hostname;
- private int port;
- public void init(String hostname, int port) {
- // Setup the RPC connection
- this.hostname = hostname;
- this.port = port;
- this.client = RpcClientFactory.getDefaultInstance(hostname, port);
- // Use the following method to create a thrift client (instead ofthe above line):
- // this.client = RpcClientFactory.getThriftInstance(hostname, port);
- }
- public void sendDataToFlume(String data) {
- // Create a Flume Event object that encapsulates the sample data
- Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));
- // Send the event
- try {
- client.append(event);
- } catch (EventDeliveryException e) {
- // clean up and recreate the client
- client.close();
- client = null;
- client = RpcClientFactory.getDefaultInstance(hostname, port);
- // Use the following method to create a thrift client (instead ofthe above line):
- // this.client = RpcClientFactory.getThriftInstance(hostname, port);
- }
- }
- public void cleanUp() {
- // Close the RPC connection
- client.close();
- }
- }
远程 Flume 代理须要有一个 AvroSource(或者假设你用的是 Thriftclient 的话那就是 ThriftSource)监听某个 port。以下是一个 Flume 代理的配置在等带来自与 MyApp 的连接:
- a1.channels = c1
- a1.sources = r1
- a1.sinks = k1
- a1.channels.c1.type = memory
- a1.sources.r1.channels = c1
- a1.sources.r1.type = avro
- # For using a thrift source set the following instead of the aboveline.
- # a1.source.r1.type = thrift
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 41414
- a1.sinks.k1.channel = c1
- a1.sinks.k1.type = logger
为了更具灵活性,默认的 Flumeclient 实现(NettyAvroRpcClient 和 ThriftRpcClient)能够用以下的属性进行配置:
- client.type =
- default(
- for avro) or thrift(
- for thrift)
- hosts = h1#
- default clientaccepts only 1 host# (additional hosts will be ignored) hosts.h1 = host1.example.org: 41414#host and port must both be specified# (neither has a
- default) batch - size = 100#Must be >= 1(
- default:
- 100) connect - timeout = 20000#Must be >= 1000(
- default:
- 20000) request - timeout = 20000#Must be >= 1000(
- default:
- 20000)
故障转移 client
这个类封装了默认的 RPCclient 来给 client 提供故障转移能力。这个须要由空格分隔的 <主机>:<port> 列表表示的 Flume 代理组成一个故障转移组。故障转移 RPCclient 眼下不支持 Thrift。假设和眼下选择的主机(比如代理)代理通信出现错误,那么故障转移 client 就会自己主动故障转移到列表中的下一个主机中。比如:
- // Setup properties for the failover
- Properties props = new Properties();
- props.put("client.type", "default_failover");
- // List of hosts (space-separated list of user-chosen host aliases)
- props.put("hosts", "h1 h2 h3");
- // host/port pair for each host alias
- String host1 = "host1.example.org:41414";
- String host2 = "host2.example.org:41414";
- String host3 = "host3.example.org:41414";
- props.put("hosts.h1", host1);
- props.put("hosts.h2", host2);
- props.put("hosts.h3", host3);
- // create the client with failover properties
- RpcClient client = RpcClientFactory.getInstance(props);
为了能更灵活。故障转移 Flumeclient 实现(FailoverRpcClient)能够用以下的属性来配置:
- client.type = default_failover
- hosts = h1 h2 h3 # at least one isrequired, but 2 or
- # more makes better sense
- hosts.h1 = host1.example.org:41414
- hosts.h2 = host2.example.org:41414
- hosts.h3 = host3.example.org:41414
- max-attempts = 3 # Must be >=0(default: number of hosts
- # specified, 3 in this case). A '0'
- # value doesn't make much sense because
- # it will just cause an append call to
- # immmediately fail. A '1' value means
- # that the failover client will try only
- # once to send the Event, and if it
- # fails then there will be no failover
- # to a second client, so this value
- # causes the failover client to
- # degenerate into just a default client.
- # It makes sense to set this value to at
- # least the number of hosts that you
- # specified.
- batch-size = 100 # Must be >=1(default: 100)
- connect-timeout = 20000 # Must be >=1000 (default:20000)
- request-timeout = 20000 # Must be >=1000 (default:20000)
负载均衡 RPCclient
FlumeclientSDK 也支持一个 RpcClient 在多个主机之间负载均衡。这样的类型的 client 须要空格分隔的表示 Flume 代理的 <host>:<port> 列表,组成一个负载平衡组。这个 client 能够被配置一个负载平衡的策略,可能是随机选择一个配置的主机。或者以循环的方式选择一个主机。
你也能够自己定义你自己的类来实现 LoadBalancingRpcClient$HostSelector 接口,来使用一个自己定义的选择顺序。在那种情况下,这个自己定义类的全类名须要在 host-selector 中的属性中指定。
负载均衡 RPCclient 眼下不支持 Thrift。
假设启用了 backoff 属性。client 会将失败的主机暂时存放起来。这会导致这些主机在给定的超时时间内被排除在可选的主机列表中。
当超过超时时间,假设某个主机仍然无响应,该主机将会被觉得是一个连续的失效,从而导致超时时间的设置会成倍增长。以避免因为这些未响应的主机而陷入长时间的等待。
Backoff 时间的最大值能够通过 maxBackoff(单位毫秒)来设置。
maxBackoff 的默认值为 30 秒(在 OrderSelector 类中指定。它是全部负载均衡策略的超类)。Backoff 超时时间会随着每一个连续失败增长直到达到最大超时时间。
超时时间最大的可能值是 65535 秒(约 18.2 个小时)。比如:
- // Setup properties for the load balancing
- Properties props = new Properties();
- props.put("client.type", "default_loadbalance");
- // List of hosts (space-separated list of user-chosen host aliases)
- props.put("hosts", "h1 h2 h3");
- // host/port pair for each host alias
- String host1 = "host1.example.org:41414";
- String host2 = "host2.example.org:41414";
- String host3 = "host3.example.org:41414";
- props.put("hosts.h1", host1);
- props.put("hosts.h2", host2);
- props.put("hosts.h3",host3);
- props.put("host-selector", "random"); // Forrandom host selection
- // props.put("host-selector", "round_robin"); //For round-robin host
- // // selection
- props.put("backoff","true"); // Disabled by default.
- props.put("maxBackoff", "10000"); // Defaults 0,which effectively
- // becomes 30000 ms
- // Create the client with load balancing properties
- RpcClient client = RpcClientFactory.getInstance(props);
为了更具灵活性。负载均衡 Flumeclient 实现(LoadBalancingRpcClient)能够用例如以下的属性进行配置:
- client.type = default_loadbalance
- hosts = h1 h2 h3 # At least 2hosts are required
- hosts.h1 = host1.example.org:41414
- hosts.h2 = host2.example.org:41414
- hosts.h3 = host3.example.org:41414
- backoff = false # Specifieswhether the client should
- # back-off from (i.e. temporarily
- # blacklist) a failed host
- # (default: false).
- maxBackoff = 0 # Max timeout inmillis that a will
- # remain inactive due to a previous
- # failure with that host (default: 0,
- # which effectively becomes 30000)
- host-selector = round_robin # The host selectionstrategy used
- # when load-balancing among hosts
- # (default: round_robin).
- # Other values are include "random"
- # or the FQCN of a custom class
- # that implements
- # LoadBalancingRpcClient$HostSelector
- batch-size = 100 # Must be >=1 (default: 100)
- connect-timeout = 20000 # Must be >=1000 (default: 20000)
- request-timeout = 20000 # Must be >=1000 (default: 20000)
嵌入式代理
Flume 有一套嵌入式代理 API。它同意用户将一个代理嵌入到他们的应用程序中。这个代理是轻量级的,并不支持全部的 Sources。Sinks 和 Channels。
Source 使用的是一种特殊的 Source,Events 须要通过 EmbeddedAgent 对象的 put。putAll 方法发送到 Source。仅仅有文件 Channel 和内存 Channel 是支持的 Channel,Avro Sink 是唯一支持的 Sink。
注意:嵌入式代理须要依赖 hadoop-core.jar 包
嵌入式代理的配置和全然代理的配置是非常相似的。
以下是一个具体的配置选项的列表:
必要的属性用黑体表示。
Property Name |
Default |
Description |
source.type |
embedded |
唯一可用的 Source 就是嵌入式 Source |
channel.type |
- |
Memory 或 file 分别相应 Memory Channel 和 FileChannel |
channel.* |
- |
对 Channel 类型的配置选项。查看 MemoryChannel 或者 FileChannel 用户指南查找更详尽的列表 |
sinks |
- |
Sink 名称的列表 |
sink.type |
- |
属性名称必须和 Sinks 列表中的一个名称匹配。 值必须是 avro |
sink.* |
- |
Sink 的配置选项。查看 AvroSink 用户指南获得更详尽的列表,然而要注意 AvroSink 至少须要主机名和 port 号 |
processor.type |
- |
Failover 或者 load_balance。分别和 FailoverSinksProcessor 和 LoadBalancingSinkProcessor 一致 |
processor.* |
- |
对选择的 Sink 处理器的配置项。查看 FailoverSinksProcessor 和 LoadBalancingSinkProcessor 用户指南查看更详尽的列表 |
以下是一个样例展示如何使用代理:
- Map<String, String> properties = newHashMap<String, String>();
- properties.put("channel.type","memory");
- properties.put("channel.capacity","200");
- properties.put("sinks","sink1 sink2");
- properties.put("sink1.type","avro");
- properties.put("sink2.type","avro");
- properties.put("sink1.hostname","collector1.apache.org");
- properties.put("sink1.port","5564");
- properties.put("sink2.hostname", "collector2.apache.org");
- properties.put("sink2.port","5565");
- properties.put("processor.type","load_balance");
- EmbeddedAgent agent = newEmbeddedAgent("myagent");
- agent.configure(properties);
- agent.start();
- List<Event> events =Lists.newArrayList();
- events.add(event);
- events.add(event);
- events.add(event);
- events.add(event);
- agent.putAll(events);
- ...
- agent.stop();
Transaction 接口
Transaction 接口是 Flume 可靠性的基础。全部基本的组件(比如 Sources。Sinks 和 Channels)必须使用一个 Flume Transaction。
一个 Transaction 是在一个 Channel 实现中实现的。每一个与 Channel 连接的 Source 和 Sink 必须获得一个 Transaction 对象。
Sources 实际上使用一个 ChannelSelector 接口来封装 Transaction。存储(把它放到 Channel 中)和提取(把它从 Channel 中拿出来)一个 Event 的操作都是在一个活动的 Transaction 中完毕的。比如:
- Channel ch = new MemoryChannel();
- Transaction txn = ch.getTransaction();
- txn.begin();
- try {
- // This try clause includes whatever Channel operations you want todo
- Event eventToStage = EventBuilder.withBody("Hello Flume!",
- Charset.forName("UTF-8"));
- ch.put(eventToStage);
- // Event takenEvent = ch.take();
- // ...
- txn.commit();
- } catch (Throwable t) {
- txn.rollback();
- // Log exception, handle individual exceptions as needed
- // re-throw all Errors
- if (t instanceof Error) {
- throw (Error)t;
- }
- } finally {
- txn.close();
- }
这里我们从一个 Channel 中获取了一个 Transaction。在 begin() 返回后,Transaction 如今是活动 / 打开的,然后 Event 被放到 Channel 里。
假设放置成功,Transaction 就进行提交并关闭。
Sink
Sink 的目的是将 Events 从 Channel 中取出并将他们发送到下一个流中的 Flume 代理或者将他们存储到一个外部仓库中。一个 Sink 仅仅与一个 Channel 相关,如在 Flume 属性中配置的那样。有一个 SinkRunner 实例。它和每个配置的 Sink 都有关系,当 Flume 框架调用 SinkRunner.start(),一个新线程就会被创建来驱动 Sink(使用 SinkRunner.PollingRunner 作为线程的 Runnable)。
这个线程管理 Sink 的生命周期。Sink 须要实现 start() 和 sttop() 方法,这两个方法是 LifecycleAware 接口的一部分。Sink.start() 方法应该初始化 Sink 并把它带入它能够将 Events 发送到下一个目的地的状态。Sink.process() 方法须要做核心的处理来将 Events 从 Channel 中取出并转发它。
Sink.stop() 方法须要做必要的清理(比如释放资源)。
Sink 的实现也须要实现 Configurable 接口来处理它自己的配置设置。比如:
- public class MySink extends AbstractSink implements Configurable {
- private String myProp;
- @Override
- public void configure(Context context) {
- String myProp = context.getString("myProp","defaultValue");
- // Process the myProp value (e.g. validation)
- // Store myProp for later retrieval by process() method
- this.myProp = myProp;
- }
- @Override
- public void start() {
- // Initialize the connection to the external repository (e.g. HDFS)that
- // this Sink will forward Events to ..
- }
- @Override
- public void stop () {
- // Disconnect from the external respository and do any
- // additional cleanup (e.g. releasing resources or nulling-out
- // field values) ..
- }
- @Override
- public Status process() throws EventDeliveryException {
- Status status = null;
- // Start transaction
- Channel ch = getChannel();
- Transaction txn = ch.getTransaction();
- txn.begin();
- try {
- // This try clause includes whatever Channel operations you want todo
- Event event = ch.take();
- // Send the Event to the external repository.
- // storeSomeData(e);
- txn.commit();
- status = Status.READY;
- } catch (Throwable t) {
- txn.rollback();
- // Log exception, handle individual exceptions as needed
- status = Status.BACKOFF;
- // re-throw all Errors
- if (t instanceof Error) {
- throw (Error)t;
- }
- } finally {
- txn.close();
- }
- return status;
- }
- }
Source
Source 的目的是从外部 client 获取数据并把它存储到 Channel 中。一个 Source 能够获取它自己的 ChannelProcessor 的一个实例来处理 Event。
ChannelProcessor 也能够获得它自己的 ChannelSelector 的一个实例来获取和 Source 相关联的 Channel,正如在 Flume 属性中配置的那样。然后一个 Transaction 从每个相关的 Channel 中取出。这样 Source 就能够通过一个 Transaction 把 Event 可靠地放入 Channel 中。
类似于 SinkRunner.RollingRunner 中的 Runnable,当 Flume 框架调用 PollableSourceRunner.start() 的时候。在创建的线程上会有一个 PollingRunner 的 Runnable 执行。每一个经过配置的 PollableSource 都和它自己的执行着 PollingRunner 的线程相关。
这个线程管理 PollableSource 的生命周期,比如启动和停止。一个 PollableSource 实现必须实如今 LifecycleAware 接口中声明的 start() 方法和 stop() 方法。PollableSource 的执行要调用 Source 的 process() 方法。
Process() 方法应该检查新数据并把它们以 Flume Events 的方式存入到 Channel 中。
注意实际上有两种类型的 Sources。PollableSource 已经提过了。
另外一个是 EventDrivenSource。EventDrivenSource。不像 PollableSource,必须有它自己的回调机制来捕获数据并它存入 Channel。EventDrivenSource 并非像 PollableSource 那样每一个都由它们自己的线程驱动。以下是一个自己定义 PollableSource 的样例:
- public class MySource extends AbstractSource implementsConfigurable, PollableSource
- private String myProp;
- Channel
- @Override
- public void configure(Context context) {
- String myProp = context.getString("myProp", "defaultValue");
- // Process the myProp value (e.g. validation, convert to anothertype, ...)
- // Store myProp for later retrieval by process() method
- this.myProp = myProp;
- }
- @Override
- public void start() {
- // Initialize the connection to the external client
- }
- @Override
- public void stop () {
- // Disconnect from external client and do any additional cleanup
- // (e.g. releasing resources or nulling-out field values) ..
- }
- @Override
- public Status process() throws EventDeliveryException {
- Status status = null;
- // Start transaction
- Channel ch = getChannel();
- Transaction txn = ch.getTransaction();
- txn.begin();
- try {
- // This try clause includes whatever Channel operations you want todo
- // Receive new data
- Event e = getSomeData();
- // Store the Event into this Source's associated Channel(s)
- getChannelProcessor().processEvent(e)
- txn.commit();
- status = Status.READY;
- } catch (Throwable t) {
- txn.rollback();
- // Log exception, handle individual exceptions as needed
- status = Status.BACKOFF;
- // re-throw all Errors
- if (t instanceof Error) {
- throw (Error)t;
- }
- } finally {
- txn.close();
- }
- return status;
- }
- }
Channel
待讨论
来源: http://www.bubuko.com/infodetail-2046074.html