概述
Kubernetes 自推出以来, 以其完善的集群配额, 均衡, 故障恢复能力, 成为开源容器管理平台中的佼佼者. 从设计思路上, Spark 以开放 Cluster Manager 为理念, Kubernetes 则以多语言, 容器调度为卖点, 二者的结合是顺理成章的.
使用 Kubernetes 调度 Spark 的好处:
集中式资源调度: 接入 k8s 的 Spark 应用与其他 k8s 应用共享资源池.
多租户: 可利用 Kubernetes 的 namespace 和 ResourceQuota 做用户粒度的资源调度.
容器生态: 以监控为例, 开发者可利用 Prometheus 检测 Spark 应用的性能.
Kubernetes 社区早期尝试将 Spark 以 standalone 模式运行在 Kubernetes 上. https://issues.apache.org/jira/browse/SPARK-18278 则提出了一个子项目 apache-spark-on-k8s, 旨在支持 Spark driver/executor 的 Pod 化. 该项目于 2018 年正式合并到主版本, 在 Spark 2.3 发布.
(图片来自 databricks.com)
Spark 2.4 的 Kubernetes 支持包含以下特性:
支持 pyspark 应用
支持 R 语言应用
支持 client mode: 允许用户运行 spark-shell 或 Notebook, 可以是集群之外的单独机器, 或者是 k8s 集群的 pod.client mode 要求用户保证 driver 与 k8s 集群内 executor 之间的连通性. 如果 driver 运行在 ks8 集群的 pod 内, 推荐使用 headless service 以允许 executor 通过 FQDN 连接到 driver; 如果 driver 运行在 k8s 集群之外, 用户需要确保集群内的 executor Pod 可访问到 driver.
kubernetes operator 则是 k8s 上运行 Spark 另一种的途径, 用户可以通过 Helm chart 安装. 运行时转换为 CRD 对象执行.
以下我们以 spark-2.4.3-bin-hadoop2.7 为例, 使用 minikube 实验 Spark on Kubernetes.(minikube 是 Kubernetes 官方工具, 可运行单节点 k8s 服务, 方便在本机上测试.)
在 minikube 中测试 Spark on k8s
最低要求:
Spark 2.3 + 版本, client mode 需要 2.4+.
kubernetes 1.6 以上版本, 可调用 kubectl.
Kubernetes 启动 DNS.
cluster mode
1. 启动 minikube
运行 Spark 应用需要足够的系统资源, 执行以下命令并重新启动 minikube
- minikube config set memory 4096
- minikube config set cpus 4
或者显式指定资源启动 minikube
minikube start --cpus 4 --memory 4096
2. 构建 Spark 镜像
为简便起见, 直接在本地 k8s 环境中构建镜像, 以便 k8s 运行时直接取用. 注意: 构建时必须位于 Spark 安装根目录下.
./bin/docker-image-tool.sh -r <repo> -t my-tag build
或者
- eval $(minikube docker-env)
- docker build -t spark:2.4.3 -f ./kubernetes/dockerfiles/spark/Dockerfile .
也可以将构建镜像推送到 k8s 集群可达的镜像仓库中.
3. 执行 examples
先为 Spark 应用配置必要的 serviceAccount.
- kubectl create serviceaccount spark
- kubectl create clusterrolebinding spark-role --clusterrole=admin --serviceaccount=default:spark --namespace=default
master 地址可以通过 kubectl 获取
kubectl cluster-info
本机执行 spark-submit
- ./bin/spark-submit \
- --master k8s://https://192.168.99.103:8443 \
- --deploy-mode cluster \
- --name spark-pi \
- --class org.apache.spark.examples.SparkPi \
- --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
- --conf spark.executor.instances=2 \
- --conf spark.kubernetes.container.image=spark:2.4.3 \
- local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar
启动控制台 minikube dashboard, 在可以在界面上看到对应的 pods
我们可以通过 kubectl 查看其运行日志. 程序运行结束之后, driver Pod 仍然保留.
- kubectl -n=default logs -f spark-pi-4e673e6fc64432bb8bda3f5632ce9596-driver
- client mode
以下演示在 k8s 集群的独立 Pod 中启动 Spark 应用和 spark-shell 的方法.
1. 准备独立 Pod
用任意 Linux 镜像运行.
kubectl run transfer1 -it -n default --image=CentOS:centos7 --serviceaccount='spark' -- /bin/sh
如果可以进入命令行, 说明 Pod 已经启动, 查看一下 Pod 列表.
- kubectl get pods
- NAME READY STATUS RESTARTS AGE
- transfer1-584c678c8c-fh8s6 1/1 Running 0 12m
进入 transfer1 Pod, 安装 OpenJDK 和 Spark.
2. 暴露 service
我们任意指定一个端口暴露, 后续 client mode 将通过去 DNS 去查找 Driver Pod 的位置, 这也是 Spark on k8s 要求 DNS 的原因.
- kubectl expose deployment transfer1 --port=19987 --type=ClusterIP --cluster-ip=None
- kubectl get services
- NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
- kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 41h
- transfer1 ClusterIP None <none> 19987/TCP 13m
3. 在 Pod 中启动 Spark 应用
- ./bin/spark-submit \
- --master k8s://https://192.168.99.103:8443 \
- --deploy-mode client \
- --name spark-pi \
- --class org.apache.spark.examples.SparkPi \
- --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
- --conf spark.executor.instances=2 \
- --conf spark.kubernetes.container.image=spark:2.4.3 \
- --conf spark.driver.host=transfer1.default.svc.cluster.local \
- --conf spark.driver.port=19987 \
- /root/spark-2.4.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.3.jar
注意运行入口指向是本地的 Pod 的 jar 包.
启动 spark-shell:
- ./bin/spark-shell \
- --master k8s://https://192.168.99.103:8443 \
- --deploy-mode client \
- --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
- --conf spark.kubernetes.namespace=default \
- --conf spark.driver.pod.name=transfer1driverpod \
- --conf spark.executor.instances=2 \
- --conf spark.kubernetes.container.image=spark:2.4.3 \
- --conf spark.driver.host=transfer1.default.svc.cluster.local \
- --conf spark.driver.port=19987
稍等片刻 spark-shell 即启动完成.
Pod 创建原理
镜像
先了解 Spark 镜像做了什么. 打开 Spark 发行目录下的 kubernetes/dockerfiles/spark/Dockerfile, 可以发现, Dockerfile 只做了 jars,bin,sbin 等目录的文件拷贝, 指向 / opt/entrypoint.sh 作为镜像入口.
entrypoint.sh 支持传入 "driver" 或者 "executor" 参数 (对于 python 和 R 支持, 则是 dirver-py 和 dirver-r), 这样, 默认容器即支持创建 driver 或者 executor 容器. 无需用户显式提供 spark.kubernetes.driver.container.image 参数.
组件
Driver: 以 headless service 存在.
Executor: 数量可以由 spark-submit 参数指定, 也支持动态资源配置.
k8s API Server: Spark 通过 API Server 创建与删除 Pod.
spark-submit 之后
再看 spark-submit,SparkSubmit 类会匹配 master 参数, 如果以 "k8s" 开头, 则会装载对应的 submit 类, 对于 Spark 2.4, 这个 submit 类是 org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.
KubernetesClientApplication 中创建 driver Pod:
- // resolvedDriverPod 是基于 KubernetesDriverBuilder, 读取 conf 创建的 Pod 定义
- kubernetesClient
- .pods()
- .withName(resolvedDriverPod.getMetadata.getName)
- .watch(watcher)) { _ =>
- val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
- try {
- val otherKubernetesResources =
- resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
- addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
- kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
- } catch {
- // 如果创建失败则移除 driver Pod
- case NonFatal(e) =>
- kubernetesClient.pods().delete(createdDriverPod)
- throw e
- }
另一方面, 与 Spark on YARN 的初始化过程类似, SparkContext 装载 ExternalClusterManager 的子类 KubernetesClusterManager, 并初始化 ExecutorPodsAllocator 和 k8s 的 DefaultKubernetesClient, 初始化 KubernetesClusterSchedulerBackend.
KubernetesClusterSchedulerBackend 继承了 CoarseGrainedSchedulerBackend, 请求创建 executor 时, 其重载的 doRequestTotalExecutors 方法, 使用 PodAllocator 中的内部线程创建 executor Pod, 后者会自动增减 executor 数量; killExecutors 时调用 doKillExecutors 来销毁 Pods.
- override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
- podAllocator.setTotalExpectedExecutors(requestedTotal)
- true
- }
- ...
- override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
- kubernetesClient
- .pods()
- .withLabel(SPARK_APP_ID_LABEL, applicationId())
- .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
- .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
- .delete()
- // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
- }
总结
Kubernetes 为 Spark 开发者提供了新的调度手段, Spark 2.4 支持 cluster/client mode 运行, client mode 可以运行在单独 Pod 或者 k8s 集群之外.
Spark on Kubernetes 项目正在快速发展之中, 目前支持的功能仍然是实验性质的, 未来其内部实现可能会发生变化, 对于 Spark 3.0 版本中的 k8s 新特性, 我们不妨拭目以待.
来源: https://yq.aliyun.com/articles/702055