0%

Spark2.x之RDD编程(python版)

RDD编程前的准备工作

  • /usr/local/spark/mycode/rdd目录下新建一个word.txt文件,文件里面随便输入几行英文语句,用于测试。
1
2
mkdir -p usr/local/spark/mycode/rdd
vim word.txt
  • 按照下面的命令启动Hadoop中的HDFS组件:
1
2
cd  /usr/local/hadoop
./sbin/start-dfs.sh
  • 按照下面命令启动pyspark:
1
2
cd /usr/local/spark
./bin/pyspark

从文件系统中加载数据创建RDD

  • Spark可以使用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址,或者是分布式文件系统HDFS的地址,或者是Amazon S3的地址等等。

  • 从本地文件加载数据:

1
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
  • 把刚才在本地文件系统中的“/usr/local/spark/mycode/rdd/word.txt”上传到HDFS文件系统的hadoop用户目录下。
1
2
cd /usr/local/hadoop
bin/hdfs dfs -put /usr/local/spark/mycode/wordcount/word.txt /user/hadoop
  • 进入pyspark的shell界面,从HDFS文件系统加载数据:下面三句语句效果相同
1
2
3
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
lines = sc.textFile("/user/hadoop/word.txt")
lines = sc.textFile("word.txt")
  • 注意,在使用Spark读取文件时,需要说明以下几点:
    1. 如果使用了本地文件系统的路径,那么,必须要保证在所有的worker节点上,也都能够采用相同的路径访问到该文件,比如,可以把该文件拷贝到每个worker节点上,或者也可以使用网络挂载共享文件系统。
    2. textFile()方法的输入参数,可以是文件名,也可以是目录,也可以是压缩文件等。比如,textFile(“/my/directory”), textFile(“/my/directory/.txt”), and textFile(“/my/directory/.gz”).
    3. textFile()方法也可以接受第2个输入参数(可选),用来指定分区的数目。默认情况下,Spark会为HDFS的每个block创建一个分区(HDFS中每个block默认是128MB)。你也可以提供一个比block数量更大的值作为分区数目,但是,你不能提供一个小于block数量的值作为分区数目。

通过并行集合创建RDD

  • 可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
1
2
nums = [1,2,3,4,5]
rdd = sc.parallelize(nums)

RDD操作

转换操作

  • 对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
  • 下面列出一些常见的转换操作(Transformation API):
    1. filter(func):筛选出满足函数func的元素,并返回一个新的数据集。
    2. map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集。
    3. flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果。
    4. groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集。
    5. reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合。

行动操作

  • 行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
  • 下面列出一些常见的行动操作(Action API):
    1. count() 返回数据集中的元素个数。
    2. collect() 以数组的形式返回数据集中的所有元素。
    3. first() 返回数据集中的第一个元素。
    4. take(n) 以数组的形式返回数据集中的前n个元素。
    5. reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素。
    6. foreach(func) 将数据集中的每个元素传递到函数func中运行。

实例

  • filter([func]):
1
2
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines.filter(lambda line : "Spark" in line).count()
  • map([func]).reduce([func]):map进行结果集塑形,而reduce进行结果集函数应用:
1
2
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines.map(lambda line : len(line.split(" "))).reduce(lambda a,b : (a > b and a or b))
  • lines.map(lambda line : len(line.split(” “)))会把每行文本都传递给匿名函数,也就是传递给Lamda表达式line : len(line.split(” “))中的line,然后执行处理逻辑len(line.split(” “))。reduce()操作每次接收两个参数,取出较大者留下,然后再继续比较。

缓存机制

  • 在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果整个Spark程序中只有一次行动操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

  • 下面是一个多次计算同一个RDD的例子:上面代码执行过程中,前后共触发了两次从头到尾的计算。

1
2
3
4
list = ["Hadoop","Spark","Hive"]
rdd = sc.parallelize(list)
print(rdd.count()) //行动操作,触发一次真正从头到尾的计算
print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算
  • 这时,可以是用cache方法缓存第一次RDD操作的结果,防止重复执行相同的RDD操作
1
2
3
4
5
list = ["Hadoop","Spark","Hive"]
rdd = sc.parallelize(list)
rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
print(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
print(','.join(rdd.collect())) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd

分区

  • RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。

  • 对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:

    1. 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
    2. Apache Mesos:默认的分区数为8;
    3. Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;
    • 因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism
1
2
array = [1,2,3,4,5]
rdd = sc.parallelize(array,2) #设置两个分区

转载于:子雨大数据

-------------本文结束感谢您的阅读-------------