Following the brief analysis of the Hive metastore table schema in the previous post, let's now walk through the overall code structure based on the entity objects behind that schema. First, open the metadata directory; its structure is as follows:
As you can see, the whole hivemeta directory contains: metastore (the client/server invocation logic), events (listener implementations for the table lifecycle, such as permission checks and authentication), hooks (currently only the JDO connection-related interfaces), parser (parsing of expression trees), spec (partition-related proxy classes), tools (JDO execute-related methods), plus txn and model. Next we will go through the metadata code piece by piece, with analysis and commentary.
Haven't even expanded the packages yet, and there are already so many classes. Feeling overwhelmed and a little desperate? Me too. Let's push on anyway. At first it all looks like a tangled mess, but once we calm down, the place to start is the Hive class, because it is the entry point for metastore metadata calls. The lifecycle we will walk through is: creation and loading of the HiveMetaStoreClient client; creation and loading of the HiveMetaStore server; then createTable, dropTable, alterTable, createPartition, dropPartition, and alterPartition. Of course, this is only a small part of the full metadata picture.
1. Creation and loading of the HiveMetaStoreClient client
Let's start reading through the Hive class bit by bit:
- private HiveConf conf = null;
- private IMetaStoreClient metaStoreClient;
- private UserGroupInformation owner;
- // metastore calls timing information
- private final Map<String, Long> metaCallTimeMap = new HashMap<String, Long>();
- private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
- @Override
- protected synchronized Hive initialValue() {
- return null;
- }
- @Override
- public synchronized void remove() {
- if (this.get() != null) {
- this.get().close();
- }
- super.remove();
- }
- };
Declared here are the HiveConf object, the metaStoreClient, the owning UserGroupInformation, and a call-timing map that records how long each metastore call takes. The class also maintains a thread-local hiveDB; when the thread's db is empty, a new Hive object is created, as the following code shows:
- public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
- Hive db = hiveDB.get();
- if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
- if (db != null) {
- LOG.debug("Creating new db. db =" + db + ", needsRefresh =" + needsRefresh +
- ", db.isCurrentUserOwner =" + db.isCurrentUserOwner());
- }
- closeCurrent();
- c.set("fs.scheme.class", "dfs");
- Hive newdb = new Hive(c);
- hiveDB.set(newdb);
- return newdb;
- }
- db.conf = c;
- return db;
- }
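As a quick usage sketch (ours, not from the Hive source), this thread-local caching is what makes the common calling pattern cheap:
- // Minimal sketch: each thread reuses its own cached Hive instance.
- HiveConf conf = new HiveConf();
- Hive db = Hive.get(conf, false);      // reuses the ThreadLocal instance when possible
- List<String> databases = db.getAllDatabases();
- Hive.closeCurrent();                  // drops this thread's instance and closes it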
Next we notice that when the Hive object is created, the functions are already registered. What is a function? From the table-structure analysis last time, it can be understood as the stored metadata of UDF jars and the like. The code is as follows:
- // register all permanent functions. need improvement
- static {
-   try {
-     reloadFunctions();
-   } catch (Exception e) {
-     LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
-   }
- }
-
- public static void reloadFunctions() throws HiveException {
-   // get the Hive object, used for the method calls below
-   Hive db = Hive.get();
-   // iterate over every dbName
-   for (String dbName : db.getAllDatabases()) {
-     // look up the info of every function registered under this db
-     for (String functionName : db.getFunctions(dbName, "*")) {
-       Function function = db.getFunction(dbName, functionName);
-       try {
-         // "register" here means putting the queried function data into a
-         // Map<String, FunctionInfo> inside the Registry class, so that compute
-         // engines don't have to query the database again on each call
-         FunctionRegistry.registerPermanentFunction(
-             FunctionUtils.qualifyFunctionName(functionName, dbName), function.getClassName(),
-             false, FunctionTask.toFunctionResource(function.getResourceUris()));
-       } catch (Exception e) {
-         LOG.warn("Failed to register persistent function " +
-             functionName + ":" + function.getClassName() + ". Ignore and continue.");
-       }
-     }
-   }
- }
The getMSC() method is then called to create the metadata client, as follows:
- private IMetaStoreClient createMetaStoreClient() throws MetaException {
-   // implement the HiveMetaHookLoader interface inline
-   HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
-     @Override
-     public HiveMetaHook getHook(
-         org.apache.hadoop.hive.metastore.api.Table tbl)
-         throws MetaException {
-       try {
-         if (tbl == null) {
-           return null;
-         }
-         // load the storage instance for this table based on its key/value
-         // parameters, e.g. extended storage such as HBase or Redis backing
-         // an external table
-         HiveStorageHandler storageHandler =
-             HiveUtils.getStorageHandler(conf,
-                 tbl.getParameters().get(META_TABLE_STORAGE));
-         if (storageHandler == null) {
-           return null;
-         }
-         return storageHandler.getMetaHook();
-       } catch (HiveException ex) {
-         LOG.error(StringUtils.stringifyException(ex));
-         throw new MetaException(
-             "Failed to load storage handler: " + ex.getMessage());
-       }
-     }
-   };
-   return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
-       SessionHiveMetaStoreClient.class.getName());
- }
2. Creation and loading of the HiveMetaStore server
When HiveMetaStoreClient is initialized, it sets up its connection to the HiveMetaStore, as follows:
- public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
-     throws MetaException {
-   this.hookLoader = hookLoader;
-   if (conf == null) {
-     conf = new HiveConf(HiveMetaStoreClient.class);
-   }
-   this.conf = conf;
-   filterHook = loadFilterHooks();
-   // read hive.metastore.uris from hive-site.xml: if it is set, this is a remote
-   // connection, otherwise an embedded (local) one
-   String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
-   localMetaStore = HiveConfUtil.isEmbeddedMetaStore(msUri);
-   if (localMetaStore) {
-     // local mode connects to HiveMetaStore directly
-     client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true);
-     isConnected = true;
-     snapshotActiveConf();
-     return;
-   }
-   // read the retry count and timeout from the configuration
-   retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
-   retryDelaySeconds = conf.getTimeVar(
-       ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
-   // assemble the metastore URIs
-   if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
-     String metastoreUrisString[] = conf.getVar(
-         HiveConf.ConfVars.METASTOREURIS).split(",");
-     metastoreUris = new URI[metastoreUrisString.length];
-     try {
-       int i = 0;
-       for (String s : metastoreUrisString) {
-         URI tmpUri = new URI(s);
-         if (tmpUri.getScheme() == null) {
-           throw new IllegalArgumentException("URI: " + s
-               + " does not have a scheme");
-         }
-         metastoreUris[i++] = tmpUri;
-       }
-     } catch (IllegalArgumentException e) {
-       throw (e);
-     } catch (Exception e) {
-       MetaStoreUtils.logAndThrowMetaException(e);
-     }
-   } else {
-     LOG.error("NOT getting uris from conf");
-     throw new MetaException("MetaStoreURIs not found in conf file");
-   }
-   // call open() to establish the connection
-   open();
- }
As the code above shows, a remote connection requires hive.metastore.uris to be configured in hive-site.xml. Looks familiar? If your client and server are not on the same machine, this is the setting that makes the connection remote.
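To make the two modes concrete, here is a minimal sketch (host and port invented) that forces a remote connection from code instead of hive-site.xml:
- HiveConf conf = new HiveConf();
- // with hive.metastore.uris set, the client goes remote over Thrift;
- // leaving it unset selects the embedded (local) mode shown above
- conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore-host:9083");
- IMetaStoreClient msc = new HiveMetaStoreClient(conf, null);
Now let's keep reading: the open() method that actually establishes the connection: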
- private void open() throws MetaException {
-   isConnected = false;
-   TTransportException tte = null;
-   // whether to use SASL
-   boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
-   // If true, the metastore Thrift interface will use TFramedTransport. When false (default) a standard TTransport is used.
-   boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
-   // If true, the metastore Thrift interface will use TCompactProtocol. When false (default) TBinaryProtocol will be used.
-   // We'll come back to the exact differences between the two later.
-   boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
-   // get the socket timeout
-   int clientSocketTimeout = (int) conf.getTimeVar(
-       ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
-
-   for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
-     for (URI store : metastoreUris) {
-       LOG.info("Trying to connect to metastore with URI " + store);
-       try {
-         transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
-         if (useSasl) {
-           // Wrap thrift connection with SASL for secure connection.
-           try {
-             // create the HadoopThriftAuthBridge client
-             HadoopThriftAuthBridge.Client authBridge =
-                 ShimLoader.getHadoopThriftAuthBridge().createClient();
-             // authentication-related logic:
-             // check if we should use delegation tokens to authenticate
-             // the call below gets hold of the tokens if they are set up by hadoop
-             // this should happen on the map/reduce tasks if the client added the
-             // tokens into hadoop's credential store in the front end during job
-             // submission.
-             String tokenSig = conf.get("hive.metastore.token.signature");
-             // tokenSig could be null
-             tokenStrForm = Utils.getTokenStrForm(tokenSig);
-             if (tokenStrForm != null) {
-               // authenticate using delegation tokens via the "DIGEST" mechanism
-               transport = authBridge.createClientTransport(null, store.getHost(),
-                   "DIGEST", tokenStrForm, transport,
-                   MetaStoreUtils.getMetaStoreSaslProperties(conf));
-             } else {
-               String principalConfig =
-                   conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
-               transport = authBridge.createClientTransport(
-                   principalConfig, store.getHost(), "KERBEROS", null,
-                   transport, MetaStoreUtils.getMetaStoreSaslProperties(conf));
-             }
-           } catch (IOException ioe) {
-             LOG.error("Couldn't create client transport", ioe);
-             throw new MetaException(ioe.toString());
-           }
-         } else if (useFramedTransport) {
-           transport = new TFramedTransport(transport);
-         }
-         final TProtocol protocol;
-         // the differences between the two protocols will be covered in detail later
-         // (haven't dug into them yet, ha)
-         if (useCompactProtocol) {
-           protocol = new TCompactProtocol(transport);
-         } else {
-           protocol = new TBinaryProtocol(transport);
-         }
-         // create the ThriftHiveMetastore client
-         client = new ThriftHiveMetastore.Client(protocol);
-         try {
-           transport.open();
-           isConnected = true;
-         } catch (TTransportException e) {
-           tte = e;
-           if (LOG.isDebugEnabled()) {
-             LOG.warn("Failed to connect to the MetaStore Server...", e);
-           } else {
-             // Don't print full exception trace if DEBUG is not on.
-             LOG.warn("Failed to connect to the MetaStore Server...");
-           }
-         }
-         // load the user and group information
-         if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) {
-           // Call set_ugi, only in unsecure mode.
-           try {
-             UserGroupInformation ugi = Utils.getUGI();
-             client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
-           } catch (LoginException e) {
-             LOG.warn("Failed to do login. set_ugi() is not successful, " +
-                 "Continuing without it.", e);
-           } catch (IOException e) {
-             LOG.warn("Failed to find ugi of client set_ugi() is not successful, " +
-                 "Continuing without it.", e);
-           } catch (TException e) {
-             LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
-                 + "Continuing without it.", e);
-           }
-         }
-       } catch (MetaException e) {
-         LOG.error("Unable to connect to metastore with URI " + store
-             + " in attempt " + attempt, e);
-       }
-       if (isConnected) {
-         break;
-       }
-     }
-     // Wait before launching the next round of connection retries.
-     if (!isConnected && retryDelaySeconds > 0) {
-       try {
-         LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
-         Thread.sleep(retryDelaySeconds * 1000);
-       } catch (InterruptedException ignore) {}
-     }
-   }
-
-   if (!isConnected) {
-     throw new MetaException("Could not connect to meta store using any of the URIs provided. " +
-         "Most recent failure: " + StringUtils.stringifyException(tte));
-   }
-
-   snapshotActiveConf();
-
-   LOG.info("Connected to metastore.");
- }
We'll set the protocol internals aside for this post. From the code we can see that the HiveMetaStore server is reached through ThriftHiveMetastore; it is itself a class, but it defines the interfaces Iface and AsyncIface, which makes it easy to inherit from and implement. Next, let's look at the initialization of HMSHandler. In the local (embedded) case, newRetryingHMSHandler is called directly, which immediately initializes the HMSHandler. The code is as follows:
- public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
- super(name);
- hiveConf = conf;
- if (init) {
- init();
- }
- }
Next, let's look at its init method:
- public void init() throws MetaException {
-   // the class that talks to the database: ObjectStore, the RawStore implementation
-   // responsible for the JDO interaction with the database
-   rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
-   // load the listeners from hive.metastore.init.hooks; you can implement and plug in your own
-   initListeners = MetaStoreUtils.getMetaStoreListeners(
-       MetaStoreInitListener.class, hiveConf,
-       hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
-   for (MetaStoreInitListener singleInitListener: initListeners) {
-     MetaStoreInitContext context = new MetaStoreInitContext();
-     singleInitListener.onInit(context);
-   }
-   // initialize the alter implementation class
-   String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
-       HiveAlterHandler.class.getName());
-   alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
-       alterHandlerName), hiveConf);
-   // initialize the warehouse
-   wh = new Warehouse(hiveConf);
-   // create the default db and roles/users, and load currentUrl
-   synchronized (HMSHandler.class) {
-     if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) {
-       createDefaultDB();
-       createDefaultRoles();
-       addAdminUsers();
-       currentUrl = MetaStoreInit.getConnectionURL(hiveConf);
-     }
-   }
-   // initialize the metrics counters
-   if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
-     try {
-       Metrics.init();
-     } catch (Exception e) {
-       // log exception, but ignore inability to start
-       LOG.error("error in Metrics init: " + e.getClass().getName() + " "
-           + e.getMessage(), e);
-     }
-   }
-   // initialize the pre-event listeners and the listeners
-   preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
-       hiveConf,
-       hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
-   listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
-       hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
-   listeners.add(new SessionPropertiesListener(hiveConf));
-   endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
-       MetaStoreEndFunctionListener.class, hiveConf,
-       hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));
-   // regex validation of partition names, configurable via
-   // hive.metastore.partition.name.whitelist.pattern
-   String partitionValidationRegex =
-       hiveConf.getVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN);
-   if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
-     partitionValidationPattern = Pattern.compile(partitionValidationRegex);
-   } else {
-     partitionValidationPattern = null;
-   }
-   long cleanFreq = hiveConf.getTimeVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ, TimeUnit.MILLISECONDS);
-   if (cleanFreq > 0) {
-     // In default config, there is no timer.
-     Timer cleaner = new Timer("Metastore Events Cleaner Thread", true);
-     cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq);
-   }
- }
So init() sets up the RawStore implementation that talks to the database, the Warehouse that performs the physical operations, and the events and listeners; the metadata lifecycle methods exposed through the interface then operate on tables via these.
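To get a feel for how these listeners are meant to be used, here is a minimal sketch of a custom listener (the class name is ours); registering its class name under hive.metastore.event.listeners would have getMetaStoreListeners load it during init():
- // Hypothetical listener that logs every table creation.
- public class AuditTableListener extends MetaStoreEventListener {
-   public AuditTableListener(Configuration conf) {
-     super(conf);
-   }
-   @Override
-   public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
-     // the event carries the created Table and the success status
-     System.out.println("created: " + tableEvent.getTable().getTableName()
-         + ", status=" + tableEvent.getStatus());
-   }
- }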
3. createTable
Starting from the createTable method, here is the code:
- public void createTable(String tableName, List<String> columns, List<String> partCols,
-     Class<? extends InputFormat> fileInputFormat,
-     Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
-     Map<String, String> parameters) throws HiveException {
-   if (columns == null) {
-     throw new HiveException("columns not specified for table " + tableName);
-   }
-   Table tbl = newTable(tableName);
-   // SD (storage descriptor) attributes: record the table's input and output class
-   // names, so compute engines can load those classes via reflection at query time
-   tbl.setInputFormatClass(fileInputFormat.getName());
-   tbl.setOutputFormatClass(fileOutputFormat.getName());
-   // wrap each column name and type in a FieldSchema object and add it to the
-   // sd object's column list
-   for (String col : columns) {
-     FieldSchema field = new FieldSchema(col, STRING_TYPE_NAME, "default");
-     tbl.getCols().add(field);
-   }
-   // if partition info was specified at create time (e.g. a dt field as the
-   // partition), record it; it is eventually written to the partition tables
-   if (partCols != null) {
-     for (String partCol : partCols) {
-       FieldSchema part = new FieldSchema();
-       part.setName(partCol);
-       part.setType(STRING_TYPE_NAME); // default partition key
-       tbl.getPartCols().add(part);
-     }
-   }
-   // set the serialization library
-   tbl.setSerializationLib(LazySimpleSerDe.class.getName());
-   // set the bucketing info
-   tbl.setNumBuckets(bucketCount);
-   tbl.setBucketCols(bucketCols);
-   // set any extra key/value parameters on the table
-   if (parameters != null) {
-     tbl.setParamters(parameters);
-   }
-   createTable(tbl);
- }
As the code shows, Hive builds a Table object, which can be regarded as a model: it wraps nearly all the tables keyed by table_id that hang off the TBLS main table (see the Hive metastore schema post for details). Once wrapped, the next createTable overload is invoked, as follows:
- public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
-   try {
-     // fetch the CurrentDataBase from the SessionState again and setDbName (for safety)
-     if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
-       tbl.setDbName(SessionState.get().getCurrentDatabase());
-     }
-     // validate each column attribute, e.g. check for illegal characters
-     if (tbl.getCols().size() == 0 || tbl.getSd().getColsSize() == 0) {
-       tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(),
-           tbl.getDeserializer()));
-     }
-     // validate the table's input/output attributes and column attributes
-     tbl.checkValidity();
-     if (tbl.getParameters() != null) {
-       tbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
-     }
-     org.apache.hadoop.hive.metastore.api.Table tTbl = tbl.getTTable();
-     // authorization starts here, driven by the parameters we configure in Hive:
-     // hive.security.authorization.createtable.user.grants,
-     // hive.security.authorization.createtable.group.grants and
-     // hive.security.authorization.createtable.role.grants, i.e. Hive's own
-     // user/role/group concepts
-     PrincipalPrivilegeSet principalPrivs = new PrincipalPrivilegeSet();
-     SessionState ss = SessionState.get();
-     if (ss != null) {
-       CreateTableAutomaticGrant grants = ss.getCreateTableGrants();
-       if (grants != null) {
-         principalPrivs.setUserPrivileges(grants.getUserGrants());
-         principalPrivs.setGroupPrivileges(grants.getGroupGrants());
-         principalPrivs.setRolePrivileges(grants.getRoleGrants());
-         tTbl.setPrivileges(principalPrivs);
-       }
-     }
-     // create the table through the client's connection to the server
-     getMSC().createTable(tTbl);
-   } catch (AlreadyExistsException e) {
-     if (!ifNotExists) {
-       throw new HiveException(e);
-     }
-   } catch (Exception e) {
-     throw new HiveException(e);
-   }
- }
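As a usage sketch (table and column names invented), the simple overload we started from boils down to a call like this:
- // Hypothetical call: a text table t(id, name) partitioned by dt, no bucketing
- Hive db = Hive.get(new HiveConf(), false);
- db.createTable("t", Arrays.asList("id", "name"), Arrays.asList("dt"),
-     TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class, -1, null, null);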
Next, let's look at the createTable method that gets invoked on HiveMetaStoreClient; the code is as follows:
- public void createTable(Table tbl, EnvironmentContext envContext) throws AlreadyExistsException,
-     InvalidObjectException, MetaException, NoSuchObjectException, TException {
-   // fetch the HiveMetaHook for this table: per-storage-engine loading and
-   // validation before the create
-   HiveMetaHook hook = getHook(tbl);
-   if (hook != null) {
-     hook.preCreateTable(tbl);
-   }
-   boolean success = false;
-   try {
-     // then call HiveMetaStore so the server performs the create against the database
-     create_table_with_environment_context(tbl, envContext);
-     if (hook != null) {
-       hook.commitCreateTable(tbl);
-     }
-     success = true;
-   } finally {
-     // if the create failed, roll back
-     if (!success && (hook != null)) {
-       hook.rollbackCreateTable(tbl);
-     }
-   }
- }
A brief word on what the hook does: HiveMetaHook is an interface whose methods include preCreateTable, rollbackCreateTable, preDropTable, and so on. Its implementations handle pre-creation loading and validation for the different storage types, as well as rollback on failure. The code is as follows:
- public interface HiveMetaHook {
- /**
- * Called before a new table definition is added to the metastore
- * during CREATE TABLE.
- *
- * @param table new table definition
- */
- public void preCreateTable(Table table)
- throws MetaException;
- /**
- * Called after failure adding a new table definition to the metastore
- * during CREATE TABLE.
- *
- * @param table new table definition
- */
- public void rollbackCreateTable(Table table)
- throws MetaException;
- public void preDropTable(Table table)
-     throws MetaException;
- ...
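For reference, a storage handler hands its hook back through getMetaHook(); a skeletal implementation (ours, not from the Hive source) might look like this, with the remaining interface methods elided the same way:
- // Hypothetical hook for a made-up external store
- public class DemoMetaHook implements HiveMetaHook {
-   @Override
-   public void preCreateTable(Table table) throws MetaException {
-     // validate table parameters before the metastore writes anything
-     if (table.getParameters().get("demo.endpoint") == null) {
-       throw new MetaException("demo.endpoint must be set on the table");
-     }
-   }
-   @Override
-   public void rollbackCreateTable(Table table) throws MetaException {
-     // undo whatever preCreateTable provisioned in the external store
-   }
-   ...
- }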
Next, let's look at the createTable path on the HiveMetaStore server side:
- private void create_table_core(final RawStore ms, final Table tbl,
-     final EnvironmentContext envContext)
-     throws AlreadyExistsException, MetaException,
-     InvalidObjectException, NoSuchObjectException {
-   // regex validation of the name, checking for illegal characters
-   if (!MetaStoreUtils.validateName(tbl.getTableName())) {
-     throw new InvalidObjectException(tbl.getTableName()
-         + " is not a valid object name");
-   }
-   // validation code: checks the column names, column types and partition key names
-   String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
-   if (validate != null) {
-     throw new InvalidObjectException("Invalid column " + validate);
-   }
-   if (tbl.getPartitionKeys() != null) {
-     validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
-     if (validate != null) {
-       throw new InvalidObjectException("Invalid partition column " + validate);
-     }
-   }
-   SkewedInfo skew = tbl.getSd().getSkewedInfo();
-   if (skew != null) {
-     validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
-     if (validate != null) {
-       throw new InvalidObjectException("Invalid skew column " + validate);
-     }
-     validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
-         skew.getSkewedColNames(), tbl.getSd().getCols());
-     if (validate != null) {
-       throw new InvalidObjectException("Invalid skew column " + validate);
-     }
-   }
-
-   Path tblPath = null;
-   boolean success = false, madeDir = false;
-   try {
-     // fire the pre-create events; the listeners already implemented in the
-     // metastore include DummyPreListener, AuthorizationPreEventListener,
-     // AlternateFailurePreListener and MetaDataExportListener
-     // what are these listeners for? details to come when we analyze the meta design patterns
-     firePreEvent(new PreCreateTableEvent(tbl, this));
-
-     // open a transaction
-     ms.openTransaction();
-
-     // if the db does not exist, throw
-     Database db = ms.getDatabase(tbl.getDbName());
-     if (db == null) {
-       throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist");
-     }
-
-     // check whether the table already exists in this db
-     if (is_table_exists(ms, tbl.getDbName(), tbl.getTableName())) {
-       throw new AlreadyExistsException("Table " + tbl.getTableName()
-           + " already exists");
-     }
-     // if the table is not a view, assemble the full tblPath ->
-     // fs.getUri().getScheme() + fs.getUri().getAuthority() + path.toUri().getPath()
-     if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
-       if (tbl.getSd().getLocation() == null
-           || tbl.getSd().getLocation().isEmpty()) {
-         tblPath = wh.getTablePath(
-             ms.getDatabase(tbl.getDbName()), tbl.getTableName());
-       } else {
-         // if the table is not external and storage_handler is absent from its
-         // key/values, only log a warning
-         if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
-           LOG.warn("Location: " + tbl.getSd().getLocation()
-               + " specified for non-external table: " + tbl.getTableName());
-         }
-         tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
-       }
-       // set the assembled tblPath as the sd's location
-       tbl.getSd().setLocation(tblPath.toString());
-     }
-     // create the table's directory
-     if (tblPath != null) {
-       if (!wh.isDir(tblPath)) {
-         if (!wh.mkdirs(tblPath, true)) {
-           throw new MetaException(tblPath
-               + " is not a directory or unable to create one");
-         }
-         madeDir = true;
-       }
-     }
-     // check the hive.stats.autogather configuration
-     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
-         !MetaStoreUtils.isView(tbl)) {
-       if (tbl.getPartitionKeysSize() == 0) { // Unpartitioned table
-         MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir);
-       } else { // Partitioned table with no partitions.
-         MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, true);
-       }
-     }
-
-     // set create time
-     long time = System.currentTimeMillis() / 1000;
-     tbl.setCreateTime((int) time);
-     if (tbl.getParameters() == null ||
-         tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
-       tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
-     }
-     // perform the createTable database operation
-     ms.createTable(tbl);
-     success = ms.commitTransaction();
-
-   } finally {
-     if (!success) {
-       ms.rollbackTransaction();
-       // if the create failed for some reason, remove the directory already created
-       if (madeDir) {
-         wh.deleteDir(tblPath, true);
-       }
-     }
-     // fire the listeners for create completion, e.g. notify notifications
-     for (MetaStoreEventListener listener : listeners) {
-       CreateTableEvent createTableEvent =
-           new CreateTableEvent(tbl, success, this);
-       createTableEvent.setEnvironmentContext(envContext);
-       listener.onCreateTable(createTableEvent);
-     }
-   }
- }
The listeners here will be covered in detail later. Continuing straight down: the ms in ms.createTable is the RawStore interface object, which gathers the method calls for the entire metadata lifecycle. Part of it looks like this:
- public abstract Database getDatabase(String name) throws NoSuchObjectException;
- public abstract boolean dropDatabase(String dbname) throws NoSuchObjectException,
-     MetaException;
- public abstract boolean alterDatabase(String dbname, Database db) throws NoSuchObjectException,
-     MetaException;
- public abstract List<String> getDatabases(String pattern) throws MetaException;
- public abstract List<String> getAllDatabases() throws MetaException;
- public abstract boolean createType(Type type);
- public abstract Type getType(String typeName);
- public abstract boolean dropType(String typeName);
- public abstract void createTable(Table tbl) throws InvalidObjectException,
-     MetaException;
- public abstract boolean dropTable(String dbName, String tableName) throws MetaException,
-     NoSuchObjectException,
-     InvalidObjectException,
-     InvalidInputException;
- public abstract Table getTable(String dbName, String tableName) throws MetaException;
- ...
Now let's see how this is actually wired up. First, the HiveMetaStore calls getMS() to obtain the thread-local RawStore implementation, as follows:
- public RawStore getMS() throws MetaException {
-   // fetch the RawStore already bound to this thread
-   RawStore ms = threadLocalMS.get();
-   // if absent, create the implementation object and bind it to the thread
-   if (ms == null) {
-     ms = newRawStore();
-     ms.verifySchema();
-     threadLocalMS.set(ms);
-     ms = threadLocalMS.get();
-   }
-   return ms;
- }
Reading this, aren't you curious what newRawStore actually does? Let's continue:
- public static RawStore getProxy(HiveConf hiveConf, Configuration conf, String rawStoreClassName,
-     int id) throws MetaException {
-   // create the baseClass via reflection, then build the implementation object around it
-   Class<? extends RawStore> baseClass = (Class<? extends RawStore>) MetaStoreUtils.getClass(
-       rawStoreClassName);
-   RawStoreProxy handler = new RawStoreProxy(hiveConf, conf, baseClass, id);
-   // Look for interfaces on both the class and all base classes.
-   return (RawStore) Proxy.newProxyInstance(RawStoreProxy.class.getClassLoader(),
-       getAllInterfaces(baseClass), handler);
- }
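The pattern here is plain JDK dynamic proxying. A stripped-down sketch (names ours) of the idea behind RawStoreProxy: every call on the returned RawStore funnels through invoke(), which is where cross-cutting logic such as timing, retries or reconnects can live:
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
-
- class TimingHandler implements InvocationHandler {
-   private final Object target;                 // e.g. the real ObjectStore
-   TimingHandler(Object target) { this.target = target; }
-   @Override
-   public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-     long start = System.nanoTime();
-     try {
-       return method.invoke(target, args);      // delegate to the real implementation
-     } finally {
-       System.out.println(method.getName() + " took " + (System.nanoTime() - start) + " ns");
-     }
-   }
- }
- // usage sketch:
- // RawStore store = (RawStore) Proxy.newProxyInstance(cl,
- //     new Class[] { RawStore.class }, new TimingHandler(realObjectStore));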
The question, then, is where rawStoreClassName comes from. It is loaded when HiveMetaStore initializes, from the METASTORE_RAW_STORE_IMPL configuration parameter in HiveConf, i.e. the RawStore implementation class: ObjectStore. Now that the RawStore implementation has been created, let's dig into ObjectStore; the code is as follows:
- @Override
- public void createTable(Table tbl) throws InvalidObjectException,
-     MetaException {
-   boolean commited = false;
-   try {
-     // open a transaction
-     openTransaction();
-     // the db and table are validated once more here (code not shown); exactly why
-     // this validation is repeated at this layer deserves more thought
-     MTable mtbl = convertToMTable(tbl);
-     // pm is the JDO PersistenceManager initialized when the ObjectStore was created;
-     // this is where the Table object is actually committed -- worth studying how
-     // JDO model objects interact with the database
-     pm.makePersistent(mtbl);
-     // wrap the user/role/group privilege objects and write them out
-     PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges();
-     List<Object> toPersistPrivObjs = new ArrayList<Object>();
-     if (principalPrivs != null) {
-       int now = (int)(System.currentTimeMillis() / 1000);
-       Map<String, List<PrivilegeGrantInfo>> userPrivs = principalPrivs.getUserPrivileges();
-       putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, PrincipalType.USER);
-       Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges();
-       putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP);
-       Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges();
-       putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE);
-     }
-     pm.makePersistentAll(toPersistPrivObjs);
-     commited = commitTransaction();
-   } finally {
-     // roll back on failure
-     if (!commited) {
-       rollbackTransaction();
-     }
-   }
- }
4. dropTable
Without further ado, code from the Hive class:
- public void dropTable(String tableName, boolean ifPurge) throws HiveException {
-   // Hive combines the dbName and tableName into one array here
-   String[] names = Utilities.getDbTableName(tableName);
-   dropTable(names[0], names[1], true, true, ifPurge);
- }
Why handle it this way? Because when dropping a table the SQL may be either `drop table dbName.tableName` or `drop table tableName`. Here tableName and dbName are assembled, and for the plain `drop table tableName` form the dbName comes from the current session. The code is as follows:
- public static String[] getDbTableName(String dbtable) throws SemanticException {
-   // use the current session's dbName as the default
-   return getDbTableName(SessionState.get().getCurrentDatabase(), dbtable);
- }
-
- public static String[] getDbTableName(String defaultDb, String dbtable) throws SemanticException {
-   if (dbtable == null) {
-     return new String[2];
-   }
-   String[] names = dbtable.split("\\.");
-   switch (names.length) {
-   case 2:
-     return names;
-   // if the length is 1, assemble the pair with the default db
-   case 1:
-     return new String[] {defaultDb, dbtable};
-   default:
-     throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, dbtable);
-   }
- }
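So, with invented values, and assuming the session's current database is default:
- String[] a = Utilities.getDbTableName("logs.events"); // -> {"logs", "events"}
- String[] b = Utilities.getDbTableName("events");      // -> {"default", "events"}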
Then getMSC() is used to call dropTable on HiveMetaStoreClient, as follows:
- public void dropTable(String dbname, String name, boolean deleteData,
-     boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
-     NoSuchObjectException, UnsupportedOperationException {
-   Table tbl;
-   try {
-     // fetch the whole Table object by dbName and tableName, i.e. all of the
-     // metadata stored for this table
-     tbl = getTable(dbname, name);
-   } catch (NoSuchObjectException e) {
-     if (!ignoreUnknownTab) {
-       throw e;
-     }
-     return;
-   }
-   // use the table type to check for an index table; dropping those is not allowed here
-   if (isIndexTable(tbl)) {
-     throw new UnsupportedOperationException("Cannot drop index tables");
-   }
-   // same getHook as in the create path: fetch the hook for this table's storage
-   HiveMetaHook hook = getHook(tbl);
-   if (hook != null) {
-     hook.preDropTable(tbl);
-   }
-   boolean success = false;
-   try {
-     // call dropTable on the HiveMetaStore server
-     drop_table_with_environment_context(dbname, name, deleteData, envContext);
-     if (hook != null) {
-       hook.commitDropTable(tbl, deleteData);
-     }
-     success = true;
-   } catch (NoSuchObjectException e) {
-     if (!ignoreUnknownTab) {
-       throw e;
-     }
-   } finally {
-     if (!success && (hook != null)) {
-       hook.rollbackDropTable(tbl);
-     }
-   }
- }
Now let's focus on what the HiveMetaStore server does; the code is as follows:
- private boolean drop_table_core(final RawStore ms, final String dbname, final String name,
-     final boolean deleteData, final EnvironmentContext envContext,
-     final String indexName) throws NoSuchObjectException,
-     MetaException, IOException, InvalidObjectException, InvalidInputException {
-   boolean success = false;
-   boolean isExternal = false;
-   Path tblPath = null;
-   List<Path> partPaths = null;
-   Table tbl = null;
-   boolean ifPurge = false;
-   try {
-     ms.openTransaction();
-     // fetch the whole Table object's attributes
-     tbl = get_table_core(dbname, name);
-     if (tbl == null) {
-       throw new NoSuchObjectException(name + " doesn't exist");
-     }
-     // if the sd data is null, the table metadata is considered corrupted
-     if (tbl.getSd() == null) {
-       throw new MetaException("Table metadata is corrupted");
-     }
-     ifPurge = isMustPurge(envContext, tbl);
-
-     firePreEvent(new PreDropTableEvent(tbl, deleteData, this));
-     // if this table has indexes, they must be dropped first
-     boolean isIndexTable = isIndexTable(tbl);
-     if (indexName == null && isIndexTable) {
-       throw new RuntimeException(
-           "The table " + name + " is an index table. Please do drop index instead.");
-     }
-     // if this is not an index table, drop the index metadata
-     if (!isIndexTable) {
-       try {
-         List<Index> indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
-         while (indexes != null && indexes.size() > 0) {
-           for (Index idx : indexes) {
-             this.drop_index_by_name(dbname, name, idx.getIndexName(), true);
-           }
-           indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
-         }
-       } catch (TException e) {
-         throw new MetaException(e.getMessage());
-       }
-     }
-     // check whether the table is external
-     isExternal = isExternal(tbl);
-     if (tbl.getSd().getLocation() != null) {
-       tblPath = new Path(tbl.getSd().getLocation());
-       if (!wh.isWritable(tblPath.getParent())) {
-         String target = indexName == null ? "Table" : "Index table";
-         throw new MetaException(target + " metadata not deleted since " +
-             tblPath.getParent() + " is not writable by " +
-             hiveConf.getUser());
-       }
-     }
-
-     checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge);
-     // collect the location paths of all partitions. One odd detail: instead of
-     // passing the Table object in, the method fetches the table again; it also
-     // checks write permission on the parent directories
-     partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
-         tbl.getPartitionKeys(), deleteData && !isExternal);
-     // call ObjectStore to delete the meta data
-     if (!ms.dropTable(dbname, name)) {
-       String tableName = dbname + "." + name;
-       throw new MetaException(indexName == null ? "Unable to drop table " + tableName :
-           "Unable to drop index table " + tableName + " for index " + indexName);
-     }
-     success = ms.commitTransaction();
-   } finally {
-     if (!success) {
-       ms.rollbackTransaction();
-     } else if (deleteData && !isExternal) {
-       // delete the physical partitions
-       deletePartitionData(partPaths, ifPurge);
-       // delete the table directory
-       deleteTableData(tblPath, ifPurge);
-       // ok even if the data is not deleted
-     }
-     // fire the listeners, e.g. notify notifications
-     for (MetaStoreEventListener listener : listeners) {
-       DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
-       dropTableEvent.setEnvironmentContext(envContext);
-       listener.onDropTable(dropTableEvent);
-     }
-   }
-   return success;
- }
Digging further into ObjectStore's dropTable, we find that the whole Table object is fetched once more by dbName and tableName, and then deleted piece by piece. Perhaps the code was not all written by one person, or perhaps it is a safety consideration: many Table objects that could be passed in through the interface are re-fetched instead. Doesn't that put extra load on the database? The ObjectStore code is as follows:
- public boolean dropTable(String dbName, String tableName) throws MetaException,
-     NoSuchObjectException, InvalidObjectException, InvalidInputException {
-   boolean success = false;
-   try {
-     openTransaction();
-     // fetch the Table object once more
-     MTable tbl = getMTable(dbName, tableName);
-     pm.retrieve(tbl);
-     if (tbl != null) {
-       // the code below queries and deletes all the privileges
-       List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName);
-       if (tabGrants != null && tabGrants.size() > 0) {
-         pm.deletePersistentAll(tabGrants);
-       }
-       List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(dbName,
-           tableName);
-       if (tblColGrants != null && tblColGrants.size() > 0) {
-         pm.deletePersistentAll(tblColGrants);
-       }
-       List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(dbName, tableName);
-       if (partGrants != null && partGrants.size() > 0) {
-         pm.deletePersistentAll(partGrants);
-       }
-       List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName,
-           tableName);
-       if (partColGrants != null && partColGrants.size() > 0) {
-         pm.deletePersistentAll(partColGrants);
-       }
-       // delete column statistics if present
-       try {
-         deleteTableColumnStatistics(dbName, tableName, null);
-       } catch (NoSuchObjectException e) {
-         LOG.info("Found no table level column statistics associated with db " + dbName +
-             " table " + tableName + " record to delete");
-       }
-       // delete the MCD (storage descriptor) rows
-       preDropStorageDescriptor(tbl.getSd());
-       // delete the rows of every table related to the whole Table object
-       pm.deletePersistentAll(tbl);
-     }
-     success = commitTransaction();
-   } finally {
-     if (!success) {
-       rollbackTransaction();
-     }
-   }
-   return success;
- }
5. alterTable
Next, alterTable. It involves quite a lot of logic, since it touches physical path changes on storage and more, so let's take it a little at a time. Again we start from the Hive class:
- public void alterTable(String tblName, Table newTbl, boolean cascade)
-     throws InvalidOperationException, HiveException {
-   String[] names = Utilities.getDbTableName(tblName);
-   try {
-     // remove DDL_TIME from the table's key/values: since we are altering the
-     // table, it will be changed
-     if (newTbl.getParameters() != null) {
-       newTbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
-     }
-     // run the validations (dbName, tableName, columns, input/output classes, etc.);
-     // a failed check throws HiveException
-     newTbl.checkValidity();
-     // call alter_table
-     getMSC().alter_table(names[0], names[1], newTbl.getTTable(), cascade);
-   } catch (MetaException e) {
-     throw new HiveException("Unable to alter table. " + e.getMessage(), e);
-   } catch (TException e) {
-     throw new HiveException("Unable to alter table. " + e.getMessage(), e);
-   }
- }
HiveMetaStoreClient does no extra processing here, so let's go straight to what the HiveMetaStore server does:
- private void alter_table_core(final String dbname, final String name, final Table newTable,
-     final EnvironmentContext envContext, final boolean cascade)
-     throws InvalidOperationException, MetaException {
-   startFunction("alter_table", ": db=" + dbname + " tbl=" + name
-       + " newtbl=" + newTable.getTableName());
-
-   // update DDL_TIME
-   if (newTable.getParameters() == null ||
-       newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
-     newTable.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
-         .currentTimeMillis() / 1000));
-   }
-   boolean success = false;
-   Exception ex = null;
-   try {
-     // fetch the existing whole Table object
-     Table oldt = get_table_core(dbname, name);
-     // fire the pre-events
-     firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
-     // perform the alterTable; detailed below
-     alterHandler.alterTable(getMS(), wh, dbname, name, newTable, cascade);
-     success = true;
-
-     // fire the listeners
-     for (MetaStoreEventListener listener : listeners) {
-       AlterTableEvent alterTableEvent =
-           new AlterTableEvent(oldt, newTable, success, this);
-       alterTableEvent.setEnvironmentContext(envContext);
-       listener.onAlterTable(alterTableEvent);
-     }
-   } catch (NoSuchObjectException e) {
-     // thrown when the table to be altered does not exist
-     ex = e;
-     throw new InvalidOperationException(e.getMessage());
-   } catch (Exception e) {
-     ex = e;
-     if (e instanceof MetaException) {
-       throw (MetaException) e;
-     } else if (e instanceof InvalidOperationException) {
-       throw (InvalidOperationException) e;
-     } else {
-       throw newMetaException(e);
-     }
-   } finally {
-     endFunction("alter_table", success, ex, name);
-   }
- }
Now the key part: what alterHandler actually does. A quick word on its initialization first: during HiveMetaStore init, the class name is read from the hive.metastore.alter.impl parameter, i.e. the name of HiveAlterHandler. Let's look at its alterTable implementation. Brace yourself :)
- public void alterTable(RawStore msdb, Warehouse wh, String dbname,
-     String name, Table newt, boolean cascade) throws InvalidOperationException, MetaException {
-   if (newt == null) {
-     throw new InvalidOperationException("New table is invalid: " + newt);
-   }
-   // validate the new table name
-   if (!MetaStoreUtils.validateName(newt.getTableName())) {
-     throw new InvalidOperationException(newt.getTableName()
-         + " is not a valid object name");
-   }
-   // validate the new column names and types
-   String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols());
-   if (validate != null) {
-     throw new InvalidOperationException("Invalid column " + validate);
-   }
-
-   Path srcPath = null;
-   FileSystem srcFs = null;
-   Path destPath = null;
-   FileSystem destFs = null;
-
-   boolean success = false;
-   boolean moveData = false;
-   boolean rename = false;
-   Table oldt = null;
-   List<ObjectPair<Partition, String>> altps = new ArrayList<ObjectPair<Partition, String>>();
-
-   try {
-     msdb.openTransaction();
-     // lower-cased directly here -- another sign the code was not all written by one person
-     name = name.toLowerCase();
-     dbname = dbname.toLowerCase();
-
-     // check whether the new table name already exists
-     if (!newt.getTableName().equalsIgnoreCase(name)
-         || !newt.getDbName().equalsIgnoreCase(dbname)) {
-       if (msdb.getTable(newt.getDbName(), newt.getTableName()) != null) {
-         throw new InvalidOperationException("new table " + newt.getDbName()
-             + "." + newt.getTableName() + " already exists");
-       }
-       rename = true;
-     }
-
-     // fetch the old Table object
-     oldt = msdb.getTable(dbname, name);
-     if (oldt == null) {
-       throw new InvalidOperationException("table " + newt.getDbName() + "."
-           + newt.getTableName() + " doesn't exist");
-     }
-     // read METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES when altering: if true,
-     // incompatible column type changes are rejected (the default here is false)
-     if (HiveConf.getBoolVar(hiveConf,
-         HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES,
-         false)) {
-       // Throws InvalidOperationException if the new column types are not
-       // compatible with the current column types.
-       MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange(
-           oldt.getSd().getCols(), newt.getSd().getCols());
-     }
-     // cascade is passed in by the engine through Hive.alterTable; it decides whether
-     // the partition info needs to be altered as well
-     if (cascade) {
-       // check whether the new columns match the old ones; a mismatch means
-       // columns were added or dropped
-       if (MetaStoreUtils.isCascadeNeededInAlterTable(oldt, newt)) {
-         // fetch all partition info by dbName and tableName
-         List<Partition> parts = msdb.getPartitions(dbname, name, -1);
-         for (Partition part : parts) {
-           List<FieldSchema> oldCols = part.getSd().getCols();
-           part.getSd().setCols(newt.getSd().getCols());
-           String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
-           // if the columns differ, drop the existing column statistics
-           updatePartColumnStatsForAlterColumns(msdb, part, oldPartName, part.getValues(), oldCols, part);
-           // update the whole Partition
-           msdb.alterPartition(dbname, name, part.getValues(), part);
-         }
-       } else {
-         LOG.warn("Alter table does not cascade changes to its partitions.");
-       }
-     }
-
-     // check whether the partition keys changed, i.e. part names such as dt or hour
-     boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(),
-         newt.getPartitionKeys());
-
-     // for a non-view table, if the old and new part keys do not match, report an error
-     if (!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())) {
-       if (oldt.getPartitionKeys().size() != newt.getPartitionKeys().size()
-           || !partKeysPartiallyEqual) {
-         throw new InvalidOperationException(
-             "partition keys can not be changed.");
-       }
-     }
-
-     // if this is a rename, the table is not a view, its location is unchanged (or the
-     // new location is empty), and the table is not external, then the user is asking
-     // the metastore to move the data to a new location: an ALTER TABLE RENAME
-     if (rename
-         && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
-         && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
-           || StringUtils.isEmpty(newt.getSd().getLocation()))
-         && !MetaStoreUtils.isExternalTable(oldt)) {
-       // get the old location
-       srcPath = new Path(oldt.getSd().getLocation());
-       srcFs = wh.getFs(srcPath);
-
-       // that means user is asking metastore to move data to new location
-       // corresponding to the new name
-       // get new location
-       Database db = msdb.getDatabase(newt.getDbName());
-       Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
-       destPath = new Path(databasePath, newt.getTableName());
-       destFs = wh.getFs(destPath);
-       // set the new table location, used for the update further down
-       newt.getSd().setLocation(destPath.toString());
-       moveData = true;
-
-       // renaming across different file systems is not supported, since it could
-       // silently override data
-       if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
-         throw new InvalidOperationException("table new location " + destPath
-             + " is on a different file system than the old location "
-             + srcPath + ". This operation is not supported");
-       }
-       try {
-         srcFs.exists(srcPath); // check that src exists and also checks
-                                // permissions necessary
-         if (destFs.exists(destPath)) {
-           throw new InvalidOperationException("New location for this table "
-               + newt.getDbName() + "." + newt.getTableName()
-               + " already exists : " + destPath);
-         }
-       } catch (IOException e) {
-         throw new InvalidOperationException("Unable to access new location "
-             + destPath + " for table " + newt.getDbName() + "."
-             + newt.getTableName());
-       }
-       String oldTblLocPath = srcPath.toUri().getPath();
-       String newTblLocPath = destPath.toUri().getPath();
-
-       // get all the partition info of the old table
-       List<Partition> parts = msdb.getPartitions(dbname, name, -1);
-       for (Partition part : parts) {
-         String oldPartLoc = part.getSd().getLocation();
-         // here the old partition paths are swapped for the new ones, and the
-         // partition metadata is updated
-         if (oldPartLoc.contains(oldTblLocPath)) {
-           URI oldUri = new Path(oldPartLoc).toUri();
-           String newPath = oldUri.getPath().replace(oldTblLocPath, newTblLocPath);
-           Path newPartLocPath = new Path(oldUri.getScheme(), oldUri.getAuthority(), newPath);
-           altps.add(ObjectPair.create(part, part.getSd().getLocation()));
-           part.getSd().setLocation(newPartLocPath.toString());
-           String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
-           try {
-             // existing partition column stats is no longer valid, remove them
-             msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, part.getValues(), null);
-           } catch (InvalidInputException iie) {
-             throw new InvalidOperationException("Unable to update partition stats in table rename. " + iie);
-           }
-           msdb.alterPartition(dbname, name, part.getValues(), part);
-         }
-       }
-     // update the stats-related info
-     } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) &&
-         (newt.getPartitionKeysSize() == 0)) {
-       Database db = msdb.getDatabase(newt.getDbName());
-       // Update table stats. For partitioned table, we update stats in
-       // alterPartition()
-       MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true);
-     }
-     updateTableColumnStatsForAlterTable(msdb, oldt, newt);
-     // now finally call alter table
-     msdb.alterTable(dbname, name, newt);
-     // commit the changes
-     success = msdb.commitTransaction();
-   } catch (InvalidObjectException e) {
-     LOG.debug(e);
-     throw new InvalidOperationException(
-         "Unable to change partition or table."
-             + " Check metastore logs for detailed stack." + e.getMessage());
-   } catch (NoSuchObjectException e) {
-     LOG.debug(e);
-     throw new InvalidOperationException(
-         "Unable to change partition or table. Database " + dbname + " does not exist"
-             + " Check metastore logs for detailed stack." + e.getMessage());
-   } finally {
-     if (!success) {
-       msdb.rollbackTransaction();
-     }
-     if (success && moveData) {
-       // now update the HDFS path: rename the old path to the new one via the
-       // FileSystem rename operation
-       try {
-         if (srcFs.exists(srcPath) && !srcFs.rename(srcPath, destPath)) {
-           throw new IOException("Renaming " + srcPath + " to " + destPath + " failed");
-         }
-       } catch (IOException e) {
-         LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
-         boolean revertMetaDataTransaction = false;
-         try {
-           msdb.openTransaction();
-           // note that alterTable is applied to the metadata once more here: this is
-           // the revert path, undoing the change after the HDFS rename failed
-           msdb.alterTable(newt.getDbName(), newt.getTableName(), oldt);
-           for (ObjectPair<Partition, String> pair : altps) {
-             Partition part = pair.getFirst();
-             part.getSd().setLocation(pair.getSecond());
-             msdb.alterPartition(newt.getDbName(), name, part.getValues(), part);
-           }
-           revertMetaDataTransaction = msdb.commitTransaction();
-         } catch (Exception e1) {
-           // we should log this for manual rollback by administrator
-           LOG.error("Reverting metadata by HDFS operation failure failed During HDFS operation failed", e1);
-           LOG.error("Table " + Warehouse.getQualifiedName(newt) +
-               " should be renamed to " + Warehouse.getQualifiedName(oldt));
-           LOG.error("Table " + Warehouse.getQualifiedName(newt) +
-               " should have path " + srcPath);
-           for (ObjectPair<Partition, String> pair : altps) {
-             LOG.error("Partition " + Warehouse.getQualifiedName(pair.getFirst()) +
-                 " should have path " + pair.getSecond());
-           }
-           if (!revertMetaDataTransaction) {
-             msdb.rollbackTransaction();
-           }
-         }
-         throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
-             " failed to move data due to: '" + getSimpleMessage(e) + "' See hive log file for details.");
-       }
-     }
-   }
-   if (!success) {
-     throw new MetaException("Committing the alter table transaction was not successful.");
-   }
- }
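Putting the client side of this together, a rename through the path above looks roughly like this (names invented):
- // Hypothetical sketch: rename default.t_old to default.t_new;
- // for a managed table this also moves the HDFS directory, as the handler code shows
- Hive db = Hive.get(new HiveConf(), false);
- Table newTbl = db.getTable("default", "t_old");
- newTbl.setTableName("t_new");
- db.alterTable("default.t_old", newTbl, false);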
6. createPartition
Before partition data is written, the partition's metadata is registered and (for a managed table) its physical directory is created. The Hive class code is as follows:
- public Partition createPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
-   try {
-     // construct a new Partition object from the Table; the Partition constructor
-     // initializes the partition's state
-     return new Partition(tbl, getMSC().add_partition(
-         Partition.createMetaPartitionObject(tbl, partSpec, null)));
-   } catch (Exception e) {
-     LOG.error(StringUtils.stringifyException(e));
-     throw new HiveException(e);
-   }
- }
createMetaPartitionObject validates the incoming pieces and wraps them into a Partition object, as follows:
- public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject(
-     Table tbl, Map<String, String> partSpec, Path location) throws HiveException {
-   List<String> pvals = new ArrayList<String>();
-   // iterate over all the partCols and check that each has a value in the partSpec map
-   for (FieldSchema field : tbl.getPartCols()) {
-     String val = partSpec.get(field.getName());
-     if (val == null || val.isEmpty()) {
-       throw new HiveException("partition spec is invalid; field "
-           + field.getName() + " does not exist or is empty");
-     }
-     pvals.add(val);
-   }
-   // set the relevant attributes: dbName, tableName, partValues and the sd info
-   org.apache.hadoop.hive.metastore.api.Partition tpart =
-       new org.apache.hadoop.hive.metastore.api.Partition();
-   tpart.setDbName(tbl.getDbName());
-   tpart.setTableName(tbl.getTableName());
-   tpart.setValues(pvals);
-   if (!tbl.isView()) {
-     tpart.setSd(cloneSd(tbl));
-     tpart.getSd().setLocation((location != null) ? location.toString() : null);
-   }
-   return tpart;
- }
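A usage sketch (partition value invented): the keys of partSpec must match the table's partition columns, exactly as the validation loop above enforces:
- // Hypothetical: register partition dt=2016-01-01 on table default.t
- Hive db = Hive.get(new HiveConf(), false);
- Table t = db.getTable("default", "t");
- Map<String, String> partSpec = new HashMap<String, String>();
- partSpec.put("dt", "2016-01-01");
- Partition p = db.createPartition(t, partSpec);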
HiveMetaStoreClient then calls add_partition on the metastore service with this object (making a deep copy along the way); we won't detail that here. Let's look directly at what the server does:
- private Partition add_partition_core(final RawStore ms,
-     final Partition part, final EnvironmentContext envContext)
-     throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
-   boolean success = false;
-   Table tbl = null;
-   try {
-     ms.openTransaction();
-     // fetch the whole Table object info by dbName and tableName
-     tbl = ms.getTable(part.getDbName(), part.getTableName());
-     if (tbl == null) {
-       throw new InvalidObjectException(
-           "Unable to add partition because table or database do not exist");
-     }
-     // fire the events
-     firePreEvent(new PreAddPartitionEvent(tbl, part, this));
-     // before creating the partition, check whether it already exists in the metadata
-     boolean shouldAdd = startAddPartition(ms, part, false);
-     assert shouldAdd; // start would throw if it already existed here
-     // create the partition directory
-     boolean madeDir = createLocationForAddedPartition(tbl, part);
-     try {
-       // load some key/value info
-       initializeAddedPartition(tbl, part, madeDir);
-       // write the metadata
-       success = ms.addPartition(part);
-     } finally {
-       if (!success && madeDir) {
-         // on failure, delete the physical directory
-         wh.deleteDir(new Path(part.getSd().getLocation()), true);
-       }
-     }
-     // we proceed only if we'd actually succeeded anyway, otherwise,
-     // we'd have thrown an exception
-     success = success && ms.commitTransaction();
-   } finally {
-     if (!success) {
-       ms.rollbackTransaction();
-     }
-     fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
-   }
-   return part;
- }
One design point worth mentioning: as we saw in the schema, the part name is not stored directly; the keys and values are stored separately in a key/value table. With that in mind, let's look at createLocationForAddedPartition:
- private boolean createLocationForAddedPartition(
-     final Table tbl, final Partition part) throws MetaException {
-   Path partLocation = null;
-   String partLocationStr = null;
-   // if the sd is not null, take its location as the base path and assign it
-   // to partLocationStr
-   if (part.getSd() != null) {
-     partLocationStr = part.getSd().getLocation();
-   }
-   // if it is null, assemble the partition location from scratch
-   if (partLocationStr == null || partLocationStr.isEmpty()) {
-     // set default location if not specified and this is
-     // a physical table partition (not a view)
-     if (tbl.getSd().getLocation() != null) {
-       // append the part path to the table path to form the full partition location
-       partLocation = new Path(tbl.getSd().getLocation(), Warehouse
-           .makePartName(tbl.getPartitionKeys(), part.getValues()));
-     }
-   } else {
-     if (tbl.getSd().getLocation() == null) {
-       throw new MetaException("Cannot specify location for a view partition");
-     }
-     partLocation = wh.getDnsPath(new Path(partLocationStr));
-   }
-
-   boolean result = false;
-   // write the location info into the sd
-   if (partLocation != null) {
-     part.getSd().setLocation(partLocation.toString());
-
-     // Check to see if the directory already exists before calling
-     // mkdirs() because if the file system is read-only, mkdirs will
-     // throw an exception even if the directory already exists.
-     if (!wh.isDir(partLocation)) {
-       if (!wh.mkdirs(partLocation, true)) {
-         throw new MetaException(partLocation
-             + " is not a directory or unable to create one");
-       }
-       result = true;
-     }
-   }
-   return result;
- }
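This also makes concrete the design point noted earlier: the part name is never stored, it is derived from the keys and values. A quick sketch of the derivation (values invented):
- // makePartName joins partition keys and values into the directory suffix, e.g.
- // keys [dt, hour] with values [2016-01-01, 00] yield "dt=2016-01-01/hour=00"
- String partName = Warehouse.makePartName(tbl.getPartitionKeys(), Arrays.asList("2016-01-01", "00"));
- Path partLocation = new Path(tbl.getSd().getLocation(), partName);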
7. dropPartition
For dropping a partition we won't start from the Hive class again; let's look directly at what the HiveMetaStore server does:
- private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name,
-     List<String> part_vals, final boolean deleteData, final EnvironmentContext envContext)
-     throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
-     InvalidInputException {
-   boolean success = false;
-   Path partPath = null;
-   Table tbl = null;
-   Partition part = null;
-   boolean isArchived = false;
-   Path archiveParentDir = null;
-   boolean mustPurge = false;
-   try {
-     ms.openTransaction();
-     // fetch the whole partition info by dbName, tableName and part_vals
-     part = ms.getPartition(db_name, tbl_name, part_vals);
-     // fetch the whole Table object
-     tbl = get_table_core(db_name, tbl_name);
-     firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
-     mustPurge = isMustPurge(envContext, tbl);
-     if (part == null) {
-       throw new NoSuchObjectException("Partition doesn't exist. "
-           + part_vals);
-     }
-     // archived partitions; this part hasn't been examined in depth yet
-     isArchived = MetaStoreUtils.isArchived(part);
-     if (isArchived) {
-       archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
-       verifyIsWritablePath(archiveParentDir);
-       checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals, mustPurge);
-     }
-     if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
-       throw new MetaException("Unable to drop partition");
-     }
-     success = ms.commitTransaction();
-     if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
-       partPath = new Path(part.getSd().getLocation());
-       verifyIsWritablePath(partPath);
-       checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals, mustPurge);
-     }
-   } finally {
-     if (!success) {
-       ms.rollbackTransaction();
-     } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) {
-       if (tbl != null && !isExternal(tbl)) {
-         if (mustPurge) {
-           LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash.");
-         }
-         else {
-           LOG.info("dropPartition() will move " + partPath + " to trash-directory.");
-         }
-         // delete the partition
-         // Archived partitions have har:/to_har_file as their location.
-         // The original directory was saved in params
-         if (isArchived) {
-           assert (archiveParentDir != null);
-           wh.deleteDir(archiveParentDir, true, mustPurge);
-         } else {
-           assert (partPath != null);
-           wh.deleteDir(partPath, true, mustPurge);
-           deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge);
-         }
-         // ok even if the data is not deleted
-       }
-     }
-     for (MetaStoreEventListener listener : listeners) {
-       DropPartitionEvent dropPartitionEvent =
-           new DropPartitionEvent(tbl, part, success, deleteData, this);
-       dropPartitionEvent.setEnvironmentContext(envContext);
-       listener.onDropPartition(dropPartitionEvent);
-     }
-   }
-   return true;
- }
8. alterPartition
alterPartition involves validation and changes to files and directories; let's go straight to rename_partition in HiveMetaStore:
- private void rename_partition(final String db_name, final String tbl_name,
- final List<String> part_vals, final Partition new_part,
- final EnvironmentContext envContext)
- throws InvalidOperationException, MetaException,
- TException {
- // 日志记录
- startTableFunction("alter_partition", db_name, tbl_name);
- if (LOG.isInfoEnabled()) {
- LOG.info("New partition values:" + new_part.getValues());
- if (part_vals != null && part_vals.size() > 0) {
- LOG.info("Old Partition values:" + part_vals);
- }
- }
- Partition oldPart = null;
- Exception ex = null;
- try {
- firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));
- // 校验 PartName 的规范性
- if (part_vals != null && !part_vals.isEmpty()) {
- MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
- partitionValidationPattern);
- }
调用 alterHandler 的 alterPartition 进行 partition 物理上的 rename, 以及元数据修改
- oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part);
- // Only fetch the table if we actually have a listener
- Table table = null;
- for (MetaStoreEventListener listener : listeners) {
- if (table == null) {
- table = getMS().getTable(db_name, tbl_name);
- }
- AlterPartitionEvent alterPartitionEvent =
- new AlterPartitionEvent(oldPart, new_part, table, true, this);
- alterPartitionEvent.setEnvironmentContext(envContext);
- listener.onAlterPartition(alterPartitionEvent);
- }
- } catch (InvalidObjectException e) {
- ex = e;
- throw new InvalidOperationException(e.getMessage());
- } catch (AlreadyExistsException e) {
- ex = e;
- throw new InvalidOperationException(e.getMessage());
- } catch (Exception e) {
- ex = e;
- if (e instanceof MetaException) {
- throw (MetaException) e;
- } else if (e instanceof InvalidOperationException) {
- throw (InvalidOperationException) e;
- } else if (e instanceof TException) {
- throw (TException) e;
- } else {
- throw newMetaException(e);
- }
- } finally {
- endFunction("alter_partition", oldPart != null, ex, tbl_name);
- }
- return;
- }
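Before diving into the handler, it helps to see how this RPC is reached from the client side. A minimal sketch, reusing the made-up names from above: the old part_vals identify the partition, while the Partition object carries the new values.
- import java.util.Arrays;
- import org.apache.hadoop.hive.conf.HiveConf;
- import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
- import org.apache.hadoop.hive.metastore.api.Partition;
-
- public class RenamePartitionDemo {
-     public static void main(String[] args) throws Exception {
-         HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
-         // fetch the existing partition and give it the new partition values
-         Partition p = client.getPartition("test_db", "test_table", Arrays.asList("2018-02-21"));
-         p.setValues(Arrays.asList("2018-02-22"));
-         // this ends up in rename_partition above via the thrift interface
-         client.renamePartition("test_db", "test_table", Arrays.asList("2018-02-21"), p);
-         client.close();
-     }
- }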
Now let's take a close look at the alterHandler.alterPartition method; brace yourself, this is the dense part:
- public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
- final String name, final List<String> part_vals, final Partition new_part)
- throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
- MetaException {
- boolean success = false;
-
- Path srcPath = null;
- Path destPath = null;
- FileSystem srcFs = null;
- FileSystem destFs = null;
- Partition oldPart = null;
- String oldPartLoc = null;
- String newPartLoc = null;
-
- // refresh the new partition's DDL time if it is missing or zero
- if (new_part.getParameters() == null ||
- new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
- Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
- new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
- .currentTimeMillis() / 1000));
- }
- // fetch the whole Table object by dbName and tableName
- Table tbl = msdb.getTable(dbname, name);
- // if part_vals is null or empty, only other partition metadata is changing (no partition key/value change),
- // so msdb.alterPartition can update the metadata directly
- if (part_vals == null || part_vals.size() == 0) {
- try {
- oldPart = msdb.getPartition(dbname, name, new_part.getValues());
- if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
- MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
- }
- updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
- msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
- } catch (InvalidObjectException e) {
- throw new InvalidOperationException("alter is not possible");
- } catch (NoSuchObjectException e){
- //old partition does not exist
- throw new InvalidOperationException("alter is not possible");
- }
- return oldPart;
- }
- //rename partition
- try {
- msdb.openTransaction();
- try {
- // look up the old partition
- oldPart = msdb.getPartition(dbname, name, part_vals);
- } catch (NoSuchObjectException e) {
- // this means there is no existing partition
- throw new InvalidObjectException(
- "Unable to rename partition because old partition does not exist");
- }
- Partition check_part = null;
- try {
- // check whether a partition with the new values already exists
- check_part = msdb.getPartition(dbname, name, new_part.getValues());
- } catch(NoSuchObjectException e) {
- // this means there is no existing partition
- check_part = null;
- }
- // if check_part was found, the target partition already exists, so fail with AlreadyExistsException
- if (check_part != null) {
- throw new AlreadyExistsException("Partition already exists: " + dbname + "." + name + "." +
- new_part.getValues());
- }
- // validate the table
- if (tbl == null) {
- throw new InvalidObjectException(
- "Unable to rename partition because table or database do not exist");
- }
-
- // if an external table's partition changed, there is no filesystem work to do; just update the metadata
- if (tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
- new_part.getSd().setLocation(oldPart.getSd().getLocation());
- String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
- try {
- //existing partition column stats is no longer valid, remove
- msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
- } catch (NoSuchObjectException nsoe) {
- //ignore
- } catch (InvalidInputException iie) {
- throw new InvalidOperationException("Unable to update partition stats in table rename. " + iie);
- }
- msdb.alterPartition(dbname, name, part_vals, new_part);
- } else {
- try {
- // get the table's directory path
- destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name),
- Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
- // construct the new partition's path
- destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
- } catch (NoSuchObjectException e) {
- LOG.debug(e);
- throw new InvalidOperationException(
- "Unable to change partition or table. Database " + dbname + " does not exist."
- + " Check metastore logs for detailed stack. " + e.getMessage());
- }
- // if destPath is not null, the file location has changed
- if (destPath != null) {
- newPartLoc = destPath.toString();
- oldPartLoc = oldPart.getSd().getLocation();
- // derive the old partition path from the original sd location
- srcPath = new Path(oldPartLoc);
-
- LOG.info("srcPath: " + oldPartLoc);
- LOG.info("destPath: " + newPartLoc);
- srcFs = wh.getFs(srcPath);
- destFs = wh.getFs(destPath);
- // check whether srcFs and destFs are the same file system
- if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
- throw new InvalidOperationException("table new location " + destPath
- + " is on a different file system than the old location "
- + srcPath + ". This operation is not supported");
- }
- try {
- // verify the old partition path exists and the new partition path does not already exist
- srcFs.exists(srcPath); // check that src exists and also checks permissions
- if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
- throw new InvalidOperationException("New location for this table "
- + tbl.getDbName() + "." + tbl.getTableName()
- + " already exists: " + destPath);
- }
- } catch (IOException e) {
- throw new InvalidOperationException("Unable to access new location "
- + destPath + " for partition " + tbl.getDbName() + "."
- + tbl.getTableName() + " " + new_part.getValues());
- }
- new_part.getSd().setLocation(newPartLoc);
- if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
- MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
- }
- // build oldPartName, drop the old partition's column stats, then write the new partition info
- String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
- try {
- //existing partition column stats is no longer valid, remove
- msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
- } catch (NoSuchObjectException nsoe) {
- //ignore
- } catch (InvalidInputException iie) {
- throw new InvalidOperationException("Unable to update partition stats in table rename. " + iie);
- }
- msdb.alterPartition(dbname, name, part_vals, new_part);
- }
- }
-
- success = msdb.commitTransaction();
- } finally {
- if (!success) {
- msdb.rollbackTransaction();
- }
- if (success && newPartLoc != null && newPartLoc.compareTo(oldPartLoc) != 0) {
- //rename the data directory
- try {
- if (srcFs.exists(srcPath)) {
- // if the parent path has not been created yet, create it first; the engine may have
- // called alterTable and then alterPartition before the partition's parent directory exists
- Path destParentPath = destPath.getParent();
- if (!wh.mkdirs(destParentPath, true)) {
- throw new IOException("Unable to create path " + destParentPath);
- }
- // rename the source path to the destination path
- wh.renameDir(srcPath, destPath, true);
- LOG.info("rename done!");
- }
- } catch (IOException e) {
- boolean revertMetaDataTransaction = false;
- try {
- msdb.openTransaction();
- msdb.alterPartition(dbname, name, new_part.getValues(), oldPart);
- revertMetaDataTransaction = msdb.commitTransaction();
- } catch (Exception e1) {
- LOG.error("Reverting metadata operation failed during HDFS operation failure", e1);
- if (!revertMetaDataTransaction) {
- msdb.rollbackTransaction();
- }
- }
- throw new InvalidOperationException("Unable to access old location "
- + srcPath + " for partition " + tbl.getDbName() + "."
- + tbl.getTableName() + " " + part_vals);
- }
- }
- }
- return oldPart;
- }
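The ordering in that finally block is the part worth internalizing: the metadata transaction is committed first, and only then is the directory renamed; if the HDFS rename fails, a second transaction writes the old partition back. A stripped-down sketch of that pattern, with hypothetical stand-in helpers (not Hive API), just to isolate the control flow:
- import java.io.IOException;
-
- // All helpers here are hypothetical stand-ins; only the control flow mirrors alterPartition above.
- public class CommitThenRenameSketch {
-     boolean commitMetadata() { return true; }    // stand-in for msdb.commitTransaction()
-     void renameDir() throws IOException { }      // stand-in for wh.renameDir(srcPath, destPath, true)
-     void revertMetadata() { }                    // stand-in for writing oldPart back in a new transaction
-
-     void alterPartition() throws IOException {
-         boolean success = false;
-         try {
-             success = commitMetadata();          // 1. commit the metadata change first
-         } finally {
-             if (success) {
-                 try {
-                     renameDir();                 // 2. only then move the data directory
-                 } catch (IOException e) {
-                     revertMetadata();            // 3. on failure, restore the old metadata
-                     throw e;
-                 }
-             }
-         }
-     }
- }
The gap between step 1 and step 2 is exactly why step 3 exists: right after the commit, the metastore already points at the new location even though the data has not moved yet, so a failed rename must be compensated by restoring the old metadata.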
Let's stop here for now ~ we'll keep digging into the rest later ~
Source: https://www.cnblogs.com/yangsy0915/p/8456806.html