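Introduction
Like Hive Hooks, Presto lets us plug in a custom Event Listener that is notified of events raised while the engine executes queries and can act on them. We can use it to build things such as custom logging, debugging, and performance-analysis plugins that make a Presto cluster easier to operate. Unlike Hive Hooks, however, a Presto cluster can have only one Event Listener active at a time.
An Event Listener is deployed as a Plugin and listens for the following events: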
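- Query creation (information about the newly created query)
- Query completion, success or failure (execution information: details for successful queries, error codes and related information for failed ones)
- Split completion, success or failure (split execution information, likewise with details for both success and failure)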
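If you are familiar with the Hook and Listener patterns, the steps will look familiar. We only need to:
- Implement the Presto EventListener and EventListenerFactory interfaces.
- Package our jar correctly.
- Deploy it: copy it into the directory Presto expects and update the configuration file.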
Interfaces
Implement EventListener. This class holds our core logic and has three callbacks, one for each of the events listed above:
    public interface EventListener
    {
        // Details of the query that was just created
        default void queryCreated(QueryCreatedEvent queryCreatedEvent)
        {
        }

        // Details of the completed query (success or failure)
        default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
        {
        }

        // Details of the completed split (success or failure)
        default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
        {
        }
    }
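Implement EventListenerFactory to create instances of our EventListener.
Implement the Plugin interface and its getEventListenerFactories() method, which returns our EventListenerFactory.
Add the configuration file etc/event-listener.properties. The event-listener.name property is required; any other properties are whatever our plugin needs.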
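To make the split between event-listener.name and the remaining properties concrete, here is a minimal sketch that uses only java.util.Properties. ListenerProperties and its methods are hypothetical names. The sketch also assumes that the server consumes event-listener.name itself to pick the factory and hands only the remaining entries to EventListenerFactory.create(); that is our reading of the behaviour and worth verifying against your Presto version.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper, not part of the Presto SPI. */
final class ListenerProperties {
    // The one property the configuration file must always contain.
    static final String NAME_KEY = "event-listener.name";

    private ListenerProperties() {}

    /** Parses the text of a .properties file into a plain String map. */
    static Map<String, String> parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> map = new HashMap<>();
        for (String key : properties.stringPropertyNames()) {
            map.put(key, properties.getProperty(key));
        }
        return map;
    }

    /** Returns a copy without the name key: the part assumed to reach the factory. */
    static Map<String, String> factoryConfig(Map<String, String> all) {
        Map<String, String> rest = new HashMap<>(all);
        rest.remove(NAME_KEY);
        return rest;
    }
}
```

If the assumption holds, the factory should validate only its own keys and never rely on event-listener.name being present in the map it receives.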
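Example
For cluster operations we need statistics on each user's query history, the time each query takes, and so on, so that we can later assign priorities and scores to the queries of each business line and keep the Presto cluster stable and easy to use. Below is a simple example that stores this information in a MySQL database.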
- Maven POM
    <dependency>
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-spi</artifactId>
        <version>0.220</version>
        <scope>compile</scope>
    </dependency>
- QueryEventListenerFactory
    import com.facebook.presto.spi.eventlistener.EventListener;
    import com.facebook.presto.spi.eventlistener.EventListenerFactory;

    import java.util.Map;

    public class QueryEventListenerFactory implements EventListenerFactory {
        // Must match event-listener.name in etc/event-listener.properties
        @Override
        public String getName() {
            return "query-event-listener";
        }

        // Fail fast if any JDBC setting is missing from the configuration
        @Override
        public EventListener create(Map<String, String> config) {
            if (!config.containsKey("jdbc.uri")) {
                throw new RuntimeException("etc/event-listener.properties file missing jdbc.uri");
            }
            if (!config.containsKey("jdbc.user")) {
                throw new RuntimeException("etc/event-listener.properties file missing jdbc.user");
            }
            if (!config.containsKey("jdbc.pwd")) {
                throw new RuntimeException("etc/event-listener.properties file missing jdbc.pwd");
            }
            return new QueryEventListener(config);
        }
    }
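- QueryEventPlugin
    import com.facebook.presto.spi.Plugin;
    import com.facebook.presto.spi.eventlistener.EventListenerFactory;

    import java.util.Arrays;

    public class QueryEventPlugin implements Plugin {
        // Entry point Presto uses to discover our factory
        @Override
        public Iterable<EventListenerFactory> getEventListenerFactories() {
            EventListenerFactory listenerFactory = new QueryEventListenerFactory();
            return Arrays.asList(listenerFactory);
        }
    }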
- QueryEventListener
    import com.facebook.presto.spi.eventlistener.EventListener;
    import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
    import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
    import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.HashMap;
    import java.util.Map;

    public class QueryEventListener implements EventListener {
        private final Map<String, String> config;
        private Connection connection;

        public QueryEventListener(Map<String, String> config) {
            // Keep a private copy of the listener configuration
            this.config = new HashMap<>(config);
            init();
        }

        // Opens the MySQL connection if there is none or the current one is no longer valid
        private void init() {
            try {
                if (connection == null || !connection.isValid(10)) {
                    Class.forName("com.mysql.jdbc.Driver");
                    connection = DriverManager
                            .getConnection(config.get("jdbc.uri"), config.get("jdbc.user"), config.get("jdbc.pwd"));
                }
            } catch (SQLException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
        }

        @Override
        public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
            String queryId = queryCompletedEvent.getMetadata().getQueryId();
            String querySql = queryCompletedEvent.getMetadata().getQuery();
            String queryState = queryCompletedEvent.getMetadata().getQueryState();
            String queryUser = queryCompletedEvent.getContext().getUser();
            long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
            long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
            long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli();
            // insert into query execution table

            long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO)
                    .toMillis();
            long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis();
            long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis();
            long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis();
            int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits();
            double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory();
            long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes();
            long outputRows = queryCompletedEvent.getStatistics().getOutputRows();
            long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
            long totalRows = queryCompletedEvent.getStatistics().getTotalRows();
            long writtenBytes = queryCompletedEvent.getStatistics().getWrittenBytes();
            long writtenRows = queryCompletedEvent.getStatistics().getWrittenRows();
            // insert into query info table

            // Failure details are only present for failed queries
            queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
                int code = queryFailureInfo.getErrorCode().getCode();
                String name = queryFailureInfo.getErrorCode().getName();
                String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
                String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase();
                String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase();
                String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase();
                String failuresJson = queryFailureInfo.getFailuresJson();
                // insert into failed query table
            });
        }

        @Override
        public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
            long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
            // Start and end times are optional. Instant.MIN.toEpochMilli() overflows and throws
            // ArithmeticException, so -1 is used here as the "not set" marker instead.
            long endTime = splitCompletedEvent.getEndTime().map(Instant::toEpochMilli).orElse(-1L);
            String payload = splitCompletedEvent.getPayload();
            String queryId = splitCompletedEvent.getQueryId();
            String stageId = splitCompletedEvent.getStageId();
            long startTime = splitCompletedEvent.getStartTime().map(Instant::toEpochMilli).orElse(-1L);
            String taskId = splitCompletedEvent.getTaskId();
            long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes();
            long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions();
            long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis();
            long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis();
            long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis();
            long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis();
            // insert into stage info table
        }
    }
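The listener above leaves the actual inserts as comments. As one possible way to fill in the first of them, here is a minimal sketch built on a plain JDBC PreparedStatement. The class QueryHistoryWriter, the table query_history and all of its column names are assumptions made for illustration only; adapt them to your own schema.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical row writer; table and column names are assumptions. */
final class QueryHistoryWriter {
    static final String INSERT_SQL =
            "INSERT INTO query_history "
                    + "(query_id, query_user, query_state, create_time_ms, end_time_ms, elapsed_ms) "
                    + "VALUES (?, ?, ?, ?, ?, ?)";

    private QueryHistoryWriter() {}

    /** Orders the values to match the placeholders in INSERT_SQL. */
    static Object[] toParams(String queryId, String user, String state,
                             Instant createTime, Instant endTime) {
        // Elapsed time from query creation to completion, in milliseconds
        long elapsedMs = Duration.between(createTime, endTime).toMillis();
        return new Object[] {
                queryId, user, state, createTime.toEpochMilli(), endTime.toEpochMilli(), elapsedMs};
    }

    /** Binds the parameters and runs the insert; the statement is closed even on failure. */
    static void insert(Connection connection, Object[] params) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) {
            for (int i = 0; i < params.length; i++) {
                statement.setObject(i + 1, params[i]); // JDBC parameters are 1-based
            }
            statement.executeUpdate();
        }
    }
}
```

Inside queryCompleted this could be called as QueryHistoryWriter.insert(connection, QueryHistoryWriter.toParams(queryId, queryUser, queryState, queryCompletedEvent.getCreateTime(), queryCompletedEvent.getEndTime())). It is probably wise to wrap the call in a try/catch for SQLException so that a database problem is logged rather than thrown out of the listener callback.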
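Packaging
Presto is extended through a Service Provider Interface (SPI), which it uses to load connectors, functions, types, and system access control. SPI implementations are discovered through a metadata file, so we also need to create the metadata file
src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin
This file should contain the class name of our plugin, that is, the class that implements Plugin, for example:
com.ji3jin.presto.listener.QueryEventPlugin
Run mvn clean install to build the jar.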
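A quick way to catch a wrong or missing services entry before deploying is to read the file back out of the built jar. The sketch below does that with the JDK's JarFile only. ServiceFileCheck is a hypothetical helper of our own, not something Presto ships, and the jar path is whatever your build produces.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical build-time check; not part of Presto. */
final class ServiceFileCheck {
    static final String SERVICE_ENTRY = "META-INF/services/com.facebook.presto.spi.Plugin";

    private ServiceFileCheck() {}

    /** Returns the class names listed in the jar's Plugin services file (empty if absent). */
    static List<String> declaredPlugins(String jarPath) {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_ENTRY);
            if (entry == null) {
                return names;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Services files allow '#' comments and blank lines; skip both
                    int hash = line.indexOf('#');
                    String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
                    if (!name.isEmpty()) {
                        names.add(name);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }
}
```

For this example we would expect declaredPlugins(...) on the built jar to return a single entry naming the class that implements Plugin.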
Deployment
Create the configuration file etc/event-listener.properties:
    event-listener.name=query-event-listener
    jdbc.uri=jdbc:mysql://localhost:3306/presto_monitor
    jdbc.user=presto
    jdbc.pwd=presto123
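The factory shown earlier stops at the first missing key, so a file with several mistakes has to be fixed one restart at a time. A small variation is to collect every missing or blank key and report them together. The sketch below is one way to do that; ListenerConfigValidator is a hypothetical helper name, and treating blank values as missing is our own choice rather than anything Presto requires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of the Presto SPI) for validating listener config. */
final class ListenerConfigValidator {
    // The three keys this example's listener needs from etc/event-listener.properties
    static final List<String> REQUIRED_KEYS =
            Collections.unmodifiableList(Arrays.asList("jdbc.uri", "jdbc.user", "jdbc.pwd"));

    private ListenerConfigValidator() {}

    /** Returns the required keys that are absent or blank, in declaration order. */
    static List<String> missingKeys(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Throws a single exception naming every missing key. */
    static void validate(Map<String, String> config) {
        List<String> missing = missingKeys(config);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                    "etc/event-listener.properties is missing: " + String.join(", ", missing));
        }
    }
}
```

With this in place, the body of create() could shrink to ListenerConfigValidator.validate(config) followed by return new QueryEventListener(config).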
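In the plugin directory under the Presto root directory, create a directory named
query-event-listener
matching the name of our event listener above.
Copy our jar and the MySQL connector jar into the directory created above.
Restart the Presto service.
That's it. Now you can run queries and then see your query history and the related timing statistics in MySQL. If your own work calls for something similar, what are you waiting for? Go ahead and build one.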
Source: https://www.cnblogs.com/jixin/p/11273156.html