PySpark基本入门(附python代码示例)
在整理数据,处理数据上。对于大规模数据分析,相较于hadoop来说,spark是个更为方便的工具。今天为大家带来pyspark的快速入门,希望对大家的工作和学习有帮助。
基本概念介绍
首先介绍一下spark中常见的基本概念:
RDD:弹性分布式数据集的简称,是一个分布式对象集合,本质上是一个只读的分区记录集合。不能直接修改,只能通过一定的转换操作(map,reduce,join,groupby)来创建新的RDD。
DAG:有向无环图,反应了RDD之间的依赖关系。
Executor:一个进程,负责运行任务。
Application:用户编写的spark应用程序。
Task:运行在Excutor上的工作单元。
Job:一个job包含多个RDD以及对应的RDD上的各种操作。
Stage:作业的基本调度单位。一个作业会被分为多组Task,每组任务称为一个stage。
其中,RDD是一种高度受限的内存模型,一次只能对RDD全集进行修改。听完上述说明,大家可能理解起来很抽象,接下来我将介绍RDD编程模型,并通过程序例子来说明,方便大家理解。
RDD编程例子
1。从文件系统中加载数据并转化成RDD格式
下面的例程可以将文本文件转化成RDD数据格式读入,便于Spark对RDD数据并行处理。
frompysparkimportSparkConf,SparkContext
scSparkContext()
可以通过sc。textFiles来将text文件转化成RDD格式的数据。
如果是本地文件,要加上file:
linessc。textFiles(file:usrlocalsparlexample。txt)
下面三条语句是完全等价的
linessc。textFiles(hdfs:localhost:9000userhadoopexample。txt)
linessc。textFiles(userhadoopexample。txt)
linessc。textFiles(example。txt)
lines。foreach(print)
2。将数组转化成RDD格式
array〔1,2,3,4,5〕
通过sc。parallelize将数组转化成RDD格式
rddsc。parallelize(array)
rdd。foreach(print)
1
2
3
4
5
3。RDD操作:Transformation
1。Filter
linessc。parallelize(〔Sparkisveryfast,MynameisLiLei〕)
筛选出含有Spark的行,操作为并行。
linesWithSparklines。filter(lambdaline:Sparkinline)
每行并行打印
linesWithSpark。foreach(print)
Sparkisveryfast
2。Map
linessc。parallelize(〔Sparkisveryfast,MynameisLiLei〕)
每一行通过map并行处理。
wordslines。map(lambdaline:line。split())
words。foreach(print)
〔Spark,is,very,fast〕
〔My,name,is,LiLie〕
3。groupByKey
wordssc。parallelize(〔(Hadoop,1),(is,1),(good,1),
(Spark,1),(is,1),(fast,1),(Spark,1),(is,1),(better,1)〕)
groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集
words1words。groupByKey()
words1。foreach(print)
(Hadoop,pyspark。resultiterable。ResultIterableobjectat0x7fb210552c88)
(better,pyspark。resultiterable。ResultIterableobjectat0x7fb210552e80)
(fast,pyspark。resultiterable。ResultIterableobjectat0x7fb210552c88)
(good,pyspark。resultiterable。ResultIterableobjectat0x7fb210552c88)
(Spark,pyspark。resultiterable。ResultIterableobjectat0x7fb210552f98)
(is,pyspark。resultiterable。ResultIterableobjectat0x7fb210552e10)
4。reduceByKey
wordssc。parallelize(〔(Hadoop,1),(is,1),(good,1),(Spark,1),
(is,1),(fast,1),(Spark,1),(is,1),(better,1)〕)
reduceByKey:相同的key通过指定操作进行聚合,下方代码利用求和进行聚合
words1words。reduceByKey(lambdaa,b:ab)
words1。foreach(print)
(good,1)
(Hadoop,1)
(better,1)
(Spark,2)
(fast,1)
(is,3)
4。RDD操作:Action
由于Spark的惰性机制,当RDD通过Transformation操作,直到遇到Action操作后,才会执行真正的计算,从文件中加载数据,完成一次又一次Transformation操作,最终,完成Action操作得到结果。
rddsc。parallelize(〔1,2,3,4,5〕)
rdd的数量
rdd。count()
5
第一行rdd
rdd。first()
1
前三行rdd
rdd。take(3)
〔1,2,3〕
rdd。reduce(lambdaa,b:ab)
15
以数组的形式返回rdd中所有元素
rdd。collect()
〔1,2,3,4,5〕
rdd。foreach(lambdaelem:print(elem))
总结
通过将输入(文件,数组)转化成RDD,并将多个简单的Transformation和Action操作进行串联,Spark可以高效的完成很多复杂数据的处理。同时,在完成大规模的数据处理后,我们也可以利用Spark中内置的机器学习算法来对这些大规模的数据进行学习和建模。Spark中内部实现了很多分布式机器学习算法,例如SVM,Word2Vec等,我们将在后面的文章分享