Spark Notes, Python Edition (Part 1)
Spark
2020-01-15
Basic operations
Creating a dataset
Note: the data is partitioned by default when it is created; the number of partitions is determined by the number of Executors (i.e. the cluster's default parallelism), and each partition is processed as one Task.
lines = sc.parallelize(["pandas", "i like pandas"])
# or
A = [1, 2, 3, 4, 5]
lines = sc.parallelize(A)
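To check how many partitions an RDD actually got, and to request a count explicitly, a minimal sketch (run in the pyspark shell, where sc is the SparkContext; the default depends on your cluster's default parallelism):
rdd = sc.parallelize(A)
rdd.getNumPartitions()       # default, driven by the cluster's default parallelism
rdd4 = sc.parallelize(A, 4)  # explicitly request 4 partitions
rdd4.getNumPartitions()      # 4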
Reading an external dataset
lines = sc.textFile("README.md")  # or sc.textFile("hdfs://hadoop1:8000/dataguru/week2/directory/")
lines.count() # Count the number of items in this RDD
pythonLines = lines.filter(lambda line: "Python" in line) #filter
pythonLines.persist()  # if several actions will reuse the same RDD, persist its result first
pythonLines.first() # First item in this RDD, i.e. first line of README.md
Merging two RDDs
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
Outputting the dataset
print "Input had " + str(badLinesRDD.count()) + " concerning lines" print "Here are 10 examples:" for line in badLinesRDD.take(10): print linePython squaring the values in an RDD
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print("%i " % (num))
Splitting lines into words
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()  # returns "hello"
reduce() in Python
sum = rdd.reduce(lambda x, y: x + y)
Common RDD operations
Key-value (K-V) examples
kv1=sc.parallelize([("A",1),("B",2),("C",3),("A",4),("B",5)])
kv1.sortByKey().collect()  # note: the parentheses on sortByKey() cannot be omitted
kv1.groupByKey().mapValues(list).collect()
from operator import add
kv1.reduceByKey(add).collect()
kv2=sc.parallelize([("A",4),("A",4),("C",3),("A",4),("B",5)])
kv2.distinct().collect()
kv1.union(kv2).collect()
kv3=sc.parallelize([("A",10),("B",20),("D",30)])
kv1.join(kv3).collect()
kv1.cogroup(kv3).collect()
# cogroup: for the key-value pairs of two RDDs, the values sharing a key are grouped into one collection per RDD; unlike reduceByKey, it combines elements with the same key across both RDDs.
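cogroup() returns an iterable for each side; to see the grouped values as plain lists, a sketch (expected output shown in the comment, key order may vary):
kv1.cogroup(kv3).mapValues(lambda v: (list(v[0]), list(v[1]))).collect()
# [('A', ([1, 4], [10])), ('B', ([2, 5], [20])), ('C', ([3], [])), ('D', ([], [30]))]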
Basic RDD transformations on an RDD containing {1, 2, 3, 3}
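A sketch of those basic transformations on rdd = sc.parallelize([1, 2, 3, 3]) (expected results in the comments; element order may vary):
rdd = sc.parallelize([1, 2, 3, 3])
rdd.map(lambda x: x + 1).collect()            # [2, 3, 4, 4]
rdd.flatMap(lambda x: range(x, 4)).collect()  # [1, 2, 3, 2, 3, 3, 3]
rdd.filter(lambda x: x != 1).collect()        # [2, 3, 3]
rdd.distinct().collect()                      # [1, 2, 3]
rdd.sample(False, 0.5).collect()              # nondeterministic sample of the elements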
Two-RDD transformations on RDDs containing {1, 2, 3} and {3, 4, 5}
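And a sketch of the two-RDD transformations on those sets:
rdd = sc.parallelize([1, 2, 3])
other = sc.parallelize([3, 4, 5])
rdd.union(other).collect()         # [1, 2, 3, 3, 4, 5]
rdd.intersection(other).collect()  # [3]
rdd.subtract(other).collect()      # [1, 2]
rdd.cartesian(other).collect()     # all 9 (x, y) pairs, e.g. (1, 3), (1, 4), ..., (3, 5)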
Common actions
sum = rdd.reduce(lambda x, y: x + y)
Basic actions on an RDD containing {1, 2, 3, 3}
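A sketch of those basic actions on rdd = sc.parallelize([1, 2, 3, 3]) (expected results in the comments):
rdd = sc.parallelize([1, 2, 3, 3])
rdd.collect()                    # [1, 2, 3, 3]
rdd.count()                      # 4
rdd.countByValue()               # {1: 1, 2: 1, 3: 2}
rdd.take(2)                      # [1, 2]
rdd.top(2)                       # [3, 3]
rdd.reduce(lambda x, y: x + y)   # 9
rdd.fold(0, lambda x, y: x + y)  # 9
rdd.aggregate((0, 0),
              lambda acc, v: (acc[0] + v, acc[1] + 1),
              lambda a, b: (a[0] + b[0], a[1] + b[1]))  # (9, 4): sum and count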
Persistence (Caching)
Usage
RDD.cache()
The exact behavior depends on the persistence level you set; the levels come from org.apache.spark.storage.StorageLevel (in PySpark, from pyspark.StorageLevel).
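A minimal sketch of choosing a level explicitly in PySpark, reusing the pythonLines RDD from above:
from pyspark import StorageLevel

pythonLines.persist(StorageLevel.MEMORY_AND_DISK)  # spill to disk if it does not fit in memory
pythonLines.count()      # the first action materializes and caches the RDD
pythonLines.is_cached    # True
pythonLines.unpersist()  # release the cached data when done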
Inspecting an RDD's lineage (dependencies)
rdd.toDebugString()
Passing functions
word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s

word = rdd.filter(containsError)
1. Simple functions: lambda expressions.
Suitable for short functions; a lambda cannot contain multiple statements or statements without a return value.
2. def functions
Passing a def-defined function works, but avoid passing a function that references an object's fields. If the function you pass is a method of an object, or it references a field of that object, Spark has to serialize and ship the entire object to the workers, which can fail or send far more data than needed. For example:
class MyClass(object):
    def __init__(self):
        self.field = "Hello"

    def doStuff(self, rdd):
        # problematic: referencing self.field pulls the whole self object into the closure
        return rdd.map(lambda s: self.field + s)
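The usual workaround is to copy just the field you need into a local variable, so the closure captures only that value instead of the whole object; a sketch:
class MyClass(object):
    def __init__(self):
        self.field = "Hello"

    def doStuff(self, rdd):
        # copy the field into a local variable; the lambda now captures
        # only the string, not the whole MyClass instance
        field = self.field
        return rdd.map(lambda s: field + s)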