目录
Sharding-Proxy 包结构
Sharding-Proxy 启动流程
Sharding-Proxy 请求接入
MySQL 报文解析器
MySQL 执行器
MySQLComQueryPacketExecutor 执行流程
MySQLComStmtExecuteExecutor 执行流程
Sharding-Proxy 消息处理
在看 Sharding-Proxy 源码之前, 强烈建议先阅读一直官网的两篇文章:
Apache Sharding-Proxy 使用手册
Apache Sharding-Proxy 设计原理 https://mp.weixin.qq.com/s/8I3-hzwFOfqg6NjwJBZR9A
Sharding-Proxy 包结构
sharding-proxy
├── sharding-proxy-backend 负责与底层 MySQL 通信
├── sharding-proxy-Bootstrap 启动 sharding-proxy
├── sharding-proxy-common YAML 配置文件加载...
├── sharding-proxy-frontend 启动 socket, 代理 MySQL/pg
│ ├── sharding-proxy-frontend-core 启动 sokcet
│ ├── sharding-proxy-frontend-MySQL 实现类 MySQLProtocolFrontendEngine
│ ├── sharding-proxy-frontend-PostgreSQL 实现类 PostgreSQLProtocolFrontendEngine
│ └── sharding-proxy-frontend-spi 核心 spi,DatabaseProtocolFrontendEngine
└── sharding-proxy-transport 代理数据库对应的编解码
├── sharding-proxy-transport-core 核心 API,DatabasePacket 和 PacketPayload
├── sharding-proxy-transport-MySQL MySQL 协议编解码
└── sharding-proxy-transport-PostgreSQL pg 协议编解码
总结: Sharding-Proxy 包功能说明
sharding-proxy-Bootstrap: 启动入口, 调用 LogicSchemas 加载配置, ShardingProxy 启动程序, 绑定 socket.
sharding-proxy-frontend-core: 启动 netty,hander 的初始化类为 ServerHandlerInitializer, 编解码对应的 Handler 为 PacketCodec, 业务处理对应的 Handler 为 FrontendChannelInboundHandler. 这两个 Handler 实际的工作都委托给了 DatabaseProtocolFrontendEngine.
sharding-proxy-frontend-spi: 核心 spi,DatabaseProtocolFrontendEngine 包含编解码, 执行器. DatabaseProtocolFrontendEngine 目前有 MySQL 和 PG 两个实现.
sharding-proxy-frontend-MySQL: 实现类 MySQLProtocolFrontendEngine
sharding-proxy-transport-MySQL:MySQL 报文解析, 主要接口为 MySQLPacket.
Sharding-Proxy 启动流程
Sharding-Proxy 启动流程
总结: Sharding-Proxy 启动流程最核心的是通过 ServerHandlerInitializer 加载了 PacketCodec(编解码) 和 FrontendChannelInboundHandler(业务处理器) 两个处理器. 这两个处理的具体工作都委托给了 DatabaseProtocolFrontendEngine 完成, 有 MySQL 和 PostgreSQL 两个实现.
Bootstrap
启动入口位于 sharding-proxy-Bootstrap 工程中. Bootstrap 提供了有注册中心和无注册中心两种启动方式, 以无注册中心的启动方式为例:
- private static void startWithoutRegistryCenter(
- final Map<String, YamlProxyRuleConfiguration> ruleConfigs,
- final YamlAuthenticationConfiguration authentication,
- final Properties prop, final int port) throws SQLException {
- Authentication authenticationConfiguration = getAuthentication(authentication);
- ShardingProxyContext.getInstance().init(authenticationConfiguration, prop);
- // 加载配置规则
- LogicSchemas.getInstance().init(
- getDataSourceParameterMap(ruleConfigs),
- getRuleConfiguration(ruleConfigs));
- initOpenTracing();
- // 启动 sharding-proxy
- ShardingProxy.getInstance().start(port);
- }
Bootstrap 启动最核心的步骤是 ShardingProxy 启动代理. Sharding-Proxy 会启动一个 Netty 服务器, 默认端口为 3307.
ShardingProxy
程序启动入口位于 sharding-proxy-frontend-core 工程中. Netty 服务器通过 ServerHandlerInitializer 加载对应的 Handler, 包括 PacketCodec(编解码) 和 FrontendChannelInboundHandler(业务处理器) 两个处理器.
- DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine =
- DatabaseProtocolFrontendEngineFactory.newInstance(
- LogicSchemas.getInstance().getDatabaseType());
- pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
- pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine));
总结: 这两个 Handler 的实际工作都是委托给 DatabaseProtocolFrontendEngine 完成, 目前有 MySQL 和 PG 两个实现.
DatabaseProtocolFrontendEngine
位于 sharding-proxy-frontend-spi 工程中. DatabaseProtocolFrontendEngine 是一个 SPI 接口, 目前提供了 MySQL 和 PostgreSQL 两种实现, 分别位于 sharding-proxy-frontend-MySQL 和 sharding-proxy-frontend-PostgreSQL 工程中.
- public interface DatabaseProtocolFrontendEngine extends DatabaseTypeAwareSPI {
- FrontendContext getFrontendContext();
- AuthenticationEngine getAuthEngine();
- void release(BackendConnection backendConnection);
- // 编解码器
- DatabasePacketCodecEngine getCodecEngine();
- // SQL 执行引擎
- CommandExecuteEngine getCommandExecuteEngine();
- }
总结: DatabaseProtocolFrontendEngine 方法最重要的两个属性是 DatabasePacketCodecEngine 解码器和 CommandExecuteEngine SQL 执行引擎.
Sharding-Proxy 请求接入
Sharding-Proxy 消息处理
总结: Sharding-Proxy 接收到消息后处理过程有如下几步:
PacketCodec: 将从 client 接收的请求按长度解码成 ByteBuf, 实际由解码器 DatabasePacketCodecEngine#decode 完成.
FrontendChannelInboundHandler: 将请求 ByteBuf 交给 CommandExecutorTask 处理.
CommandExecutorTask : 消息处理核心类.
第一步: 调用 DatabasePacketCodecEngine#createPacketPayload 将消息包装成 PacketPayload.
第二步: 调用 CommandExecuteEngine.getCommandPacketType 将消息解码成具体的 CommandPacket.
第三步: 调用 CommandExecutor#getCommandExecutor 方法, 根据消息类别获取不同的执行器.
第四步: 调用 CommandExecutor#execute 执行任务.
第五步: 将处理后的结果伪装成 MySQL 服务器的协议, 返回给 client.
FrontendChannelInboundHandler
消息处理的入口 FrontendChannelInboundHandler 位于 sharding-proxy-frontend-core 工程中. Sharding-Proxy 接收到请求后, 先由 PacketCodec 按长度解码, 然后由 FrontendChannelInboundHandler 进行处理, 代码如下:
- @Override
- public void channelRead(final ChannelHandlerContext context, final Object message) {
- if (!authorized) {
- authorized = auth(context, (ByteBuf) message);
- return;
- }
- // CommandExecutorSelector 返回 ExecutorService, 任务执行 CommandExecutorTask
- CommandExecutorSelector.getExecutor(
- databaseProtocolFrontendEngine.getFrontendContext()
- .isOccupyThreadForPerConnection(),
- backendConnection.isSupportHint(),
- backendConnection.getTransactionType(),
- context.channel().id())
- .execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
- }
- CommandExecutorTask
- // 核心 API, 处理编解码, sql 执行
- private final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine;
- // 管理后台 MySQL 连接
- private final BackendConnection backendConnection;
- // 按长度解码后的 client 请求信息, ByteBuf
- private final Object message;
- @Override
- public void run() {
- // 按包长度解码成 ByteBuf,client
- PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine()
- .createPacketPayload((ByteBuf) message));
- // 将 ByteBuf 解析成具体的命令, 并转发到 backendConnection, 响应 client
- isNeedFlush = executeCommand(context, payload, backendConnection);
- }
- private boolean executeCommand(final ChannelHandlerContext context,
- final PacketPayload payload, final BackendConnection backendConnection)
- throws SQLException {
- // 执行引擎
- CommandExecuteEngine commandExecuteEngine = databaseProtocolFrontendEngine
- .getCommandExecuteEngine();
- // MySQL 命令类型
- CommandPacketType type = commandExecuteEngine.getCommandPacketType(payload);
- // 解码
- CommandPacket commandPacket = commandExecuteEngine.getCommandPacket(
- payload, type, backendConnection);
- // 执行器
- CommandExecutor commandExecutor = commandExecuteEngine.getCommandExecutor(
- type, commandPacket, backendConnection);
- // 向真实 MySQL 服务器发送 sql, 并返回结果 responsePackets
- Collection<DatabasePacket> responsePackets = commandExecutor.execute();
- if (responsePackets.isEmpty()) {
- return false;
- }
- // 将返回结果伪装成 MySQL 协议, 返回给客户端 client
- for (DatabasePacket each : responsePackets) {
- context.write(each);
- }
- if (commandExecutor instanceof QueryCommandExecutor) {
- commandExecuteEngine.writeQueryData(context, backendConnection,
- (QueryCommandExecutor) commandExecutor, responsePackets.size());
- return true;
- }
- return databaseProtocolFrontendEngine.getFrontendContext()
- .isFlushForPerCommandPacket();
- }
总结: CommandExecutorTask 内部很多工作都委托给了 CommandExecuteEngine 完成, CommandExecuteEngine 也有 MySQL 和 PostgreSQL 两个实现. CommandExecuteEngine 主要是对具体的协议解码 CommandPacket, 并获取具体的执行器 CommandExecutor.
MySQL 报文解析器
位于 sharding-proxy-transport-MySQL 工程中.
MySQLPacketCodecEngine: 实现 DatabasePacketCodecEngine 接口, 根据包长度解析报文, 并将解析的 ByteBuf 包装成 MySQLPacketPayload.
MySQLPacketPayload: 实现 PacketPayload 接口, 本质是对 ByteBuf 的包装, 提供对 ByteBuf 的 read/write 字段.
MySQLCommandPacketFactory: 将 MySQLPacketPayload 解析成具体协议的报文 MySQLPacket.
MySQLPacket: 实现了 DatabasePacket 接口. 将 ByteBuf 解析成具体的命令, 主要分两大类:
一是 Statement, 代表实现是 MySQLComQueryPacket,
二是 PrepareStatement, 代表实现是 MySQLComStmtExecutePacket.
MySQL 协议解析
MySQL 执行器
位于 sharding-proxy-frontend-MySQL 工程中. Sharding-Sphere 将客户端发送的 SQL 命令解析后, 转发给底层的 MySQL 服务器, 核心的接口类如下:
MySQL 执行器
CommandExecutor: 核心接口, SQL 执行器. Sharding-Proxy 解析 client 的命令, 转发给 MySQL 服务器, 并将 MySQL 服务器返回的结果按 MySQL 协议包伪装后响应给 client.
MySQLCommandExecutorFactory: 根据请求的类型不同(eg: COM_QUERY, COM_STMT_EXECUTE), 初始化不同的执行器, 主要分为两类:
一是 Statement, 不使用预解析功能, 代表实现是 MySQLComQueryPacketExecutor, 最终调用 TextProtocolBackendHandler 执行.
二是 PrepareStatement, 使用预解析功能, 代表实现是 MySQLComStmtExecuteExecutor, 最终调用 DatabaseCommunicationEngine 执行.
注意: 并不是所有的 client 请求都转发到 MySQL 服务器上了. 如 MySQL 预解析操作分为 prepare,execute,close,reset 四步, 分别对应 MySQLComStmtPrepareExecutor,MySQLComStmtExecuteExecutor,MySQLComStmtCloseExecutor,MySQLComStmtResetExecutor 四个类. 除了 execute 会将请求转发给底层 MySQL 服务器外, 其它的解析是在代理层 (sharding-proxy) 完成的, 将 SQLParseEngine 解析后结果缓存在 MySQLBinaryStatementRegistry 实例中, 这样能避免重复解析 SQL 提高性能.
MySQLComQueryPacketExecutor 执行流程
- public MySQLComQueryPacketExecutor(final MySQLComQueryPacket comQueryPacket,
- final BackendConnection backendConnection) {
- // 包含 SQL 和 connection,textProtocolBackendHandler 可以执行 SQL
- textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(
- DatabaseTypes.getActualDatabaseType("MySQL"),
- comQueryPacket.getSql(), backendConnection);
- }
- @Override
- public Collection<DatabasePacket> execute() {
- ...
- // 委托给 textProtocolBackendHandler 执行
- BackendResponse backendResponse = textProtocolBackendHandler.execute();
- // 包装返回的结果
- // error
- if (backendResponse instanceof ErrorResponse) {
- return Collections.<DatabasePacket>singletonList(createErrorPacket(
- ((ErrorResponse) backendResponse).getCause()));
- }
- // update
- if (backendResponse instanceof UpdateResponse) {
- return Collections.<DatabasePacket>singletonList(createUpdatePacket(
- (UpdateResponse) backendResponse));
- }
- // query
- isQuery = true;
- return createQueryPackets((QueryResponse) backendResponse);
- }
总结: MySQLComQueryPacketExecutor 总体过程非常清晰, 解析, 转发, 响应.
解析: 按 MySQL 协议解析 client 发送的请求. MySQL 解析的核心逻辑在 sharding-proxy-transport-MySQL 包中, 主要接口类是 MySQLPacketCodecEngine,MySQLPacketPayload,MySQLPacket.
转发: 将解析后的 SQL 转发给 MySQL 服务器, 并返加响应结果. 实际转发委托给了 TextProtocolBackendHandler, 这个类的功能会在后面继续分析.
响应: 将处理后的结果伪装成 MySQL 服务器, 响应客户端. 这个主要是伪装成 MySQL 协议. MySQL 协议参考:
MySQLComStmtExecuteExecutor 执行流程
- public MySQLComStmtExecuteExecutor(
- final MySQLComStmtExecutePacket comStmtExecutePacket,
- final BackendConnection backendConnection) {
- databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance()
- .newBinaryProtocolInstance(backendConnection.getLogicSchema(),
- comStmtExecutePacket.getSql(), comStmtExecutePacket.getParameters(),
- backendConnection);
- }
- @Override
- public Collection<DatabasePacket> execute() {
- // 委托给 databaseCommunicationEngine 执行
- BackendResponse backendResponse = databaseCommunicationEngine.execute();
- // 包装返回的结果, 同 MySQLComQueryPacketExecutor
- ...
- }
总结: 可以看到, 和 MySQLComQueryPacketExecutor 基本类似, 唯一不同的在于 MySQLComQueryPacketExecutor 真正调用 TextProtocolBackendHandler 执行, 而 MySQLComStmtExecuteExecutor 调用 DatabaseCommunicationEngine 执行.
那问题就来了, 为什么会有 TextProtocolBackendHandler 和 DatabaseCommunicationEngine 两个执行器? 它们到底是什么关系呢? TextProtocolBackendHandler 的实现类其实就是调用 DatabaseCommunicationEngine.
MySQL 执行过程
总结: 无论是 MySQLComQueryPacketExecutor 还是 MySQLComStmtExecuteExecutor 最终都是调用 DatabaseCommunicationEngine 执行.
Sharding-Proxy 消息处理
位于 sharding-proxy-backend 工程中.
Sharding-Proxy 消息处理时序图
总结: Sharding-Proxy 消息处理过程和 Sharding-Jdbc 处理过程差不多, 也要经过 SQL 解析, 路由, 改写, 合并这四个核心过程. 前面三个类是 Sharding-Proxy 中的, 后面四个类则是 Sharding-Jdbc 的, 两套逻辑共用一套核心代码.
DatabaseCommunicationEngine
DatabaseCommunicationEngine 是 Sharding-Proxy 内部转发执行器, 负责将请求转发给底层 MySQL 服务器.
DatabaseCommunicationEngine 类图
我们看一下 QueryBackendHandler 的实现类.
- public final class QueryBackendHandler implements TextProtocolBackendHandler {
- @Override
- public BackendResponse execute() {
- ...
- databaseCommunicationEngine = databaseCommunicationEngineFactory
- .newTextProtocolInstance(backendConnection.getLogicSchema(),
- sql, backendConnection);
- return databaseCommunicationEngine.execute();
- }
- }
说明: TextProtocolBackendHandler 是不使用预解析的执行器, 调用 databaseCommunicationEngineFactoy.newTextProtocolInstance, 而使用预解析的 MySQLComStmtExecuteExecutor 内部调用 DatabaseCommunicationEngineFactory.newBinaryProtocolInstance. 我们看一下这两个方法的内部实现.
- public DatabaseCommunicationEngine newTextProtocolInstance(final LogicSchema logicSchema,
- final String sql, final BackendConnection backendConnection) {
- return new JDBCDatabaseCommunicationEngine(logicSchema, sql,
- new JDBCExecuteEngine(backendConnection,
- new StatementExecutorWrapper(logicSchema)));
- }
- public DatabaseCommunicationEngine newBinaryProtocolInstance(
- final LogicSchema logicSchema, final String sql,
- final List<Object> parameters,
- final BackendConnection backendConnection) {
- return new JDBCDatabaseCommunicationEngine(logicSchema, sql,
- new JDBCExecuteEngine(backendConnection,
- new PreparedStatementExecutorWrapper(logicSchema, parameters)));
- }
说明: 在 Sharding-Proxy 中 TextProtocol 代表的是不使用预解析, 而 BinaryProtocol 代表使用预解析. JDBCDatabaseCommunicationEngine 内部直接委托给 JDBCExecuteEngine 完成.
JDBCDatabaseCommunicationEngine
JDBCDatabaseCommunicationEngine 类图
LogicSchema: 配置类解析规则.
JDBCExecuteEngine:SQL 执行器, 向 MySQL 服务器下发请求并获取查询结果.
JDBCBackendDataSource: 内部是一个 Map, 维护了真实服务器的连接池, 可以从中获取 MySQL 服务器的连接.
BackendConnection: 用于管理底层 MySQL 连接, 分为事务和非事务连接, 如果是事务连接, 则在获取连接时调用 connection.setAutoCommit(false) 开启一个事务.
StatementExecutorWrapper: 不使用预解析.
PreparedStatementExecutorWrapper: 使用预解析.
JDBCDatabaseCommunicationEngine 执行过程的代码如下:
- private final String sql;
- private final JDBCExecuteEngine executeEngine;
- @Override
- public BackendResponse execute() {
- try {
- // 1. SQL 路由, 改写
- SQLRouteResult routeResult = executeEngine.getJdbcExecutorWrapper().route(sql);
- return execute(routeResult);
- } catch (final SQLException ex) {
- return new ErrorResponse(ex);
- }
- }
- private BackendResponse execute(final SQLRouteResult routeResult) throws SQLException {
- ...
- // 2. SQL 执行
- response = executeEngine.execute(routeResult);
- if (logicSchema instanceof ShardingSchema) {
- logicSchema.refreshTableMetaData(routeResult.getSqlStatementContext());
- }
- // 4. 结果合并
- return merge(routeResult);
- }
总结: JDBCDatabaseCommunicationEngine 执行 SQL 过程包括: SQL 路由, 改写, 执行, 结果合并, 其中前三步都是委托 JDBCExecuteEngine 完成的.
- JDBCExecuteEngine
- // 管理底层 MySQL 连接
- private final BackendConnection backendConnection;
- // 1根据 SQL 生成执行计划(包括 SQL 解析, 路由, 改写);2生成 Statement;3执行 SQL
- private final JDBCExecutorWrapper jdbcExecutorWrapper;
- // 生成执行计划 RouteUnit -> StatementExecuteUnit
- private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
- // 执行 StatementExecuteUnit
- private final SQLExecuteTemplate sqlExecuteTemplate;
- @Override
- public BackendResponse execute(final SQLRouteResult routeResult) throws SQLException {
- final SQLStatementContext sqlStatementContext = routeResult.getSqlStatementContext();
- boolean isReturnGeneratedKeys = sqlStatementContext.getSqlStatement()
- instanceof InsertStatement;
- boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
- // 执行计划, ProxyJDBCExecutePrepareCallback 用于创建执行计划
- Collection<ShardingExecuteGroup<StatementExecuteUnit>> sqlExecuteGroups =
- sqlExecutePrepareTemplate.getExecuteUnitGroups(
- routeResult.getRouteUnits(),
- new ProxyJDBCExecutePrepareCallback(
- backendConnection, jdbcExecutorWrapper, isReturnGeneratedKeys));
- // 执行 SQL,ProxySQLExecuteCallback 用于执行 SQL
- Collection<ExecuteResponse> executeResponses = sqlExecuteTemplate.executeGroup(
- (Collection) sqlExecuteGroups,
- new ProxySQLExecuteCallback(backendConnection, jdbcExecutorWrapper,
- isExceptionThrown, isReturnGeneratedKeys, true),
- new ProxySQLExecuteCallback(backendConnection, jdbcExecutorWrapper,
- isExceptionThrown, isReturnGeneratedKeys, false));
- ExecuteResponse executeResponse = executeResponses.iterator().next();
- // 组装结果
- return executeResponse instanceof ExecuteQueryResponse
- ? getExecuteQueryResponse(((ExecuteQueryResponse) executeResponse)
- .getQueryHeaders(), executeResponses)
- : new UpdateResponse(executeResponses);
- }
每天用心记录一点点. 内容也许不重要, 但习惯很重要!
来源: http://www.bubuko.com/infodetail-3394728.html