近年来, 基于 hadoop 的 sql 框架层出不穷, presto 也是其中的一员. 从 2012 年发展至今, 依然保持年轻的活力 (版本迭代依然很快),presto 的相关介绍, 我们就不赘述了, 相信看官多对 presto 有或多或少的了解, 详细的一些说明可以看官网(https://prestodb.io) 的说明.
presto 自身功能和思想富有先进性, 虽然由于是内存计算, 稳定性方面还有很大提升空间, 但整体依然在 adhoc 方面有很好的竞争力. 其中在 catalog 加载的方式上来说比较的固化, 官方并没有做出动态的方案出来, 导致在添加 catalog 后必须重启整个集群才可以将新添加的 catalog 数据源添加到 presto 中, 这无疑在实际的生产环境中很不友好. 尤其是在一些中台项目中, 需要动态规划的东西非常多. 这种模式的 catalog 添加方式显然不能满足我们的开发需要.
因此, 在环境的加持下, 对 presto 的加载 catlog 的方式的源码进行了改造, 使其具有热动态添加的功能. 我们采用了外部数据库作为他的 catlog 资源库, 对其进行热加载
(1)添加 restful API 请求接口.
为了使框架本身具有添加 catalog 的功能, 需要使其本身具有 API 访问接口的方式来来对 catalog 的资源进行调整的功能
1. 新增 CatalogResource 类来实现 API 的请求接口
2. 新增 TimiCatalogStoreConfig 类来实现与数据库交互的持久层
3. 新增 TimiCatalogStore 类来替换原本的 catlog 加载类
4. 新增 CatalogInfo 类来实现对 catalog Model 信息的解析
- #1 CatalogResource
- @Path("/presto/catalog")
- public class CatalogResource{
- @GET
- @Path("test")
- public Response test()
- {
- return Response.ok("Hello world").build();
- }
- }
在 ServerMainModule 类中 setup 方法, 最后一行添加 jaxrsBinder(binder).bind(CatalogResource.class); 将添加的请求类添加进来, 然后启动主服务, 并确认所开启的 presto 的请求接口地址, 默认端口是: 8080 请求 http://localhost:8080/presto/catalog/test
返回 "Hello world" 则表示 restful API 接口添加成功.
- #2 TimiCatalogStoreConfig 类中主要实现了读取数据库连接配置, 以及具体执行的 catalog 执行动作, 并使用 jaxrsBinder(binder).bind(TimiCatalogStoreConfig.class); 注入到项目启动的容器中. 并将 Announcer,disabledCatalogs,ConnectorManager 注入到类中. 具体实现
- public class TimiCatalogStoreConfig {
- private final Announcer announcer;
- private static final Logger log = Logger.get(TimiCatalogStoreConfig.class);
- private final Set<String> disabledCatalogs;
- private final ConnectorManager connectorManager;
- public TimiCatalogStoreConfig(Announcer announcer,Set<String> disabledCatalogs,ConnectorManager connectorManager ) {
- this.announcer = announcer;
- this.disabledCatalogs = ImmutableSet.copyOf(disabledCatalogs);
- this.connectorManager = connectorManager;
- }
- }
然后就是实现对 catlog 增删查改动作, 并将操作的结构实现到 ConnectorManager 中,
首先将 Server 中的 CatalogStore 替换成我们自定义实现的 TimiCatalogStore 并注入相关类
- @Inject
- public TimiCatalogStore(ConnectorManager connectorManager, Announcer announcer, StaticCatalogStoreConfig config,TimiCatalogStoreConfig catalogStoreConfig) {
- this(connectorManager,
- announcer,
- config.getCatalogConfigurationDir(),
- firstNonNull(config.getDisabledCatalogs(), ImmutableList.of()),
- catalogStoreConfig
- );
- }
然后实现 loadCatalogs 方法, 首次调用的时候使用 load(); 方法加载 MySQL 中存储的所有 catlog, 然后使用 ScheduledExecutorService 定时的方式从 MySQL 中提取有变化的 catlog 加载到 presto 的 ConnectorManager 中.
- public static void updateConnectorIdAnnouncement(Announcer announcer, CatalogName connectorId)
- {
- //
- // This code was copied from PrestoServer, and is a hack that should be removed when the connectorId property is removed
- //
- // get existing announcement
- ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements());
- // update connectorIds property
- Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties());
- String property = nullToEmpty(properties.get("connectorIds"));
- Set<String> connectorIds = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property));
- connectorIds.add(connectorId.toString());
- properties.put("connectorIds", Joiner.on(',').join(connectorIds));
- // update announcement
- announcer.removeServiceAnnouncement(announcement.getId());
- announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build());
- announcer.forceAnnounce();
- }
在这里我们设定的 1 分钟从 MySQL 库充更新一次 catalog 列表
- scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- reload();
- }
- }, 60, 60, TimeUnit.SECONDS);
调用 reload 方法定时读取
- public void reload() {
- try{
- // 获取最新的 catalogs
- Map<String, CatalogInfo> catalogInfos = catalogStoreConfig.load();
- catalogInfos.forEach(
- (key, catalogInfo) -> {
- if (!catalogInfoMap.containsKey(key)) {
- // 相同 --catlog
- try {
- System.out.println("添加数据源"+JSON.toJSONString(catalogInfo));
- // log.info("添加数据源:{}",JSON.toJSONString(catalogInfos.get(key)));
- CatalogName catalogName = loadCatalog(catalogInfo);
- updateConnectorIdAnnouncement(announcer,catalogName);
- } catch (Exception e) {
- e.printStackTrace();
- }
- } else {
- // 不同 catlog
- if (!JSON.toJSONString(catalogInfoMap.get(key)).equals(JSON.toJSONString(catalogInfo))){
- connectorManager.dropConnection(catalogInfo.getCatalogName());
- try {
- System.out.println("添加数据源"+JSON.toJSONString(catalogInfo));
- CatalogName catalogName = loadCatalog(catalogInfo);
- updateConnectorIdAnnouncement(announcer,catalogName);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- );
- catalogInfoMap.putAll(catalogInfos);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
从 MySQL 库中取出来的 catlog 信息和对现有的 catlog 进行对比, 如果是不同的 catlog 就添加到 presto 中, 重复的 catlog 不添加, 删除的 catlog 就从现有的 catlog 管理器中删除, 以此来达到动态添加 catlog 的动作, 不需要重启 presto 服务器.
来源: https://www.cnblogs.com/xiaoxin101/p/13363562.html