统一API语法:
df.write.mode().format().option(K,V).save(PATH)
coding:utf8
import ...
if __name__ == '_main__':
#O.构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getorcCreate()
sc = spark.sparkContext
#1.读取数据集
schema = StructType().add("user_id",StringType(),nullable=True).\
add("movie_id",IntegerType(),nullable=True).
add("rank",IntegerType(),nullable=True).
add("ts",StringType),nullable=True)
df = spark.read.format("csv").\
option("sep","\t").\
option("header",False).\
option("encoding","utf-8").\
schema(schema=schema).\
Load("../data/input/sql/u.data")
#Write text写出,只能写出一个列的数据,需要将df转换为单列df
#这里用F对象里的concat_ws函数,指定---分隔,连接指定的字段
df.select(F.concat_ws("---","user_id","movid_id","rank","ts")).\
write.\
mode("overwrite").\
format("text").\
save("../data/output/sql/text")
# Write csv
df.write.mode("overwrite").\
format("csv").\
option("sep", ";").
option("header", True).\
save("../data/output/sql/csv")
# Write json
df.write.mode("overwrite").\
format("json").\
save("../data/output/sql/json")
# Write parquet
df.write.mode("overwrite").\
format("parqeut").\
save("../data/output/sql/parquet")
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- shangjiatang.cn 版权所有 湘ICP备2022005869号-4
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务