YARN Node Label 功能最早是在 Hadoop 2.6 版本中引入, 在后续版本中有更多的功能完善. 到了 Hadoop 2.8.x 版本之后, 该功能已经比较完整, 可以满足日常使用. 在本文中, 我们需要将 Node Label 功能应用在 EMR 弹性伸缩场景中.
其实 Node Label 特性更准确的叫法是 Node Partition, 也就是说通过 label 把 YARN 集群中的节点分组, 每个节点拥有一个 label, 通过调度器的配置, 将作业 Task 调度到指定的节点中, 如果节点没有配置 Label, 那么这个节点属于 Label 为 DEFAULT 的 Partition.Hadoop 3.2 之后加入的 Node Attribute 功能是更加灵活的方案, 可以方便的给各个节点打上 OS/kernel version/CPU architecture/JDK version 等标签, 但这个功能在本文中就不展开了. 更多关于 Node Label 的信息请参考 Hadoop YARN PMC Wangda 的文章.
配置
首先需要给 YARN 服务打开 NodeLabel 功能, yarn-site.xml 需要增加配置:
- yarn.node-labels.enabled -> true
- yarn.node-labels.fs-store.root-dir -> /yarn/node-label
保存后需要重启 YARN ResourceManager.
节点的 Node Label
EMR 集群的 worker node 有两类节点, 分别是 core 节点 (除了 NodeManager 之外还运行 HDFS 和 HBase 等存储服务进程) 和 task 节点(只运行 NodeManager 之类的计算服务), 我们首先增加两个 node label
yarn rmadmin -addToClusterNodeLabels "core(exclusive=false),task(exclusive=false)"
exclusive=false 意思是这个 Label 的资源可以共享给申请 DEFAULT 资源的作业.
然后给各个节点打上标签(用你的机器名字替代 emr-worker-*.cluster-105364):
- yarn rmadmin -replaceLabelsOnNode \
- "emr-worker-1.cluster-105364=core emr-worker-2.cluster-105364=core emr-worker-3.cluster-105364=task"
Scheduler 的配置
目前只有 YARN Capacity Scheduler 支持 NodeLabel 功能, 我们以他为例, 而且假设一个最简单的场景, 集群内只有一个 queue(default).
在配置 node label 相关的信息之前, capacity-scheduler.xml 包含如下内容:
- <property>
- <name>yarn.scheduler.capacity.root.queues</name>
- <value>default</value>
- </property>
- <property>
- <name>yarn.scheduler.capacity.root.default.capacity</name>
- <value>100</value>
- </property>
- <property>
- <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
- <value>100</value>
- </property>
新增 Node Label 相关配置:
- <property>
- <name>yarn.scheduler.capacity.root.accessible-node-labels</name>
- <value>core,task</value>
- </property>
- <property>
- <name>yarn.scheduler.capacity.root.accessible-node-labels.core.capacity</name>
- <value>100</value>
- </property>
- <property>
- <name>yarn.scheduler.capacity.root.accessible-node-labels.task.capacity</name>
- <value>100</value>
- </property>
- <property>
- <name>yarn.scheduler.capacity.root.default.accessible-node-labels</name>
- <value>core,task</value>
- </property>
- <property>
- <name>yarn.scheduler.capacity.root.default.accessible-node-labels.core.capacity</name>
- <value>100</value>
- </property>
- <property>
- <name>yarn.scheduler.capacity.root.default.accessible-node-labels.task.capacity</name>
- <value>100</value>
- </property>
上述配置的意思是:
对于 Label 为 core 的资源, default queue 占用 100% 的资源
对于 Label 为 task 的资源, default queue 占用 100% 的资源
保存配置之后, 需要刷新 YARN Scheduler queue.
作业提交
在弹性伸缩场景下, task 节点可能随时被停止, 所以如果一个长时间运行的作业的 application master(AM)被运行在 Task 节点上, 当节点停止后 AM 就退出了, 可能会造成作业失败. 所以, 比较好的策略是 AM 只启动在 Core 节点上.
如果集群开启了 Node Label 功能, 我们可以通过配置作业的方式, 将 AM 启动在 Core 节点上. 本文中只介绍 MapReduce 和 Spark 两类作业
MapReduce 作业
提交作业如下:
- hadoop jar hadoop-mapreduce-examples-2.8.5.jar pi \
- -Dmapreduce.job.am.node-label-expression=core \
- 10 100000000
mapreduce.job.am.node-label-expression=core 参数指明了 MapReduce AM 申请 core 类型资源, 所以 AM 一定会启动在 Core 节点上. 同时, 因为没有指定普通 Task 运行的资源(也就是申请了 DEFAULT), 所以普通 Task 可以同时运行在 Core 和 Task 节点上(exclusive=false).
如果要希望 MapReduce Map 或 Reduce task 只运行在 Task 节点上, 可以在提交参数上指定:
- hadoop jar hadoop-mapreduce-examples-2.8.5.jar pi \
- -Dmapreduce.job.am.node-label-expression=core \
- -Dmapreduce.job.node-label-expression=task \
- 100 100000000
另外, 还有 mapreduce.map.node-label-expression 和 mapreduce.reduce.node-label-expression 等参数可配置.
Spark 作业
类似于 MapReduce Job, 我们可以指定 spark.yarn.am.nodeLabelExpression 和 spark.yarn.executor.nodeLabelExpression 将作业提交到不同的节点上. 下面这个例子是将 Spark AM 启动在 Core 节点上:
- spark-submit --class org.apache.spark.examples.SparkPi \
- --master yarn --deploy-mode cluster \
- --conf spark.yarn.am.nodeLabelExpression=core \
- spark-examples_2.11-2.3.2.jar 10
参考资料
- YARN Node Labels: Label-based scheduling and resource isolation
- YARN Node Labels
来源: https://yq.aliyun.com/articles/696446