本文将介绍 HBase 的客户端连接实现, 并说明如何正确管理 HBase 的连接.
最近在搭建一个 HBase 的可视化管理平台, 搭建完成后发现不管什么查询都很慢, 甚至于使用 API 去 listTable 都要好几秒.
经过一番排查发现, 是每次请求的时候, 都去临时创建了一个 connection, 而创建 connection 非常耗时导致整体的 rt 上升.
因此, 就深入了解了下如何正确管理 HBase 的 connection, 同时, 也在优化过程中有些小细节的总结.
本文基于 hbase 2.0.0 版本的源码, GitHub 上 3.0 版本的源码已经有很大差异了, 但是思想还是差不多的
1.HBase-client 和 HBase 是如何连接的?
这个问题实际上在我之前的文章 深入 HBase 读写 中介绍过.
当 HBase-client 第一次请求读写的时候, 需要三步走:
1)HBase-client 从 zk 中获取保存 meta table 的位置信息, 知道 meta table 保存在了哪个 region server, 然后缓存这个位置信息;
2)HBase-client 会查询这个保存 meta table 的特定的 region server, 查询 meta table 信息, 在 table 中获取自己想要访问的 row key 所在的 region 在哪个 region server 上.
3)客户端直接访问目标 region server, 获取对应的 row
所以, 我们知道 hbase-client 实际上包含三部分连接:
跟 zk 连接, 获取相关元信息
跟 HMaster 连接, 做相关 DDL 操作
直接跟各个 region server 进行连接, 进行增删改查
2.HBase 客户端连接原理
常规写法是这样的
- Connection connection = ConnectionFactory.createConnection(conf);
- try {
- Table table = connection.getTable(TableName.valueOf("tablename"));
- // 插入数据
- Put put = new Put(Bytes.toBytes("row"));
- put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));
- table.put(put);
- // 单行读取
- Get get = new Get(Bytes.toBytes("row"));
- Result res = table.get(get);
- // 删除一行数据
- Delete delete = new Delete(Bytes.toBytes("row"));
- table.delete(delete);
- }catch (IOException e) {
- //.....
- } finally {
- table.close();
- connection.close();
- }
我们不禁有这样的疑问:
1)HBase 没有连接池吗?
2)connection 表示的是一个连接吗?
3)connection 每个线程都得创建吗? 线程安全吗?
4)table 每个线程都得创建吗? 线程安全吗?
下面一一解答.
首先, Connection 是线程安全的, 而 Table 和 Admin 则不是线程安全的.
因此正确的做法是一个进程 (或服务) 使用一个 Connection 对象, 而在不同的线程中使用单独的 Table 和 Admin 对象.
Connection 持有 RpcClient,RpcClient 管理了一个连接池 poolMap
- protected final PoolMap<ConnectionId, T> connections;
- //....
- this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
通过 AbstractRpcClient 的 getConnection 看到, 连接 T 继承 RpcConnection, 叫做 NettyRpcConnection.
这里顺便通过 getPoolType 和 getPoolSize 看了下线程池的大小和类型.
在枚举类 PoolType 中有三种线程池类型 Reusable, ThreadLocal, RoundRobin, 用户可以用 hbase.client.ipc.pool.type 指定线程池类型, 通过 hbase.client.ipc.pool.size 指定线程池大小(默认是 1).
3. 优化实践
搞清楚上面的原理后, 下面就可以开始优化我们的 HBase 管理平台了.
只需要对每个 HBase 集群的 connection 使用 Map 保存下来, 每次请求的时候拿出对应的 connection 进去相关操作即可. 然后需要注意在系统退出的时候关闭所有的 connection.
上代码:
- public class ConnectionManager {
- private Map<String, Connection> connectionMap = new ConcurrentHashMap<>();
- public Connection getConnection(String resourceId, Configuration configuration) {
- ResourceInfo resourceInfo = ResourceInfoCache.getResourceInfoByCache(resourceId);
- if (resourceInfo == null) {
- throw new IllegalArgumentException("error resourceid:" + resourceId);
- }
- String key = getClusterKey(resourceInfo);
- if (connectionMap.containsKey(key)) {
- return connectionMap.get(key);
- }
- synchronized (this) {
- //DCL 检查
- if (connectionMap.containsKey(key)) {
- return connectionMap.get(key);
- }
- Connection connection = null;
- try {
- connection = ConnectionFactory.createConnection(configuration);
- } catch (IOException e) {
- return null;
- }
- connectionMap.put(key, connection);
- return connection;
- }
- }
- @PreDestroy
- public void doDestroy() {
- for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
- Connection connection = entry.getValue();
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException e) {
- //....
- }
- }
- }
- }
- }
这里有几个注意点:
将 ConnectionManager 注册为 bean, 交给 spring 容器管理生命周期, 同时保证单例.
使用 @PreDestroy 保证应用关闭时, 能正确释放所有连接, 避免连接泄漏
connectionMap 使用 ConcurrentHashMap 保证线程安全
DCL 检查, 避免重复创建同一个 connection, 浪费资源; 并且避免重复创建 connection 后, 无法关闭导致连接泄漏.
在需要查询时, 只需要通过 getConnection 获取已经存在的 connection 即可.
当然, 如果是普通的应用使用 HBase-client, 一般只需要对一个 HBase 的集群创建全局唯一的一个 Connection 即可(一般交给 spring 容器管理), 每次请求的时候, 创建对应的 Table 进行 CRUD.
看到这里了, 原创不易, 点个关注, 点个赞吧, 你最好看了~
知识碎片重新梳理, 构建 Java 知识图谱: https://github.com/saigu/JavaKnowledgeGraph (历史文章查阅非常方便)
来源: https://www.cnblogs.com/awan-note/p/12731524.html