本文的例子来自 Delta Lake 官方教程. 因为官方教程是基于商业软件 Databricks Community Edition 构建, 虽然教程中使用的软件特性都是开源 Delta Lake 版本所具备的, 但是考虑到国内的网络环境, 注册和使用 Databricks Community Edition 门槛较高. 所以本文尝试基于开源的 Jupiter Notebook 重新构建这个教程.
准备一个环境安装 Spark 和 jupyter
本文基于 Linux 构建开发环境, 同时使用的软件比如 conda,jupyter 以及 pyspark 等都可以在 Windows 和 MacOS 上找到, 理论上来说也完全可以在这两个系统上完成此教程.
假设系统已经安装 anaconda 或 miniconda, 我们使用 conda 来构建开发环境, 可以非常方便的安装 pyspark 和 jupyter notebook
- conda create --name spark
- conda activate spark
- conda install pyspark
- conda install -c conda-forge jupyterlab
环境变量设置
我们在设置一些环境变量之后, 就可以使用 pyspark 命令来创建 jupyter notebook 服务
- export SPARK_HOME=$HOME/miniconda3/envs/spark/lib/python3.7/site-packages/pyspark
- export PYSPARK_DRIVER_PYTHON=jupyter
- export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
启动服务 (注意这里的参数里指定了 Delta Lake 的 package,Spark 会帮忙自动下载依赖):
pyspark --packages io.delta:delta-core_2.11:0.5.0
接下去所有代码在 notebook 里运行
下载需要 parquet 文件
- %%bash
- rm -fr /tmp/delta_demo
- mkdir -p /tmp/delta_demo/loans/
- wget -O /tmp/delta_demo/loans/SAISEU19-loan-risks.snappy.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet
- ls -al /tmp/delta_demo/loans/
Delta Lake 的批流处理
在这里我们进入正题, 开始介绍 Delta Lake 的批流处理能力.
首先, 我们通过批处理的形式创建一张 Delta Lake 表, 数据来自前面我们下载的 parquet 文件, 可以和方便的把一张 parquet 表转换为 Delta Lake 表:
- import os
- import shutil
- from pyspark.sql.functions import *
- delta_path = "/tmp/delta_demo/loans_delta"
- # Delete a new delta table with the parquet file
- if os.path.exists(delta_path):
- print("Deleting path" + delta_path)
- shutil.rmtree(delta_path)
- # Create a new delta table with the parquet file
- spark.read.format("parquet").load("/tmp/delta_demo/loans") \
- .write.format("delta").save(delta_path)
- print("Created a Delta table at" + delta_path)
我来查一下这张表, 数据量是否正确:
- # Create a view on the table called loans_delta
- spark.read.format("delta").load(delta_path).createOrReplaceTempView("loans_delta")
- print("Defined view'loans_delta'")
- spark.sql("select count(*) from loans_delta").show()
- Defined view 'loans_delta'
- +--------+
- |count(1)|
- +--------+
- | 14705|
- +--------+
接下去我们会使用 Spark Streaming 流式写入这张 Delta Lake 表, 同时展示 Delta Lake 的 Schema enforcement 能力 (本文省略了流式写 Parquet 表的演示部分, 那部分指出了 parquet 文件的不足, 比如无法强制指定 Schema )
- import random
- from pyspark.sql.functions import *
- from pyspark.sql.types import *
- def random_checkpoint_dir():
- return "/tmp/delta_demo/chkpt/%s" % str(random.randint(0, 10000))
- # User-defined function to generate random state
- states = ["CA", "TX", "NY", "IA"]
- @udf(returnType=StringType())
- def random_state():
- return str(random.choice(states))
- # Generate a stream of randomly generated load data and append to the delta table
- def generate_and_append_data_stream_fixed(table_format, table_path):
- stream_data = spark.readStream.format("rate").option("rowsPerSecond", 50).load() \
- .withColumn("loan_id", 10000 + col("value")) \
- .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
- .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \
- .withColumn("addr_state", random_state()) \
- .select("loan_id", "funded_amnt", "paid_amnt", "addr_state") # *********** FIXED THE SCHEMA OF THE GENERATED DATA *************
- query = stream_data.writeStream \
- .format(table_format) \
- .option("checkpointLocation", random_checkpoint_dir()) \
- .trigger(processingTime="10 seconds") \
- .start(table_path)
- return query
启动两个流式作业:
- stream_query_1 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)
- stream_query_2 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)
因为 Delta Lake 的乐观锁机制, 多个流可以同时写入一张表, 并保证数据的完整性.
通过批处理的方式来查询一下当前表中的数据量, 我们发现有数据被插入了:
- spark.sql("select count(*) from loans_delta").show()
- +--------+
- |count(1)|
- +--------+
- | 17605|
- +--------+
接下去我们停止所有流的写入, 接下去会展示 Delta Lake 的其他特性
- # Function to stop all streaming queries
- def stop_all_streams():
- # Stop all the streams
- print("Stopping all streams")
- for s in spark.streams.active:
- s.stop()
- print("Stopped all streams")
- print("Deleting checkpoints")
- shutil.rmtree("/tmp/delta_demo/chkpt/", True)
- print("Deleted checkpoints")
- stop_all_streams()
- Schema evolution(Schema 演化)
Delta Lake 支持 Schema 演化, 也就是说我们可以增加或改变表字段. 接下去的批处理 SQL 会新增加一些数据, 同时这些数据比之前的多了一个 "closed" 字段. 我们将新的 DF 配置参数 mergeSchema 为 true 来显示指明 Delta Lake 表 Schema 的演化:
- cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
- items = [
- (1111111, 1000, 1000.0, 'TX', True),
- (2222222, 2000, 0.0, 'CA', False)
- ]
- loan_updates = spark.createDataFrame(items, cols) \
- .withColumn("funded_amnt", col("funded_amnt").cast("int"))
- loan_updates.write.format("delta") \
- .mode("append") \
- .option("mergeSchema", "true") \
- .save(delta_path)
来看一下插入新数据之后的表内容, 新增加了 closed 字段, 之前的老数据行这个字段默认为 null.
- spark.read.format("delta").load(delta_path).show()
- +-------+-----------+---------+----------+------+
- |loan_id|funded_amnt|paid_amnt|addr_state|closed|
- +-------+-----------+---------+----------+------+
- | 0| 1000| 182.22| CA| null|
- | 1| 1000| 361.19| WA| null|
- | 2| 1000| 176.26| TX| null|
- | 3| 1000| 1000.0| OK| null|
- | 4| 1000| 249.98| PA| null|
- | 5| 1000| 408.6| CA| null|
- | 6| 1000| 1000.0| MD| null|
- | 7| 1000| 168.81| OH| null|
- | 8| 1000| 193.64| TX| null|
- | 9| 1000| 218.83| CT| null|
- | 10| 1000| 322.37| NJ| null|
- | 11| 1000| 400.61| NY| null|
- | 12| 1000| 1000.0| FL| null|
- | 13| 1000| 165.88| NJ| null|
- | 14| 1000| 190.6| TX| null|
- | 15| 1000| 1000.0| OH| null|
- | 16| 1000| 213.72| MI| null|
- | 17| 1000| 188.89| MI| null|
- | 18| 1000| 237.41| CA| null|
- | 19| 1000| 203.85| CA| null|
- +-------+-----------+---------+----------+------+
- only showing top 20 rows
新的数据行具有 closed 字段:
- spark.read.format("delta").load(delta_path).filter(col("closed") == True).show()
- +-------+-----------+---------+----------+------+
- |loan_id|funded_amnt|paid_amnt|addr_state|closed|
- +-------+-----------+---------+----------+------+
- |1111111| 1000| 1000.0| TX| true|
- +-------+-----------+---------+----------+------+
Delta Lake 表的删除操作
除了常规的插入操作, Delta Lake 还支持 update 和 delete 等功能, 可以更新表格内容. 下面展示删除操作, 我们希望删除表格中贷款已经被完全还清的记录. 下面几条命令可以简单和清晰的展示删除过程.
首先, 我们看看符合条件的记录有多少条:
- spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()
- +--------+
- |count(1)|
- +--------+
- | 5134|
- +--------+
然后, 我们执行一个 delete 命令:
- from delta.tables import *
- deltaTable = DeltaTable.forPath(spark, delta_path)
- deltaTable.delete("funded_amnt = paid_amnt")
最后, 我们看一下删除后的结果, 发现符合条件的记录都已被删除:
- spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()
- +--------+
- |count(1)|
- +--------+
- | 0|
- +--------+
版本历史和回溯
Delta Lake 还具有很强大历史版本记录和回溯功能. history() 方法清晰的展示了刚才那张表的修改记录, 包括最后一次 Delete 操作.
- deltaTable.history().show()
- +-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
- |version| timestamp|userId|userName| operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
- +-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
- | 10|2020-02-22 22:14:06| null| null| DELETE|[predicate -> ["(...|null| null| null| 9| null| false|
- | 9|2020-02-22 22:13:57| null| null| WRITE|[mode -> Append, ...|null| null| null| 8| null| true|
- | 8|2020-02-22 22:13:52| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 6| null| true|
- | 7|2020-02-22 22:13:50| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 6| null| true|
- | 6|2020-02-22 22:13:42| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 4| null| true|
- | 5|2020-02-22 22:13:40| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 4| null| true|
- | 4|2020-02-22 22:13:32| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 2| null| true|
- | 3|2020-02-22 22:13:30| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 2| null| true|
- | 2|2020-02-22 22:13:22| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 1| null| true|
- | 1|2020-02-22 22:13:20| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 0| null| true|
- | 0|2020-02-22 22:13:18| null| null| WRITE|[mode -> ErrorIfE...|null| null| null| null| null| true|
- +-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
如果我们希望看一下刚才删除操作前的数据表状态, 可以很方便的回溯到前一个快照点, 并进行再次查询 (我们可以看到被删除的记录又出现了).
- previousVersion = deltaTable.history(1).select("version").collect()[0][0] - 1
- spark.read.format("delta") \
- .option("versionAsOf", previousVersion) \
- .load(delta_path) \
- .createOrReplaceTempView("loans_delta_pre_delete") \
- spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete WHERE funded_amnt = paid_amnt").show()
- +--------+
- |count(1)|
- +--------+
- | 5134|
- +--------+
结论
本文通过 jupyter notebook 工具演示了 Delta Lake 的官方教程, 你可以在本文末尾下载到完整的 notebook 文件.
来源: https://yq.aliyun.com/articles/745782