黑洞

这里藏着一些独特的想法

0%

Pyspark学习之基操速查

本文只是站长的学习记录,内容并不完善,有时间继续补充。

Driver初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from pyspark.sql import SparkSession
import os

# 设置环境变量
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
# todo 创建spark会话
spark = SparkSession.builder \
.appName('first app') \
.master('local[*]') \
.getOrCreate()

sc = spark.sparkContext

"""
master(driver运行的地方)可指定为以下几种:
本地模式 'local[*]'
Standalone 'spark://node1:7077'
Standalone高可用 'spark://node1:7077,node2:7077'
Yarn集群 'yarn-cluster'
Yarn客户端 'yarn-client'
"""

# 结束运行
spark.stop()

RDD

并行化创建

1
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

读写

读取为rdd

1
rdd1 = sc.textFile('/export/data/a.txt')

1
2
rdd1 = sc.textFile('/export/data/log.txt')
rdd1.saveAsTextFile('hdfs://uv/result')

小文件读取

在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:wholeTextFiles类,专门读取小文件数据。

1
2
3
4
# todo 读取小文件
resultRDD = sc.wholeTextFiles("file:///export/data/ratings100/")
# 取分区数
print("whole textFile numpartitions:", resultRDD.getNumPartitions())

如果从HDFS读取海量数据,应用运行在YARN上,默认情况下,RDD分区数目等于HDFS上Block块数目。

Dataframe

创建

反射推断创建

1
2
3
4
5
6
# todo 反射推断创建
from pyspark import Row

rdd = sc.parallelize([('jack', 18), ('rose', 16)])
schema_rdd = rdd.map(lambda p: Row(name=p[0], age=int(p[1])))
df_person = spark.createDataFrame(schema_rdd)

StructType创建

1
2
3
4
5
6
7
8
9
10
11
12
# todo StructType方式
from pyspark.sql.types import *

rdd = sc.parallelize([('jack', 18), ('rose', 16)])

# 定义schema 分别是 列名 类型 是否允许为空
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])

df_person = spark.createDataFrame(rdd, schema=schema)

读写

读取为Dataframe

1
2
3
4
5
6
7
8
9
# todo 读取csv
spark.read.format('csv') \
.option('header', True) \
.option('sep', ',') \
.option('inferSchema', True) \
.load('/export/data/abc.csv')

# todo 读取json
peopleDF = spark.read.format("json").load("/datas/resources/people.json")

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# todo 导出到数据库
payType_percent_df \
.coalesce(1) \
.write \
.format('jdbc') \
.mode('overwrite') \
.option('driver', 'com.mysql.jdbc.Driver') \
.option('url', 'jdbc:mysql://node1:3306?serverTimezone=UTC&characterEncoding=utf8') \
.option('dbtable', 'test_ads.payType_percent') \
.option('user', 'root') \
.option('password', 'hadoop') \
.save()

"""
保存模式:
第一种:Append 追加模式,当数据存在时,继续追加;
第二种:Overwrite 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
第三种:ErrorIfExists 存在及报错;
第四种:Ignore 忽略,数据存在时不做任何操作;
实际项目依据具体业务情况选择保存模式,通常选择Append和Overwrite模式。
"""

查询

pySpark提供两种查询风格,一种是DSL,另一种是SQL风格。个人喜欢SQL风格,因为自己比较熟悉SQL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# SQL风格
# 将df注册为临时表
data_df.createOrReplaceTempView('t_table')

countryConfirms = spark.sql('''
select country, count(distinct InvoiceNO) as confirms
from t_table
where InvoiceNO not like 'C%'
group by country
''')

# DSL风格
import pyspark.sql.functions as F
countryReturnInvoiceDF = data \
.filter("InvoiceNo like 'C%'") \
.groupBy('country') \
.agg(F.countDistinct('InvoiceNo').alias('cnt')) \
.orderBy('cnt', ascending=False) \
.limit(10)

数据清洗

关键函数

  • 时间转换
    • from_unixtime
    • to_timestamp
  • 空值处理
    • dropna
    • fillna
  • 重复值处理
    • dropDuplicates
    • monotonically_increasing_id

结合实际业务需求,不能上来就一通乱删!

空值填充

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
df_miss = spark.createDataFrame([
(1, 143.5, 5.6, 28, 'M', 100000),
(2, 167.2, 5.4, 45, 'M', None),
(3, None, 5.2, None, None, None),
(4, 144.5, 5.9, 33, 'M', None),
(5, 133.2, 5.7, 54, 'F', None),
(6, 124.1, 5.2, None, 'F', None),
(7, 129.2, 5.3, 42, 'M', 76000), ],
['id', 'weight', 'height', 'age', 'gender', 'income'])

# todo 查询每行有几个缺失值
row__collect = df_miss.rdd.map(lambda row: (row['id'], sum([c == None for c in row]))).collect()
print(row__collect)

# todo 查询每列的缺失比例
from pyspark.sql import functions as F

df_miss.agg(*[(1 - F.count(c) / F.count('*')).alias(c + '_missing_ratio') for c in df_miss.columns]).show()

# todo 查询每列的缺失数

df_miss.agg(*[(F.count('*') - F.count(c)).alias(c + '_missing_cnt') for c in df_miss.columns]).show()

# todo income字段缺失比例高于60% 故丢弃

df_miss_new = df_miss.select([c for c in df_miss.columns if c != 'income'])
df_miss_new.show()

# todo 求出每列平均值

list_mean = df_miss_new.agg(*[F.mean(c).alias(c) for c in df_miss_new.columns if c != 'gender']) \
.toPandas() \
.to_dict(orient='records')[0]

list_mean['gender'] = 'missing'

print(list_mean)

# todo 填充缺失值
cleaned_df = df_miss_new.fillna(list_mean)
cleaned_df.show()
1
2
3
4
df = spark.read.json('file:///export/data/mini.json') \
.filter("receivable < 10000 and storeProvince != 'null'") \
.dropna(thresh=3, subset=['storeProvince', 'storeCity', 'storeDistrict']) \
.withColumn('date', F.from_unixtime(F.col('dateTS').substr(0, 10), 'yyyy-MM-dd'))

重复值过滤

1
2
3
4
5
6
7
8
9
10
11
12
# 删除完全一样的记录
df2 = df.dropDuplicates()

# 除了某些字段,删除其他字段值都完全一样的记录
df3 = df2.dropDuplicates(subset = [c for c in df2.columns if c!='id'])

# 查看某一列是否有重复值
import pyspark.sql.functions as fn
df3.agg(fn.count('id').alias('id_count'),fn.countDistinct('id').alias('distinct_id_count')).collect()

# 对于id这种无意义的列值重复,添加另外一列自增id
df3.withColumn('new_id',fn.monotonically_increasing_id()).show()

共享变量

广播变量

1
2
3
4
5
6
7
8
9
# todo 广播变量
kv_fruit = sc.parallelize([(1, 'apple'), (2, 'orange'), (3, 'banana'), (4, 'grape')])
# 转为字典方便查询
dict_fruit = kv_fruit.collectAsMap()
# 创建广播变量
broadcast_dict_fruit = sc.broadcast(dict_fruit)
fruit_ids = sc.parallelize([2, 1, 4, 3])
# 用value属性取出变量值
result = fruit_ids.map(lambda i: broadcast_dict_fruit.value[i])

累加器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 定义累加器,num初始值为10
num = sc.accumulator(10)

#定义累加函数实现累加功能
def f(x):
global num
num.add(x)

rdd = sc.parallelize([20, 30, 40, 50])
# 遍历rdd中的每个元素,进行求和
rdd.foreach(f)
# final为150
final = num.value

#缓存rdd
rdd.cache()

累加器也是惰性求值,foreach属于action算子,因此能得到正确结果;若没用action算子直接取值,得到的是初始值。

注意,在使用完累加器后要记得及时缓存RDD,不然下一次使用action算子会重复累加,得到错误结果!

持久化

Cache/Persist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# todo cache / persist
from pyspark.storagelevel import StorageLevel

# 内存级持久化
rdd1.cache()

# 内存和硬盘上持久化
rdd2.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

# 使用action算子使缓存生效
rdd1.count()
rdd2.count()

# 释放
rdd1.unpersist()
rdd2.unpersist()

Checkpoint

1
2
3
4
5
6
7
8
9
10
11
# 设置检查点目录
sc.setCheckpointDir("hdfs://export/data/checkpoint")

# 启用检查点
rdd1.checkpoint()

# 使用任意action算子让检查点生效 备份rdd结果
rdd1.count()

# 再次执行 从检查点读取数据
rdd1.count()

UDF 用户定义函数

spark udf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# todo 各个省份的支付类型比例
from pyspark.sql.functions import udf

df_joined_top3.select('storeProvince', 'payType').createOrReplaceTempView('pp')

@udf('string')
def udf_to_percent(p):
return str(round(p * 100)) + '%'

payType_percent = spark.sql("""
SELECT storeProvince, payType, (COUNT(*) / pcnt) AS percent
FROM (
SELECT *, COUNT(*) OVER(PARTITION BY storeProvince) AS pcnt FROM pp
) AS sub
GROUP BY storeProvince, payType, pcnt;
""").select('storeProvince', 'payType', udf_to_percent('percent'))

pandas udf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# todo 各个省份的支付类型比例
from pyspark.sql.functions import pandas_udf
import pandas as pd

# 启用Arrow传输
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df_joined_top3.select('storeProvince', 'payType').createOrReplaceTempView('pp')

def test(x):
return str(round(float(x) * 100)) + '%'

# 使用 pandas udf 需要注意类型转换,再用apply方法转换序列里的每一个元素!
@pandas_udf('string')
def udf_to_percent2(p: pd.Series) -> pd.Series:
p = p.astype(str).apply(test)
return p

payType_percent = spark.sql("""
SELECT storeProvince, payType, (COUNT(*) / pcnt) AS percent
FROM (
SELECT *, COUNT(*) OVER(PARTITION BY storeProvince) AS pcnt FROM pp
) AS sub
GROUP BY storeProvince, payType, pcnt;
""").select('storeProvince', 'payType', udf_to_percent2('percent'))
如果觉得文章写得不错或对您有帮助,请我喝杯柠檬茶吧!