Pyspark获取并处理RDD数据代码实例
弹性分布式数据集(RDD)是一组不可变的JVM对象的分布集,可以用于执行高速运算,它是ApacheSpark的核心。
在pyspark中获取和处理RDD数据集的方法如下:
1.首先是导入库和环境配置(本测试在linux的pycharm上完成)
importos frompysparkimportSparkContext,SparkConf frompyspark.sql.sessionimportSparkSession os.environ["PYSPARK_PYTHON"]="/usr/bin/python3" conf=SparkConf().setAppName('test_rdd') sc=SparkContext('local','test',conf=conf) spark=SparkSession(sc)
2.然后,提供hdfs分区数据的路径或者分区表名
txt_File=r"hdfs://host:port/apps/hive/warehouse/数据库名.db/表名/分区名/part-m-00029.deflate"#part-m-00029.deflate
#txt_File=r"hdfs://host:port/apps/hive/warehouse/数据库名.db/表名"#hivetable
3.sc.textFile进行读取,得到RDD格式数据<还可以用spark.sparkContext.parallelize(data)来获取RDD数据>,参数中还可设置数据被划分的分区数
txt_=sc.textFile(txt_File)
4.基本操作:
- type(txt_):显示数据类型,这时属于'pyspark.rdd.RDD'
- txt_.first():获取第一条数据
- txt_.take(2):获取前2条数据,形成长度为2的list
- txt_.take(2)[1].split('\1')[1]:表示获取前两条中的第[1]条数据(也就是第2条,因为python的索引是从0开始的),并以'\1'字符分隔开(这要看你的表用什么作为分隔符的),形成list,再获取该list的第2条数据
- txt_.map(lambdax:x.split('\1')):使用lambda函数和map函数快速处理每一行数据,这里表示将每一行以'\1'字符分隔开,每一行返回一个list;此时数据结构是:'pyspark.rdd.PipelinedRDD'
- txt_.map(lambdax:(x,x.split('\1'))).filter(lambday:y[0].startswith('北京')):表示在返回(x,x.split('\1'))后,进行筛选filter,获取其中以'北京'开头的行,并按照相同格式(例如,这里是(x,x.split('\1'))格式,即原数据+分割后的列表数据)返回数据
- txt_.collect():返回所有RDD数据元素,当数据量很大时谨慎操作
- txt_.toDF():不能直接转成DataFrame格式,需要设置Schema
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。