RDD编程前的准备工作
- 在
/usr/local/spark/mycode/rdd
目录下新建一个word.txt文件,文件里面随便输入几行英文语句,用于测试。
1 | mkdir -p usr/local/spark/mycode/rdd |
- 按照下面的命令启动Hadoop中的HDFS组件:
1 | cd /usr/local/hadoop |
- 按照下面命令启动pyspark:
1 | cd /usr/local/spark |
从文件系统中加载数据创建RDD
Spark可以使用
textFile()
方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址,或者是分布式文件系统HDFS的地址,或者是Amazon S3的地址等等。从本地文件加载数据:
1 | lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") |
- 把刚才在本地文件系统中的“/usr/local/spark/mycode/rdd/word.txt”上传到HDFS文件系统的hadoop用户目录下。
1 | cd /usr/local/hadoop |
- 进入pyspark的shell界面,从HDFS文件系统加载数据:下面三句语句效果相同
1 | lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") |
- 注意,在使用Spark读取文件时,需要说明以下几点:
- 如果使用了本地文件系统的路径,那么,必须要保证在所有的worker节点上,也都能够采用相同的路径访问到该文件,比如,可以把该文件拷贝到每个worker节点上,或者也可以使用网络挂载共享文件系统。
- textFile()方法的输入参数,可以是文件名,也可以是目录,也可以是压缩文件等。比如,textFile(“/my/directory”), textFile(“/my/directory/.txt”), and textFile(“/my/directory/.gz”).
- textFile()方法也可以接受第2个输入参数(可选),用来指定分区的数目。默认情况下,Spark会为HDFS的每个block创建一个分区(HDFS中每个block默认是128MB)。你也可以提供一个比block数量更大的值作为分区数目,但是,你不能提供一个小于block数量的值作为分区数目。
通过并行集合创建RDD
- 可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
1 | nums = [1,2,3,4,5] |
RDD操作
转换操作
- 对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
- 下面列出一些常见的转换操作(Transformation API):
- filter(func):筛选出满足函数func的元素,并返回一个新的数据集。
- map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集。
- flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果。
- groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集。
- reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合。
行动操作
- 行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
- 下面列出一些常见的行动操作(Action API):
- count() 返回数据集中的元素个数。
- collect() 以数组的形式返回数据集中的所有元素。
- first() 返回数据集中的第一个元素。
- take(n) 以数组的形式返回数据集中的前n个元素。
- reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素。
- foreach(func) 将数据集中的每个元素传递到函数func中运行。
实例
- filter([func]):
1 | lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") |
- map([func]).reduce([func]):map进行结果集塑形,而reduce进行结果集函数应用:
1 | lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") |
- lines.map(lambda line : len(line.split(” “)))会把每行文本都传递给匿名函数,也就是传递给Lamda表达式line : len(line.split(” “))中的line,然后执行处理逻辑len(line.split(” “))。reduce()操作每次接收两个参数,取出较大者留下,然后再继续比较。
缓存机制
在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果整个Spark程序中只有一次行动操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
下面是一个多次计算同一个RDD的例子:上面代码执行过程中,前后共触发了两次从头到尾的计算。
1 | list = ["Hadoop","Spark","Hive"] |
- 这时,可以是用cache方法缓存第一次RDD操作的结果,防止重复执行相同的RDD操作
1 | list = ["Hadoop","Spark","Hive"] |
分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。
对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:
- 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
- Apache Mesos:默认的分区数为8;
- Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;
- 因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism
1 | array = [1,2,3,4,5] |
转载于:子雨大数据