Aapche Ranger 是以插件的形式集成到 HDFS 中,由 Ranger Admin 管理访问策略,Ranger 插件定期轮询 Admin 更新策略到本地,并根据策略信息进行用户访问权限的判定。其中提供管理员管理策略、插件的 Ranger web 和 Ranger Plugin,与 Admin 之间的通信是基于 HTTP 的 RESTful 架构。Ranger 集成 HDFS 的架构图如下:
HDFS 本身是有访问控制机制的,即在身份认证机制之后通过查询 ACLs 来对用户的权限检查,该权限检查的实现代码是 INodeAttributeProvider 抽象类中接口 AccessControlEnforcer 的 checkPermission 方法。Ranger 将 HDFS NameNode 的 Inode attribute 的提供类即 INodeAttributeProvider 抽象类修改为 Ranger 自己写的类,该类继承了 INodeAttributeProvider 抽象类。即在 hdfs-site.xml 文件中修改如下配置项。
- <name>
- dfs.namenode.inode.attributes.provider.class
- </name>
- <value>
- org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer
- </value>
在 Namenode 启动过程中编译 RangerHdfsAuthorizerorizer 类(包名为 ranger-hdfs-plugin-shim),将 RangerHdfs 的相关类动态加载进虚拟机,并实例化具体实现类 RangerHdfsAuthorizerorizer(包名为 ranger-hdfs-plugin)。
- 1rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE,this.getClass());
- 2@SuppressWarnings("unchecked")
- 3Class cls = (Class) Class.forName(RANGER_HDFS_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
- 4 activatePluginClassLoader();
- 5rangerHdfsAuthorizerImpl = cls.newInstance();
初始化 RangerPlugin,如上面的类图可知,RangerHdfsPlugin 是 RangerBasePlugin 类的子类,其具体的初始化是由父类的初始化方法来实现的。该方法主要完成了以下几个功能:
(1)调用 cleanup() 方法, 主要完成清空了 refresher、serviceName、policyEngine 这三个变量的值。(2)读取配置文件,并设置以下变量的初始值。
(3)设置 PangerPolicyEngineOptions 类的成员变量值。
(4)调用 createAdminClient(),创建 RangerAdmin 与 RangerPlugin 通信的客户端。这里使用的基于 RESTful 的通信风格,所以创建 RangerAdminClient 类的实例对象。
(5)创建 PolicyRefresher 类的对象,调用 startRefresher() 开启策略刷新器,根据轮询间隔时间定期从 Ranger Admin 拉取更新的策略。
Ranger 插件更新策略的流程图如下:
这部分主要讲 Ranger 插件如何通过调用 Ranger 定义好的 API 获取策略。这里使用了 jersey 来构建 RESTful 服务。
RangerAdminRESTClient # getServicePoliciesIfUpdated()
- 1
- if (isSecureMode) {
- 2 PrivilegedAction action = new PrivilegedAction() {
- 3 public ClientResponse run() {
- 4 // 创建WebResource实例,并根据URI和查询参数(lastKnownVersion、pluginId)构建URL
- 5 WebResource secureWebResource = createWebResource(RangerRESTUtils.REST_URL_POLICY_GET_FOR_SECURE_SERVICE_IF_UPDATED + serviceName) 6.queryParam(RangerRESTUtils.REST_PARAM_LAST_KNOWN_POLICY_VERSION, Long.toString(lastKnownVersion)) 7.queryParam(RangerRESTUtils.REST_PARAM_PLUGIN_ID, pluginId);
- 8 // 发送GET请求,并且mime类型为Json
- 9
- return secureWebResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
- 10
- };
- 11
- };
- 12 response = user.doAs(action);
- 13
- } else {
- 14 WebResource webResource = createWebResource(RangerRESTUtils.REST_URL_POLICY_GET_FOR_SERVICE_IF_UPDATED + serviceName) 15.queryParam(RangerRESTUtils.REST_PARAM_LAST_KNOWN_POLICY_VERSION, Long.toString(lastKnownVersion)) 16.queryParam(RangerRESTUtils.REST_PARAM_PLUGIN_ID, pluginId);
- 17 response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
- 18
- }
- 19
- if (response != null && response.getStatus() == 200) { //200:请求成功
- 20 ret = response.getEntity(ServicePolicies.class);
- 21
- } else if (response != null && response.getStatus() == 304) { // 304:未修正
- 22 // no change
- 23
- }
在 RangerRESTUtils 类中定义了参数的值:
- public static finalString REST_URL_POLICY_GET_FOR_SERVICE_IF_UPDATED = "/service/plugins/policies/download/";
- public static finalString SERVICE_NAME_PARAM = "serviceName";
- public static finalString LAST_KNOWN_TAG_VERSION_PARAM = "lastKnownVersion";
- public static finalString REST_MIME_TYPE_JSON = "application/json" ;
ServiceREST # getServicePolicesIfUpdate()
- 1@GET 2@Path("/policies/download/{serviceName}") 3@Produces({
- "application/json",
- "application/xml"
- }) 4 public ServicePolicies getServicePoliciesIfUpdated(@PathParam("serviceName") String serviceName, @QueryParam("lastKnownVersion") Long lastKnownVersion, @QueryParam("pluginId") String pluginId, @Context HttpServletRequest request) throws Exception {
- 5 ServicePolicies ret = null;
- 6 int httpCode = HttpServletResponse.SC_OK;
- 7 String logMsg = null;
- 8 RangerPerfTracer perf = null;
- 9 10
- if (serviceUtil.isValidateHttpsAuthentication(serviceName, request)) {
- 11
- if (lastKnownVersion == null) {
- 12 lastKnownVersion = Long.valueOf( - 1);
- 13
- }
- 14
- try {
- 15 ServicePolicies servicePolicies = svcStore.getServicePoliciesIfUpdated(serviceName, lastKnownVersion); // 从数据库获取更新策略的过程
- 16
- if (servicePolicies == null) {
- 17 httpCode = HttpServletResponse.SC_NOT_MODIFIED;
- 18 logMsg = "No change since last update";
- 19
- } else {
- 20 ret = filterServicePolicies(servicePolicies);
- 21 httpCode = HttpServletResponse.SC_OK;
- 22 logMsg = "Returning " + (ret.getPolicies() != null ? ret.getPolicies().size() : 0) + " policies. Policy version=" + ret.getPolicyVersion();
- 23
- }
- 24
- }
- 25
- }
- 26
- return ret;
- 27
- }
来源: http://www.cnblogs.com/qiuyuesu/p/6774520.html