Spark Core 面试篇 01
随着 Spark 技术在企业中应用越来越广泛, Spark 成为大数据开发必须掌握的技能. 前期分享了很多关于 Spark 的学习视频和文章, 为了进一步巩固和掌握 Spark, 在原有 spark 专刊基础上, 新增《Spark 面试 2000 题》专刊, 题集包含基础概念, 原理, 编码开发, 性能调优, 运维, 源代码以及 Spark 周边生态系统等. 部分题集来源于互联网, 由梅峰谷志愿者收集和整理, 部分题集由梅峰谷志愿者结合生产实际碰到的问题设计出来, 希望能给大家带来帮助.
一, 简答题
1.Spark master 使用 zookeeper 进行 HA 的, 有哪些元数据保存在 Zookeeper?
答: spark 通过这个参数 spark.deploy.zookeeper.dir 指定 master 元数据在 zookeeper 中保存的位置, 包括 Worker,Driver 和 Application 以及 Executors.standby 节点要从 zk 中, 获得元数据信息, 恢复集群运行状态, 才能对外继续提供服务, 作业提交资源申请等, 在恢复前是不能接受请求的. 另外, Master 切换需要注意 2 点
1)在 Master 切换的过程中, 所有的已经在运行的程序皆正常运行! 因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源, 所以在运行时 Job 本身的调度和处理和 Master 是没有任何关系的!
2) 在 Master 的切换过程中唯一的影响是不能提交新的 Job: 一方面不能够提交新的应用程序给集群, 因为只有 Active Master 才能接受新的程序的提交请求; 另外一方面, 已经运行的程序中也不能够因为 Action 操作触发新的 Job 的提交请求;
2.Spark master HA 主从切换过程不会影响集群已有的作业运行, 为什么?
答: 因为程序在运行之前, 已经申请过资源了, driver 和 Executors 通讯, 不需要和 master 进行通讯的.
3.Spark on Mesos 中, 什么是的粗粒度分配, 什么是细粒度分配, 各自的优点和缺点是什么?
答: 1)粗粒度: 启动时就分配好资源, 程序启动, 后续具体使用就使用分配好的资源, 不需要再分配资源; 好处: 作业特别多时, 资源复用率高, 适合粗粒度; 不好: 容易资源浪费, 假如一个 job 有 1000 个 task, 完成了 999 个, 还有一个没完成, 那么使用粗粒度, 999 个资源就会闲置在那里, 资源浪费. 2)细粒度分配: 用资源的时候分配, 用完了就立即回收资源, 启动会麻烦一点, 启动一次分配一次, 会比较麻烦.
4. 如何配置 spark master 的 HA?
1)配置 zookeeper
2)修改 spark_env.sh 文件, spark 的 master 参数不在指定, 添加如下代码到各个 master 节点
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk01:2181,zk02:2181,zk03:2181 -Dspark.deploy.zookeeper.dir=/spark"
3) 将 spark_env.sh 分发到各个节点
4)找到一个 master 节点, 执行./start-all.sh, 会在这里启动主 master, 其他的 master 备节点, 启动 master 命令: ./sbin/start-master.sh
5)提交程序的时候指定 master 的时候要指定三台 master, 例如
./spark-shell --master spark://master01:7077,master02:7077,master03:7077
5.Apache Spark 有哪些常见的稳定版本, Spark1.6.0 的数字分别代表什么意思?
答: 常见的大的稳定版本有 Spark 1.3,Spark1.6, Spark 2.0 ,Spark1.6.0 的数字含义
1)第一个数字: 1
major version : 代表大版本更新, 一般都会有一些 API 的变化, 以及大的优化或是一些结构的改变;
2)第二个数字: 6
minor version : 代表小版本更新, 一般会新加 API, 或者是对当前的 API 就行优化, 或者是其他内容的更新, 比如说 web UI 的更新等等;
3)第三个数字: 0
patch version , 代表修复当前小版本存在的一些 bug, 基本不会有任何 API 的改变和功能更新; 记得有一个大神曾经说过, 如果要切换 spark 版本的话, 最好选 patch version 非 0 的版本, 因为一般类似于 1.2.0, ... 1.6.0 这样的版本是属于大更新的, 有可能会有一些隐藏的 bug 或是不稳定性存在, 所以最好选择 1.2.1, ... 1.6.1 这样的版本.
通过版本号的解释说明, 可以很容易了解到, spark2.1.1 的发布时是针对大版本 2.1 做的一些 bug 修改, 不会新增功能, 也不会新增 API, 会比 2.1.0 版本更加稳定.
6.driver 的功能是什么?
答: 1)一个 Spark 作业运行时包括一个 Driver 进程, 也是作业的主进程, 具有 main 函数, 并且有 SparkContext 的实例, 是程序的人口点; 2)功能: 负责向集群申请资源, 向 master 注册信息, 负责了作业的调度,, 负责作业的解析, 生成 Stage 并调度 Task 到 Executor 上. 包括 DAGScheduler,TaskScheduler.
7.spark 的有几种部署模式, 每种模式特点?
1)本地模式
Spark 不一定非要跑在 hadoop 集群, 可以在本地, 起多个线程的方式来指定. 将 Spark 应用以多线程的方式直接运行在本地, 一般都是为了方便调试, 本地模式分三类
. local: 只启动一个 executor
. local[k]: 启动 k 个 executor
. local: 启动跟 CPU 数目相同的 executor
2)standalone 模式
分布式部署集群, 自带完整的服务, 资源管理和任务监控是 Spark 自己监控, 这个模式也是其他模式的基础,
3)Spark on yarn 模式
分布式部署集群, 资源和任务监控交给 yarn 管理, 但是目前仅支持粗粒度资源分配方式, 包含 cluster 和 client 运行模式, cluster 适合生产, driver 运行在集群子节点, 具有容错功能, client 适合调试, dirver 运行在客户端
4)Spark On Mesos 模式. 官方推荐这种模式(当然, 原因之一是血缘关系). 正是由于 Spark 开发之初就考虑到支持 Mesos, 因此, 目前而言, Spark 运行在 Mesos 上会比运行在 YARN 上更加灵活, 更加自然. 用户可选择两种调度模式之一运行自己的应用程序:
1) 粗粒度模式(Coarse-grained Mode): 每个应用程序的运行环境由一个 Dirver 和若干个 Executor 组成, 其中, 每个 Executor 占用若干资源, 内部可运行多个 Task(对应多少个 "slot"). 应用程序的各个任务正式运行之前, 需要将运行环境中的资源全部申请好, 且运行过程中要一直占用这些资源, 即使不用, 最后程序运行结束后, 回收这些资源.
2) 细粒度模式(Fine-grained Mode): 鉴于粗粒度模式会造成大量资源浪费, Spark On Mesos 还提供了另外一种调度模式: 细粒度模式, 这种模式类似于现在的云计算, 思想是按需分配.
8.Spark 技术栈有哪些组件, 每个组件都有什么功能, 适合什么应用场景?
答: 可以画一个这样的技术栈图先, 然后分别解释下每个组件的功能和场景
file:///E:/安装软件/有道笔记文件/qq19B99AF2399E52F466CC3CF7E3B24ED5/dc318cd93346448487e9f423ce499b4b/d1d97571615f01111094fdcae4bed078.jpg
1)Spark core: 是其它组件的基础, spark 的内核, 主要包含: 有向循环图, RDD,Lingage,Cache,broadcast 等, 并封装了底层通讯框架, 是 Spark 的基础.
2)SparkStreaming 是一个对实时数据流进行高通量, 容错处理的流式处理系统, 可以对多种数据源 (如 Kdfka,Flume,Twitter,Zero 和 TCP 套接字) 进行类似 Map,Reduce 和 Join 等复杂操作, 将流式计算分解成一系列短小的批处理作业.
3)Spark sql:Shark 是 SparkSQL 的前身, Spark SQL 的一个重要特点是其能够统一处理关系表和 RDD, 使得开发人员可以轻松地使用 SQL 命令进行外部查询, 同时进行更复杂的数据分析
4)BlinkDB : 是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎, 它允许用户通过权衡数据精度来提升查询响应时间, 其数据的精度被控制在允许的误差范围内.
5)MLBase 是 Spark 生态圈的一部分专注于机器学习, 让机器学习的门槛更低, 让一些可能并不了解机器学习的用户也能方便地使用 MLbase.MLBase 分为四部分: MLlib,MLI,ML Optimizer 和 MLRuntime.
6)GraphX 是 Spark 中用于图和图并行计算
9.Spark 中 Work 的主要工作是什么?
答: 主要功能: 管理当前节点内存, CPU 的使用状况, 接收 master 分配过来的资源指令, 通过 ExecutorRunner 启动程序分配任务, worker 就类似于包工头, 管理分配新进程, 做计算的服务, 相当于 process 服务. 需要注意的是: 1)worker 会不会汇报当前信息给 master,worker 心跳给 master 主要只有 workid, 它不会发送资源信息以心跳的方式给 mater,master 分配的时候就知道 work, 只有出现故障的时候才会发送资源. 2)worker 不会运行代码, 具体运行的是 Executor 是可以运行具体 appliaction 写的业务逻辑代码, 操作代码的节点, 它不会运行程序的代码的.
10.Spark 为什么比 mapreduce 快?
答: 1)基于内存计算, 减少低效的磁盘交互; 2)高效的调度算法, 基于 DAG;3)容错机制 Linage, 精华部分就是 DAG 和 Lingae
11. 简单说一下 hadoop 和 spark 的 shuffle 相同和差异?
答: 1)从 high-level 的角度来看, 两者并没有大的差别. 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition, 不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask, 也可能是 ResultTask).Reducer 以内存作缓冲区, 边 shuffle 边 aggregate 数据, 等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作).
2)从 low-level 的角度来看, 两者差别不小. Hadoop MapReduce 是 sort-based, 进入 combine() 和 reduce() 的 records 必须先 sort. 这样的好处在于 combine/reduce() 可以处理大规模的数据, 因为其输入数据可以通过外排得到(mapper 对每段数据先做排序, reducer 的 shuffle 对排好序的每段数据做归并). 目前的 Spark 默认选择的是 hash-based, 通常使用 HashMap 来对 shuffle 来的数据进行 aggregate, 不会对数据进行提前排序. 如果用户需要经过排序的数据, 那么需要自己调用类似 sortByKey() 的操作; 如果你是 Spark 1.1 的用户, 可以将 spark.shuffle.manager 设置为 sort, 则会对数据进行排序. 在 Spark 1.2 中, sort 将作为默认的 Shuffle 实现.
3)从实现角度来看, 两者也有不少差别. Hadoop MapReduce 将处理流程划分出明显的几个阶段: map(), spill, merge, shuffle, sort, reduce() 等. 每个阶段各司其职, 可以按照过程式的编程思想来逐一实现每个阶段的功能. 在 Spark 中, 没有这样功能明确的阶段, 只有不同的 stage 和一系列的 transformation(), 所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中.
如果我们将 map 端划分数据, 持久化数据的过程称为 shuffle write, 而将 reducer 读入数据, aggregate 数据的过程称为 shuffle read. 那么在 Spark 中, 问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑? 以及两个处理逻辑应该怎么高效实现?
Shuffle write 由于不要求数据有序, shuffle write 的任务很简单: 将数据 partition 好, 并持久化. 之所以要持久化, 一方面是要减少内存存储空间压力, 另一方面也是为了 fault-tolerance.
12.Mapreduce 和 Spark 的都是并行计算, 那么他们有什么相同和区别
答: 两者都是用 mr 模型来进行并行计算:
1)hadoop 的一个作业称为 job,job 里面分为 map task 和 reduce task, 每个 task 都是在自己的进程中运行的, 当 task 结束时, 进程也会结束.
2)spark 用户提交的任务成为 application, 一个 application 对应一个 sparkcontext,App 中存在多个 job, 每触发一次 action 操作就会产生一个 job. 这些 job 可以并行或串行执行, 每个 job 中有多个 stage,stage 是 shuffle 过程中 DAGSchaduler 通过 RDD 之间的依赖关系划分 job 而来的, 每个 stage 里面有多个 task, 组成 taskset 有 TaskSchaduler 分发到各个 executor 中执行, executor 的生命周期是和 App 一样的, 即使没有 job 运行也是存在的, 所以 task 可以快速启动读取内存进行计算.
3)hadoop 的 job 只有 map 和 reduce 操作, 表达能力比较欠缺而且在 mr 过程中会重复的读写 hdfs, 造成大量的 io 操作, 多个 job 需要自己管理关系.
spark 的迭代计算都是在内存中进行的, API 中提供了大量的 RDD 操作如 join,groupby 等, 而且通过 DAG 图可以实现良好的容错.
13.RDD 机制?
答: rdd 分布式弹性数据集, 简单的理解成一种数据结构, 是 spark 框架上的通用货币.
所有算子都是基于 rdd 来执行的, 不同的场景会有不同的 rdd 实现类, 但是都可以进行互相转换.
rdd 执行过程中会形成 dag 图, 然后形成 lineage 保证容错性等. 从物理的角度来看 rdd 存储的是 block 和 node 之间的映射.
14,spark 有哪些组件?
答: 主要有如下组件:
1)master: 管理集群和节点, 不参与计算.
2)worker: 计算节点, 进程本身不参与计算, 和 master 汇报.
3)Driver: 运行程序的 main 方法, 创建 spark context 对象.
4)spark context: 控制整个 application 的生命周期, 包括 dagsheduler 和 task scheduler 等组件.
5)client: 用户提交程序的入口.
15,spark 工作机制?
答: 用户在 client 端提交作业后, 会由 Driver 运行 main 方法并创建 spark context 上下文.
执行 add 算子, 形成 dag 图输入 dagscheduler, 按照 add 之间的依赖关系划分 stage 输入 task scheduler. task scheduler 会将 stage 划分为 task set 分发到各个节点的 executor 中执行.
16,spark 的优化怎么做?
答: spark 调优比较复杂, 但是大体可以分为三个方面来进行, 1)平台层面的调优: 防止不必要的 jar 包分发, 提高数据的本地性, 选择高效的存储格式如 parquet,2)应用程序层面的调优: 过滤操作符的优化降低过多小任务, 降低单条记录的资源开销, 处理数据倾斜, 复用 RDD 进行缓存, 作业并行化执行等等, 3)JVM 层面的调优: 设置合适的资源量, 设置合理的 JVM, 启用高效的序列化方法如 kyro, 增大 off head 内存等等
17. 简要描述 Spark 分布式集群搭建的步骤
1)准备 Linux 环境, 设置集群搭建账号和用户组, 设置 SSH, 关闭防火墙, 关闭 seLinux, 配置 host,hostname
2)配置 jdk 到环境变量
3)搭建 hadoop 集群, 如果要做 master ha, 需要搭建 zookeeper 集群
修改 hdfs-site.xml,hadoop_env.sh,yarn-site.xml,slaves 等配置文件
4)启动 hadoop 集群, 启动前要格式化 namenode
5)配置 spark 集群, 修改 spark-env.xml,slaves 等配置文件, 拷贝 hadoop 相关配置到 spark conf 目录下
6)启动 spark 集群.
18. 什么是 RDD 宽依赖和窄依赖?
RDD 和它依赖的 parent RDD(s)的关系有两种不同的类型, 即窄依赖 (narrow dependency) 和宽依赖(wide dependency).
1)窄依赖指的是每一个 parent RDD 的 Partition 最多被子 RDD 的一个 Partition 使用
2)宽依赖指的是多个子 RDD 的 Partition 会依赖同一个 parent RDD 的 Partition
19.spark-submit 的时候如何引入外部 jar 包
方法一: spark-submit -jars
根据 spark 官网, 在提交任务的时候指定 - jars, 用逗号分开. 这样做的缺点是每次都要指定 jar 包, 如果 jar 包少的话可以这么做, 但是如果多的话会很麻烦.
命令: spark-submit --master yarn-client --jars ***.jar,***.jar
方法二: extraClassPath
提交时在 spark-default 中设定参数, 将所有需要的 jar 包考到一个文件里, 然后在参数中指定该目录就可以了, 较上一个方便很多:
spark.executor.extraClassPath=/home/hadoop/wzq_workspace/lib/* spark.driver.extraClassPath=/home/hadoop/wzq_workspace/lib/*
需要注意的是, 你要在所有可能运行 spark 任务的机器上保证该目录存在, 并且将 jar 包考到所有机器上. 这样做的好处是提交代码的时候不用再写一长串 jar 了, 缺点是要把所有的 jar 包都拷一遍.
20.cache 和 pesist 的区别
答: 1)cache 和 persist 都是用于将一个 RDD 进行缓存的, 这样在之后使用的过程中就不需要重新计算了, 可以大大节省程序运行时间; 2) cache 只有一个默认的缓存级别 MEMORY_ONLY ,cache 调用了 persist, 而 persist 可以根据情况设置其它的缓存级别; 3)executor 执行的时候, 默认 60% 做 cache,40% 做 task 操作, persist 最根本的函数, 最底层的函数
二, 选择题
1. Spark 的四大组件下面哪个不是 (D )
- A.Spark Streaming B. Mlib
- C Graphx D.Spark R
2. 下面哪个端口不是 spark 自带服务的端口 (C )
A.8080 B.4040 C.8090 D.18080
备注: 8080:spark 集群 Web ui 端口, 4040:sparkjob 监控端口, 18080:jobhistory 端口
3.spark 1.4 版本的最大变化 (B )
A spark sql Release 版本 B . 引入 Spark R
C DataFrame D. 支持动态资源分配
4. Spark Job 默认的调度模式 (A )
A FIFO B FAIR
C 无 D 运行时指定
5. 哪个不是本地模式运行的个条件 ( D)
A spark.localExecution.enabled=true
B 显式指定本地运行
C finalStage 无父 Stage
D partition 默认值
6. 下面哪个不是 RDD 的特点 (C )
A. 可分区 B 可序列化 C 可修改 D 可持久化
7. 关于广播变量, 下面哪个是错误的 (D )
A 任何函数调用 B 是只读的
C 存储在各个节点 D 存储在磁盘或 HDFS
8. 关于累加器, 下面哪个是错误的 (D )
A 支持加法 B 支持数值类型
C 可并行 D 不支持自定义类型
9.Spark 支持的分布式部署方式中哪个是错误的 (D )
- A standalone B spark on mesos
- C spark on YARN D Spark on local
10.Stage 的 Task 的数量由什么决定 (A )
A Partition B Job C Stage D TaskScheduler
11. 下面哪个操作是窄依赖 (B )
- A join B filter
- C group D sort
12. 下面哪个操作肯定是宽依赖 (C )
- A map B flatMap
- C reduceByKey D sample
13.spark 的 master 和 worker 通过什么方式进行通信的? (D )
A http B nio C netty D Akka
14 默认的存储级别 (A )
- A MEMORY_ONLY B MEMORY_ONLY_SER
- C MEMORY_AND_DISK D MEMORY_AND_DISK_SER
15 spark.deploy.recoveryMode 不支持那种 (D )
- A.ZooKeeper B. FileSystem
- D NONE D Hadoop
16. 下列哪个不是 RDD 的缓存方法 (C )
- A persist() B Cache()
- C Memory()
17.Task 运行在下来哪里个选项中 Executor 上的工作单元 (C )
- A Driver program B. spark master
- C.worker node D Cluster manager
18.hive 的元数据存储在 derby 和 MySQL 中有什么区别 (B )
A. 没区别 B. 多会话
C. 支持网络环境 D 数据库的区别
19.DataFrame 和 RDD 最大的区别 (B )
A. 科学统计支持 B. 多了 schema
C. 存储方式不一样 D. 外部数据源支持
20.Master 的 ElectedLeader 事件后做了哪些操作 (D )
A. 通知 driver B. 通知 worker
C. 注册 application D. 直接 ALIVE
-----------------------------------------------------------------------------------------------------------------------------
[Spark 面试 2000 题 41-70] Spark core 面试篇 02
这批 Spark 面试题由志愿者 Taffry(某高校研究生)提供, 非常感谢志愿者的优质题集, 大家如果有好的面试题可以私信给群主(可加入志愿者群 QQ 群: 233864572). 为确保题集质量, 志愿者贡献出来的题集, 群主及各位梅峰谷平台组成员会审核, 个别地方会略加修改, 还请志愿者理解.
一, 面试 30 题
1.cache 后面能不能接其他算子, 它是不是 action 操作?
答: cache 可以接其他算子, 但是接了算子之后, 起不到缓存应有的效果, 因为会重新触发 cache.
cache 不是 action 操作
2.reduceByKey 是不是 action?
答: 不是, 很多人都会以为是 action,reduce rdd 是 action
3. 数据本地性是在哪个环节确定的?
具体的 task 运行在那他机器上, dag 划分 stage 的时候确定的
4.RDD 的弹性表现在哪几点?
1)自动的进行内存和磁盘的存储切换;
2)基于 Lingage 的高效容错;
3)task 如果失败会自动进行特定次数的重试;
4)stage 如果失败会自动进行特定次数的重试, 而且只会计算失败的分片;
5)checkpoint 和 persist, 数据计算之后持久化缓存
6)数据调度弹性, DAG TASK 调度和资源无关
7)数据分片的高度弹性, a. 分片很多碎片可以合并成大的, b.par
5. 常规的容错方式有哪几种类型?
1). 数据检查点, 会发生拷贝, 浪费资源
2). 记录数据的更新, 每次更新都会记录下来, 比较复杂且比较消耗性能
6.RDD 通过 Linage(记录数据更新)的方式为何很高效?
1)lazy 记录了数据的来源, RDD 是不可变的, 且是 lazy 级别的, 且 rDD
之间构成了链条, lazy 是弹性的基石. 由于 RDD 不可变, 所以每次操作就
产生新的 rdd, 不存在全局修改的问题, 控制难度下降, 所有有计算链条
将复杂计算链条存储下来, 计算的时候从后往前回溯
900 步是上一个 stage 的结束, 要么就 checkpoint
2)记录原数据, 是每次修改都记录, 代价很大
如果修改一个集合, 代价就很小, 官方说 rdd 是
粗粒度的操作, 是为了效率, 为了简化, 每次都是
操作数据集合, 写或者修改操作, 都是基于集合的
rdd 的写操作是粗粒度的, rdd 的读操作既可以是粗粒度的
也可以是细粒度, 读可以读其中的一条条的记录.
3)简化复杂度, 是高效率的一方面, 写的粗粒度限制了使用场景
如网络爬虫, 现实世界中, 大多数写是粗粒度的场景
7.RDD 有哪些缺陷?
1)不支持细粒度的写和更新操作(如网络爬虫),spark 写数据是粗粒度的
所谓粗粒度, 就是批量写入数据, 为了提高效率. 但是读数据是细粒度的也就是
说可以一条条的读
2)不支持增量迭代计算, Flink 支持
8. 说一说 Spark 程序编写的一般步骤?
答: 初始化, 资源, 数据源, 并行化, rdd 转化, action 算子打印输出结果或者也可以存至相应的数据存储介质, 具体的可看下图:
file:///E:/安装软件/有道笔记文件/qq19B99AF2399E52F466CC3CF7E3B24ED5/069fa7b471f54e038440faf63233acce/640.webp
9. Spark 有哪两种算子?
答: Transformation(转化)算子和 Action(执行)算子.
10. Spark 提交你的 jar 包时所用的命令是什么?
答: spark-submit.
11. Spark 有哪些聚合类的算子, 我们应该尽量避免什么类型的算子?
答: 在我们的开发过程中, 能避免则尽可能避免使用 reduceByKey,join,distinct,repartition 等会进行 shuffle 的算子, 尽量使用 map 类的非 shuffle 算子. 这样的话, 没有 shuffle 操作或者仅有较少 shuffle 操作的 Spark 作业, 可以大大减少性能开销.
12. 你所理解的 Spark 的 shuffle 过程?
答: 从下面三点去展开
1)shuffle 过程的划分
2)shuffle 的中间结果如何存储
3)shuffle 的数据如何拉取过来
可以参考这篇博文: http://www.cnblogs.com/jxhd1/p/6528540.html
13. 你如何从 Kafka 中获取数据?
1)基于 Receiver 的方式
这种方式使用 Receiver 来获取数据. Receiver 是使用 Kafka 的高层次 Consumer API 来实现的. receiver 从 Kafka 中获取的数据都是存储在 Spark Executor 的内存中的, 然后 Spark Streaming 启动的 job 会去处理那些数据.
2)基于 Direct 的方式
这种新的不基于 Receiver 的直接方式, 是在 Spark 1.3 中引入的, 从而能够确保更加健壮的机制. 替代掉使用 Receiver 来接收数据后, 这种方式会周期性地查询 Kafka, 来获得每个 topic+partition 的最新的 offset, 从而定义每个 batch 的 offset 的范围. 当处理数据的 job 启动时, 就会使用 Kafka 的简单 consumer API 来获取 Kafka 指定 offset 范围的数据
14. 对于 Spark 中的数据倾斜问题你有什么好的方案?
1)前提是定位数据倾斜, 是 OOM 了, 还是任务执行缓慢, 看日志, 看 WebUI
2)解决方法, 有多个方面
. 避免不必要的 shuffle, 如使用广播小表的方式, 将 reduce-side-join 提升为 map-side-join
. 分拆发生数据倾斜的记录, 分成几个部分进行, 然后合并 join 后的结果
. 改变并行度, 可能并行度太少了, 导致个别 task 数据压力大
. 两阶段聚合, 先局部聚合, 再全局聚合
. 自定义 paritioner, 分散 key 的分布, 使其更加均匀
详细解决方案参考博文《Spark 数据倾斜优化方法》
15.RDD 创建有哪几种方式?
1). 使用程序中的集合创建 rdd
2). 使用本地文件系统创建 rdd
3). 使用 hdfs 创建 rdd,
4). 基于数据库 db 创建 rdd
5). 基于 Nosql 创建 rdd, 如 hbase
6). 基于 s3 创建 rdd,
7). 基于数据流, 如 socket 创建 rdd
如果只回答了前面三种, 是不够的, 只能说明你的水平还是入门级的, 实践过程中有很多种创建方式.
16.Spark 并行度怎么设置比较合适
答: spark 并行度, 每个 core 承载 2~4 个 partition, 如, 32 个 core, 那么 64~128 之间的并行度, 也就是
设置 64~128 个 partion, 并行读和数据规模无关, 只和内存使用量和 CPU 使用
时间有关
17.Spark 中数据的位置是被谁管理的?
答: 每个数据分片都对应具体物理位置, 数据的位置是被 blockManager, 无论
数据是在磁盘, 内存还是 tacyan, 都是由 blockManager 管理
18.Spark 的数据本地性有哪几种?
答: Spark 中的数据本地性有三种:
a.PROCESS_LOCAL 是指读取缓存在本地节点的数据
b.NODE_LOCAL 是指读取本地节点硬盘数据
c.ANY 是指读取非本地节点数据
通常读取数据 PROCESS_LOCAL>NODE_LOCAL>ANY, 尽量使数据以 PROCESS_LOCAL 或 NODE_LOCAL 方式读取. 其中 PROCESS_LOCAL 还和 cache 有关, 如果 RDD 经常用的话将该 RDD cache 到内存中, 注意, 由于 cache 是 lazy 的, 所以必须通过一个 action 的触发, 才能真正的将该 RDD cache 到内存中.
19.rdd 有几种操作类型?
1)transformation,rdd 由一种转为另一种 rdd
2)action,
3)cronroller,crontroller 是控制算子, cache,persist, 对性能和效率的有很好的支持
三种类型, 不要回答只有 2 中操作
19.rdd 有几种操作类型?
1)transformation,rdd 由一种转为另一种 rdd
2)action,
3)cronroller,crontroller 是控制算子, cache,persist, 对性能和效率的有很好的支持
三种类型, 不要回答只有 2 中操作
20.Spark 如何处理不能被序列化的对象?
将不能序列化的内容封装成 object
21.collect 功能是什么, 其底层是怎么实现的?
答: driver 通过 collect 把集群中各个节点的内容收集过来汇总成结果, collect 返回结果是 Array 类型的, collect 把各个节点上的数据抓过来, 抓过来数据是 Array 型, collect 对 Array 抓过来的结果进行合并, 合并后 Array 中只有一个元素, 是 tuple 类型 (KV 类型的) 的.
22.Spaek 程序执行, 有时候默认为什么会产生很多 task, 怎么修改默认 task 执行个数?
答: 1)因为输入数据有很多 task, 尤其是有很多小文件的时候, 有多少个输入
block 就会有多少个 task 启动; 2)spark 中有 partition 的概念, 每个 partition 都会对应一个 task,task 越多, 在处理大规模数据的时候, 就会越有效率. 不过 task 并不是越多越好, 如果平时测试, 或者数据量没有那么大, 则没有必要 task 数量太多. 3)参数可以通过 spark_home/conf/spark-default.conf 配置文件设置:
spark.sql.shuffle.partitions 50 spark.default.parallelism 10
第一个是针对 spark sql 的 task 数量
第二个是非 spark sql 程序设置生效
23. 为什么 Spark Application 在没有获得足够的资源, job 就开始执行了, 可能会导致什么什么问题发生?
答: 会导致执行该 job 时候集群资源不足, 导致执行 job 结束也没有分配足够的资源, 分配了部分 Executor, 该 job 就开始执行 task, 应该是 task 的调度线程和 Executor 资源申请是异步的; 如果想等待申请完所有的资源再执行 job 的: 需要将 spark.scheduler.maxRegisteredResourcesWaitingTime 设置的很大; spark.scheduler.minRegisteredResourcesRatio 设置为 1, 但是应该结合实际考虑
否则很容易出现长时间分配不到资源, job 一直不能运行的情况.
24.map 与 flatMap 的区别
map: 对 RDD 每个元素转换, 文件中的每一行数据返回一个数组对象
flatMap: 对 RDD 每个元素转换, 然后再扁平化
将所有的对象合并为一个对象, 文件中的所有行数据仅返回一个数组
对象, 会抛弃值为 null 的值
25. 列举你常用的 action?
collect,reduce,take,count,saveAsTextFile 等
26.Spark 为什么要持久化, 一般什么场景下要进行 persist 操作?
为什么要进行持久化?
spark 所有复杂一点的算法都会有 persist 身影, spark 默认数据放在内存, spark 很多内容都是放在内存的, 非常适合高速迭代, 1000 个步骤
只有第一个输入数据, 中间不产生临时数据, 但分布式系统风险很高, 所以容易出错, 就要容错, rdd 出错或者分片可以根据血统算出来, 如果没有对父 rdd 进行 persist 或者 cache 的化, 就需要重头做.
以下场景会使用 persist
1)某个步骤计算非常耗时, 需要进行 persist 持久化
2)计算链条非常长, 重新恢复要算很多步骤, 很好使, persist
3)checkpoint 所在的 rdd 要持久化 persist,
lazy 级别, 框架发现有 checnkpoint,checkpoint 时单独触发一个 job, 需要重算一遍, checkpoint 前
要持久化, 写个 rdd.cache 或者 rdd.persist, 将结果保存起来, 再写 checkpoint 操作, 这样执行起来会非常快, 不需要重新计算 rdd 链条了. checkpoint 之前一定会进行 persist.
4)shuffle 之后为什么要 persist,shuffle 要进性网络传输, 风险很大, 数据丢失重来, 恢复代价很大
5)shuffle 之前进行 persist, 框架默认将数据持久化到磁盘, 这个是框架自动做的.
27. 为什么要进行序列化
序列化可以减少数据的体积, 减少存储空间, 高效存储和传输数据, 不好的是使用的时候要反序列化, 非常消耗 CPU
28. 介绍一下 join 操作优化经验?
答: join 其实常见的就分为两类: map-side join 和 reduce-side join. 当大表和小表 join 时, 用 map-side join 能显著提高效率. 将多份数据进行关联是数据处理过程中非常普遍的用法, 不过在分布式计算系统中, 这个问题往往会变的非常麻烦, 因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去, 也就是 shuffle 的过程. 造成大量的网络以及磁盘 IO 消耗, 运行效率极其低下, 这个过程一般被称为 reduce-side-join. 如果其中有张表较小的话, 我们则可以自己实现在 map 端实现数据关联, 跳过大量数据进行 shuffle 的过程, 运行时间得到大量缩短, 根据不同数据可能会有几倍到数十倍的性能提升.
备注: 这个题目面试中非常非常大概率见到, 务必搜索相关资料掌握, 这里抛砖引玉.
29. 介绍一下 cogroup rdd 实现原理, 你在什么场景下用过这个 rdd?
答: cogroup 的函数实现: 这个实现根据两个要进行合并的两个 RDD 操作, 生成一个 CoGroupedRDD 的实例, 这个 RDD 的返回结果是把相同的 key 中两个 RDD 分别进行合并操作, 最后返回的 RDD 的 value 是一个 Pair 的实例, 这个实例包含两个 Iterable 的值, 第一个值表示的是 RDD1 中相同 KEY 的值, 第二个值表示的是 RDD2 中相同 key 的值. 由于做 cogroup 的操作, 需要通过 partitioner 进行重新分区的操作, 因此, 执行这个流程时, 需要执行一次 shuffle 的操作(如果要进行合并的两个 RDD 的都已经是 shuffle 后的 rdd, 同时他们对应的 partitioner 相同时, 就不需要执行 shuffle,),
场景: 表关联查询
30 下面这段代码输出结果是什么?
- --------------------------
- def joinRdd(sc:SparkContext) {
- val name= Array(
- Tuple2(1,"spark"),
- Tuple2(2,"tachyon"),
- Tuple2(3,"hadoop")
- )
- val score= Array(
- Tuple2(1,100),
- Tuple2(2,90),
- Tuple2(3,80)
- )
- val namerdd=sc.parallelize(name);
- val scorerdd=sc.parallelize(score);
- val result = namerdd.join(scorerdd);
- result .collect.foreach(println);
- }
- --------------------------
答案:
- (1,(Spark,100))
- (2,(tachyon,90))
- (3,(hadoop,80))
来源: http://www.bubuko.com/infodetail-3069758.html