在 Storm 集群上开启 DRPC 功能,
基于 Storm 的 1.0.1 版本,
并且执行简单的例子测试.
1.DRPC 概念
DRPC 就是分布式远程过程调用.
Storm 里面引入 DRPC 主要是利用 storm 的实时计算能力,
来并行化 CPU intensive 的计算.
DRPC 的 Storm topology 以函数的参数流作为输入,
而把这些函数调用的返回值作为 topology 的输出流.
DRPC 其实不能算是 Storm 本身的一个特性,
它是通过组合 Storm 的原语 spout,bolt,topology 而成的一种模式 (pattern).
2.DRPC 工作机制
Distributed RPC 是由一个 DPRC Server 协调的,
Storm 自带了一个称作 LinearDRPCTopologyBuilder 的 topology builder,
它把实现 DRPC 的几乎所有步骤都自动化了.
DRPC 服务器协调机制:
接收一个 RPC 请求;
发送请求到 Storm topology;
从 Storm topology 接收结果;
把结果发回给等待的客户端.
从客户端的角度来看一个 DRPC 调用,
跟一个普通的 RPC 调用没有任何区别.
下面是客户端代码展示了如何调用 RPC 的
exclaimation 方法, 方法的参数是 hello:
- DRPCClient client = new DRPCClient("drpc-host", 3772);
- String result = client.execute("exclaimation", "hello");
DRPC 的工作流大致是这样的:
客户端给 DRPC 服务器发送要执行的方法的名字,
以及这个方法的参数.
实现了这个函数的 topology 使用 DRPCSpout 从 DRPC 服务器接收函数调用流.
每个函数调用被 DRPC 服务器标记了一个唯一的 id.
这个 topology 然后计算结果,
在 topology 的最后一个叫做 ReturnResults 的 bolt 会连接到 DRPC 服务器,
并且把这个调用的结果发送给 DRPC 服务器 (通过那个唯一的 id 标识).
DRPC 服务器用那个唯一 id 来跟等待的客户端匹配上,
唤醒这个客户端并且把结果发送给它.
3. 配置 DPRC Server
修改 storm.YAML 文件, 增加 drpc 的配置:
- drpc.servers:
- - "zdh-237"
- drpc.childopts: "-Xmx1024m"
其他参数 drpc.port, drpc.http.port 等使用默认值即可,
参考默认值如下:
- drpc.port:
- 3772
- drpc.worker.threads:
- 64
- drpc.max_buffer_size:
- 1048576
- drpc.queue.size:
- 128
- drpc.invocations.port:
- 3773
- drpc.invocations.threads:
- 64
- drpc.request.timeout.secs:
- 600
- drpc.childopts:
- "-Xmx768m"
- drpc.http.port:
- 3774
- drpc.https.port:
- -1
- drpc.https.keystore.password:
- ""
- drpc.https.keystore.type:
- "JKS"
- drpc.http.creds.plugin:
- backtype.storm.security.auth.DefaultHttpCredentialsPlugin
- drpc.authorizer.acl.filename:
- "drpc-auth-acl.yaml"
- drpc.authorizer.acl.strict:
- false
4. 启动 DPRC Server
使用如下命令启动 DRPC 进程:
storm drpc> drpc.log 2>&1 &
5. 使用本地集群测试
由于命令无入参即没有 topo 名字,
无对外端口无法提供客户端调用,
在 BasicDRPCTopology 中启动后调用本地集群,
仅作为测试场景使用.
进入 Storm 目录, 提交处理 drpc 的 topo:
- cd /home/stormna/apache-storm-1.0.1/examples/storm-starter/
- storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.BasicDRPCTopology
在输出的日志中可以看到如下结果,
结果是单词后面被添加了感叹号!
- 8043 [Thread-26-bolt2-executor[6 6]] INFO o.a.s.l.ThriftAccessLogger - Request ID: 3 access from: principal: operation: result
- Result for "hello": hello!
- 8054 [Thread-26-bolt2-executor[6 6]] INFO o.a.s.l.ThriftAccessLogger - Request ID: 3 access from: principal: operation: result
- Result for "goodbye": goodbye!
6. 使用真实集群测试
基于真实集群的 DRPC 可以提供给外部客户端调用,
先提交处理 drpc 的 topo, 指定 topo 名称为 exclamationDrpc:
- cd /home/stormna/apache-storm-1.0.1/examples/storm-starter
- storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.BasicDRPCTopology exclamationDrpc
7. 客户端调用
在 BasicDRPCTopology 中提供的 drpc 方法名为 exclamation,
效果返回结果是在单词后面被添加的感叹号.
使用下面写客户端代码进行调用测试.
7.1. 适配 storm-core-0.9.6.jar 的客户端代码
注意 exclamation 名称不要拼错,
否则客户端会一直没有响应:
- public class DRPCClientTest096 {
- public static void main(String[] args) throws Exception {
- String drpcHost = "10.43.159.237";
- int drpcPort = 3772;
- DRPCClient client = new DRPCClient(drpcHost, drpcPort);
- String input="hello1";
- String result = client.execute("exclamation",input );
- System.out.println("input:"+input+", result:"+result);
- }
- }
执行 DRPCClientTest096 类中的 main 方法,
调用 drpc 的 exclamation 函数, 传入参数 hello1:
控制台输出结果:
input:hello1, result:hello1!
7.2. 适配 storm-core-1.0.1.jar 的客户端代码
注意调用需要配置 Storm 参数,
和上面的有点区别的.
- public class DRPCClientTest101 {
- public static void main(String[] args) throws Exception {
- String drpcHost = "10.43.159.237";
- int drpcPort = 3772;
- Properties pro = new Properties();
- // InputStream inStream = new FileInputStream("stormclient.conf");
- // 读取 storm-core-1.0.1.jar 里面 的 defaults.YAML 配置文件
- InputStream inStream = ClassLoader.getSystemResourceAsStream("defaults.yaml");
- pro.load(inStream);
- inStream.close();
- // 由于 Properties 加载的值带了引号, 需要重新设置一下, 或者手动去掉前后的引号
- pro.setProperty("storm.thrift.transport", "org.apache.storm.security.auth.SimpleTransportPlugin");
- DRPCClient client = new DRPCClient(pro, drpcHost, drpcPort);
- String input = "hello2";
- String result = client.execute("exclamation", input);
- System.out.println("input:" + input + ", result:" + result);
- }
- }
执行 DRPCClientTest101 类中的 main 方法,
调用 drpc 的 exclamation 函数, 传入参数 hello2:
控制台输出结果:
input:hello2, result:hello2!
8. 参考文章
StormDRPC 概念以及简单例子测试
storm DRPC 问题
Storm 高级原语 (二) - DRPC http://www.aboutyun.com/thread-8708-1-1.html
storm 自带例子详解 (二)--BasicDRPCTopology
来源: http://www.jianshu.com/p/b683fe0a4383