1 Spark SQL 分组聚合统计
groupBy 与 agg 搭配使用, agg(*exprs), 这个方法是 GroupedData 上用于计算聚合数据的方法,*exprs 可以是一个 string 到 string 组成的字典, key 是需要聚合的列名, value 是用于计算的聚合函数的名称. 聚 合函数可以的取值有: avg, max, min, sum, count,mean.agg 方法返回一个聚合后的 DataFrame 对象.
- df = spark.read.CSV('/sql/customers.csv',header=True)
- df.columns
- df.groupBy('Genre').agg({"Age":"mean","Annual Income (k$)":"max","Spending Score (1-100)":"count"}).show()
- +------+-----------------------------+-----------------------+------------------+
- | Genre|count(Spending Score (1-100))|max(Annual Income (k$))| avg(Age)|
- +------+-----------------------------+-----------------------+------------------+
- |Female| 112| 99|38.098214285714285|
- | Male| 88| 99| 39.80681818181818|
- +------+-----------------------------+-----------------------+------------------+
除了使用字符串以字典的方式指定, 其实还可以使用聚合后的列的表达式来做相同的是, 这 需要借助 pyspark.sql.functions 模块中的方法.
- from pyspark.sql.functions import *
- df = spark.read.CSV('/sql/customers.csv',header=True)
- df.columns
- df.groupBy('Genre').agg(mean(df.Age)).show()
- +------+------------------+
- | Genre| avg(Age)|
- +------+------------------+
- |Female|38.098214285714285|
- | Male| 39.80681818181818|
- +------+------------------+
apply(udf), 使用 pandas 中的用户自定义函数作用在 GroupedData 的每一组数据之上, 返 回结果作为一个 DataFrame.udf 用户自定义函数接收 pandas.DataFrame 作为参数, 返回另 外一个 pandas.DataFrame 对象. 这个方法是 pyspark2.3 中加入的新方法. 通过 @pandas_udf 表示这是一个 pandas 的方法, 参数为 id long,v double, 指定 PandasUDFType 为分组 map 操作.(测试未通过)
- from pyspark.sql.functions import pandas_udf, PandasUDFType
- from pyspark.sql import SparkSession
- spark = SparkSession.builder.master('spark://hadoopmaste:7077').appName('apply').getOrCreate()
- df = spark.createDataFrame( [(1, 10.0), (1, 21.0), (2, 34.0), (2, 56.0), (2, 19.0)], ("id",
- "v"))
- @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
- def normalize(PDF):
- v = PDF.v
- print(type(v),type(PDF))
- return PDF.assign(v=(v - v.mean()) / v.std())
- df.groupby("id").apply(normalize).show()
- spark.stop()
Pandas 中 DataFrame 的 assign 方法是新建一个 DataFrame 而不会改变原来的 DataFrame.
avg(*cols), 给算给定的数值类型的列的平均值
- df = spark.read.CSV('/sql/customers.csv',header=True)
- df.select(df.Age.cast('int').alias('age'),'Genre').groupBy('Genre').avg('age').show()
- +------+------------------+
- | Genre| avg(age)|
- +------+------------------+
- |Female|38.098214285714285|
- | Male| 39.80681818181818|
- +------+------------------+
count(), 返回每个分组中数据的条数
- df = spark.read.CSV('/sql/customers.csv',header=True)
- df.select(df.Age.cast('int').alias('age'),'Genre').groupBy('Genre').count().show()
max(*cols), 计算给定列中数值最大的值.
- df = spark.read.CSV('/sql/customers.csv',header=True)
- df.select(df.Age.cast('int').alias('age'),'Genre','Annual Income
- (k$)').groupBy('Genre').max().show()
mean(*cols)计算对应列的均值, 列需要是数值类型
- spark.read.CSV('/sql/customers.csv',header=True)
- df.select(df.Age.cast('int').alias('age'),'Genre','Annual Income
- (k$)').groupBy('Genre').mean().show()
min(*cols) 计算对应列的最小值, 列数值类型需要是数值类型
- spark.read.CSV('/sql/customers.csv',header=True)
- df.select(df.Age.cast('int').alias('age'),'Genre','Annual Income
- (k$)').groupBy('Genre').min().show()
sum(*cols), 计算指定列的和, 列的类型需要是数值类型.
- spark.read.CSV('/sql/customers.csv',header=True)
- df.select(df.Age.cast('int').alias('age'),'Genre','Annual Income (k$)').groupBy('Genre').sum().show()
2 总结
Python 技术栈与 Spark 交叉数据分析双向整合, 让我们在大数据融合分析达到了通用, 可以发现 Spark SQL 其实很大部分功能和 Pandas 雷同
秦凯新 于深圳 201812172352
来源: https://juejin.im/post/5c17c7ff5188250d9e6045fd