RPC is a core part of Flume development.
For Flume development basics, see: https://cloud.tencent.com/developer/article/1195082
RPC client interface
An implementation of Flume's RpcClient interface encapsulates the RPC mechanism supported by Flume. The user's application can simply call the Flume client SDK's append(Event) or appendBatch(List&lt;Event&gt;) method to send data, without needing to worry about the details of the underlying message exchange. Users can construct the Event argument these methods require either directly, using classes such as SimpleEvent, or conveniently through the static helper method EventBuilder.withBody().
RPC clients - Avro and Thrift
As of Flume 1.4.0, Avro is the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient both implement the RpcClient interface. The client is created with the host and port of the destination Flume agent, and the RpcClient is then used to send data to that agent. The following example shows how to use the Flume client SDK API in an application.
```java
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

public class MyApp {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    client.init("host.example.org", 41414);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
```
The remote Flume agent needs an AvroSource (or a ThriftSource, if you are using a Thrift client) listening on the corresponding port. Below is an example Flume agent configuration that waits for a connection from MyApp.
```
a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
# For using a thrift source set the following instead of the above line:
# a1.sources.r1.type = thrift

a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
```
For more flexibility, the default Flume client implementations (NettyAvroRpcClient and ThriftRpcClient) can be configured with the following properties:
```
client.type = default (for avro) or thrift (for thrift)

hosts = h1                   # default client accepts only 1 host
                             # (additional hosts will be ignored)
hosts.h1 = host1.example.org:41414
                             # host and port must both be specified
                             # (neither has a default)

batch-size = 100             # Must be >= 1 (default: 100)
connect-timeout = 20000      # Must be >= 1000 (default: 20000)
request-timeout = 20000      # Must be >= 1000 (default: 20000)
```
Secure RPC client - Thrift
As of Flume 1.6.0, the Thrift source and sink support kerberos-based authentication. The client needs to use the getThriftInstance method of SecureRpcClientFactory to obtain a SecureThriftRpcClient. When using SecureRpcClientFactory, the kerberos authentication module, which lives in flume-ng-auth, must be on the classpath. The client principal and keytab are passed in as parameters through the properties, and the server principal of the destination Thrift source must be supplied in the same way. An application can use SecureRpcClientFactory as in the following example:
```java
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClient;

import java.nio.charset.Charset;
import java.util.Properties;

public class MyApp {
  public static void main(String[] args) {
    MySecureRpcClientFacade client = new MySecureRpcClientFacade();

    // Initialize client with the remote Flume agent's host, port
    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "client.example.org" + ":" + String.valueOf(41414));

    // Initialize client with the kerberos authentication related properties
    props.setProperty("kerberos", "true");
    props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
    props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
    props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
    client.init(props);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with a ThriftSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MySecureRpcClientFacade {
  private RpcClient client;
  private Properties properties;

  public void init(Properties properties) {
    // Setup the RPC connection
    this.properties = properties;
    // Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory
    this.client = SecureRpcClientFactory.getThriftInstance(properties);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = SecureRpcClientFactory.getThriftInstance(properties);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
```
The remote ThriftSource must be started in kerberos mode. Below is an example Flume agent configuration that waits for a connection from MyApp:
```
a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
a1.sources.r1.agent-keytab = /tmp/flume.keytab

a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
```
Failover client
This class wraps the default Avro RPC client to provide failover capability to the client. It takes a whitespace-separated list of &lt;host&gt;:&lt;port&gt; pairs, each representing a Flume agent, which together form a failover group. The failover RPC client does not currently support thrift. If there is a communication error with the currently selected host agent, the failover client automatically fails over to the next host in the list. For example:
```java
// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);
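Conceptually, failover amounts to trying each host in the group in order until one accepts the event. The following self-contained sketch illustrates that idea only; the hypothetical trySend method stands in for the real Avro append call, and none of this is the actual FailoverRpcClient code:

```java
import java.util.List;

public class FailoverDemo {
    // Hypothetical stand-in for the real RPC append call: here, any
    // host name starting with "down" is treated as unreachable.
    static boolean trySend(String host, String event) {
        return !host.startsWith("down");
    }

    // Try each host of the failover group in order; return the host
    // that accepted the event, or null if every host failed.
    static String sendWithFailover(List<String> hosts, String event) {
        for (String host : hosts) {
            if (trySend(host, event)) {
                return host;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> group = List.of("down1:41414", "down2:41414", "h3:41414");
        // The first two hosts fail, so the event lands on h3:41414
        System.out.println(sendWithFailover(group, "Hello Flume!"));
    }
}
```

In the real client, max-attempts bounds how many hosts are tried per append before the call throws an EventDeliveryException.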
The FailoverRpcClient can be configured more flexibly with the following properties:
```
client.type = default_failover

hosts = h1 h2 h3             # at least one is required, but 2 or
                             # more makes better sense

hosts.h1 = host1.example.org:41414
hosts.h2 = host2.example.org:41414
hosts.h3 = host3.example.org:41414

max-attempts = 3             # Must be >= 0 (default: number of hosts
                             # specified, 3 in this case). A '0'
                             # value doesn't make much sense because
                             # it will just cause an append call to
                             # immediately fail. A '1' value means
                             # that the failover client will try only
                             # once to send the Event, and if it
                             # fails then there will be no failover
                             # to a second client, so this value
                             # causes the failover client to
                             # degenerate into just a default client.
                             # It makes sense to set this value to at
                             # least the number of hosts that you
                             # specified.

batch-size = 100             # Must be >= 1 (default: 100)
connect-timeout = 20000      # Must be >= 1000 (default: 20000)
request-timeout = 20000      # Must be >= 1000 (default: 20000)
```
Load balancing RPC client
The Flume client SDK also supports load balancing over RPC. It takes a whitespace-separated list of &lt;host&gt;:&lt;port&gt; pairs, each representing a Flume agent, which together form a load-balancing group. Both random and round-robin load-balancing strategies are supported. You can also supply your own selection algorithm by implementing the LoadBalancingRpcClient$HostSelector interface in a custom class; in that case, set the host-selector property to the FQCN (fully qualified class name) of your custom class. The load-balancing RPC client does not currently support thrift.
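The round-robin strategy can be sketched as below. This is an illustrative, self-contained version of the selection logic only, not Flume's actual HostSelector implementation (which works with HostInfo objects and iterators):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative round-robin selector: each call to select() returns the
// next host in the list, wrapping around to the start after the end.
class RoundRobinSelector {
    private final List<String> hosts;
    private final AtomicInteger next = new AtomicInteger(0);

    RoundRobinSelector(List<String> hosts) {
        this.hosts = hosts;
    }

    String select() {
        // floorMod keeps the index valid even after integer overflow
        int i = Math.floorMod(next.getAndIncrement(), hosts.size());
        return hosts.get(i);
    }
}

public class RoundRobinDemo {
    public static void main(String[] args) {
        RoundRobinSelector sel = new RoundRobinSelector(
                List.of("h1:41414", "h2:41414", "h3:41414"));
        // Cycles h1, h2, h3, then wraps back to h1
        for (int i = 0; i < 4; i++) {
            System.out.println(sel.select());
        }
    }
}
```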
If backoff is enabled, the client temporarily blacklists hosts that fail, excluding them from selection for a given timeout. When the timeout elapses, if the host is still unresponsive, this is treated as a sequential failure, and the timeout grows exponentially to avoid getting stuck in long waits on unresponsive hosts.
The maximum backoff time can be configured by setting maxBackoff (in milliseconds). The default maxBackoff is 30 seconds (specified in the OrderSelector class, the superclass of both load-balancing strategies). The backoff timeout grows exponentially with each consecutive failure, up to the maximum possible backoff timeout, which is capped at 65536 seconds (about 18.2 hours).
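The doubling-with-cap behavior can be sketched as follows; this is an illustrative calculation assuming a hypothetical 2-second initial timeout, not the actual OrderSelector code:

```java
public class BackoffDemo {
    // Double the backoff after each consecutive failure, but never
    // exceed the configured maxBackoff cap.
    static long nextBackoff(long currentMillis, long maxBackoffMillis) {
        return Math.min(currentMillis * 2, maxBackoffMillis);
    }

    public static void main(String[] args) {
        long maxBackoff = 30000; // default cap: 30 s
        long backoff = 2000;     // assumed initial timeout, for illustration
        for (int failure = 1; failure <= 6; failure++) {
            System.out.println("failure " + failure + ": " + backoff + " ms");
            backoff = nextBackoff(backoff, maxBackoff);
        }
        // grows 2000 -> 4000 -> 8000 -> 16000, then stays pinned at 30000
    }
}
```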
```java
// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

props.put("host-selector", "random");         // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host selection

props.put("backoff", "true");     // Disabled by default.
props.put("maxBackoff", "10000"); // Defaults to 0, which effectively
                                  // becomes 30000 ms

// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);
```
The LoadBalancingRpcClient can be configured more flexibly with the following properties:
```
client.type = default_loadbalance

hosts = h1 h2 h3             # At least 2 hosts are required

hosts.h1 = host1.example.org:41414
hosts.h2 = host2.example.org:41414
hosts.h3 = host3.example.org:41414

backoff = false              # Specifies whether the client should
                             # back-off from (i.e. temporarily
                             # blacklist) a failed host
                             # (default: false).

maxBackoff = 0               # Max timeout in millis that a host will
                             # remain inactive due to a previous
                             # failure with that host (default: 0,
                             # which effectively becomes 30000)

host-selector = round_robin  # The host selection strategy used
                             # when load-balancing among hosts
                             # (default: round_robin).
                             # Other values include "random"
                             # or the FQCN of a custom class
                             # that implements
                             # LoadBalancingRpcClient$HostSelector

batch-size = 100             # Must be >= 1 (default: 100)
connect-timeout = 20000      # Must be >= 1000 (default: 20000)
request-timeout = 20000      # Must be >= 1000 (default: 20000)
```
Embedded agent
Flume has an embedded agent API that allows users to embed an agent in their application. The agent is intended to be lightweight, so not all sources, sinks, and channels are allowed. Specifically, the source used is a special embedded source, and events are sent to it via the put and putAll methods of the EmbeddedAgent object. Only the Avro sink is supported, and the channel is limited to the File and Memory channels. The embedded agent also supports interceptors.
Note: the embedded agent depends on hadoop-core.jar.
Configuration of an embedded agent is similar to configuration of a full agent. The following is an exhaustive list of the available configuration options:
Required properties are in bold.
| Property Name | Default | Description |
| --- | --- | --- |
| **source.type** | embedded | The only available source is the embedded source. |
| **channel.type** | - | Either memory or file |
| channel.* | - | See the MemoryChannel or FileChannel user guide |
| **sinks** | - | List of sink names |
| **sink.type** | - | Must be avro |
| sink.* | - | See the AvroSink user guide |
| **processor.type** | - | Either failover or load_balance |
| processor.* | - | See the failover or load_balance user guide |
| source.interceptors | - | Space-separated list of interceptors |
| source.interceptors.* | - | Configuration for each individual interceptor named in source.interceptors |
Usage example:
```java
Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port", "5565");
properties.put("processor.type", "load_balance");
properties.put("source.interceptors", "i1");
properties.put("source.interceptors.i1.type", "static");
properties.put("source.interceptors.i1.key", "key1");
properties.put("source.interceptors.i1.value", "value1");

EmbeddedAgent agent = new EmbeddedAgent("myagent");
agent.configure(properties);
agent.start();

// Build a batch of events; Lists here is Guava's
// com.google.common.collect.Lists utility class.
Event event = EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8"));
List<Event> events = Lists.newArrayList();
events.add(event);
events.add(event);
events.add(event);
events.add(event);
agent.putAll(events);
...
agent.stop();
```
Source: https://www.qcloud.com/developer/article/1195083