
Spark Notes in Python (Part 1)

Basic Operations

Creating a dataset. Note: an RDD is partitioned by default when created; the number of partitions is determined by the number of executors, and each partition is processed as one task.

lines = sc.parallelize(["pandas", "i like pandas"])
A = [1,2,3,4,5]
lines = sc.parallelize(A)
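
To see or control the split, parallelize accepts an explicit partition count and getNumPartitions reports it; a minimal sketch (the count 4 is an arbitrary choice for illustration):

rdd = sc.parallelize(A, 4)  # explicitly request 4 partitions
rdd.getNumPartitions()      # 4; each partition runs as one task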
Reading an external dataset
lines = sc.textFile("README.md")  # or sc.textFile("hdfs://hadoop1:8000/dataguru/week2/directory/")
lines.count() # Count the number of items in this RDD
 
pythonLines = lines.filter(lambda line: "Python" in line) #filter
pythonLines.persist()  # if multiple actions will reuse the same RDD, persist its result first
pythonLines.first() # First item in this RDD, i.e. first line of README.md
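
With the RDD persisted, later actions reuse the cached result instead of recomputing the filter; a small sketch continuing the lines above:

pythonLines.count()    # the first action materializes and caches the filtered RDD
pythonLines.collect()  # subsequent actions are served from the cache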

Merging two RDDs

errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
Outputting a dataset
print("Input had " + str(badLinesRDD.count()) + " concerning lines")
print("Here are 10 examples:")
for line in badLinesRDD.take(10):
    print(line)
Squaring the values in an RDD
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print("%i " % num)
Splitting lines into words
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()  # returns "hello"
reduce() in Python

sum = rdd.reduce(lambda x, y: x + y)

Common RDD Operations

Key-value examples

kv1=sc.parallelize([("A",1),("B",2),("C",3),("A",4),("B",5)])
kv1.sortByKey().collect()  # note: the parentheses after sortByKey cannot be omitted
kv1.groupByKey().mapValues(list).collect()
from operator import add
kv1.reduceByKey(add).collect()
 
kv2=sc.parallelize([("A",4),("A",4),("C",3),("A",4),("B",5)])
kv2.distinct().collect()
kv1.union(kv2).collect()
 
kv3=sc.parallelize([("A",10),("B",20),("D",30)])
kv1.join(kv3).collect()
kv1.cogroup(kv3).collect()  # cogroup: for each key appearing in either RDD, the values with that key from each RDD are gathered into separate collections; unlike reduceByKey, this combines elements with the same key across two RDDs
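
In PySpark, cogroup yields pairs of ResultIterable objects; a small sketch of converting them to plain lists so collect() shows their contents (the commented result is illustrative):

kv1.cogroup(kv3).mapValues(lambda vs: [list(v) for v in vs]).collect()
# e.g. ('A', [[1, 4], [10]]), ('C', [[3], []]), ('D', [[], [30]]), ...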

Basic RDD transformations on an RDD containing {1, 2, 3, 3}

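The referenced table (apparently from Learning Spark) covers map, flatMap, filter, distinct, and sample; the sketch below reconstructs its examples, with commented results meant as illustrations rather than guaranteed output:

rdd = sc.parallelize([1, 2, 3, 3])
rdd.map(lambda x: x + 1).collect()            # [2, 3, 4, 4]
rdd.flatMap(lambda x: range(x, 4)).collect()  # [1, 2, 3, 2, 3, 3, 3]
rdd.filter(lambda x: x != 1).collect()        # [2, 3, 3]
rdd.distinct().collect()                      # [1, 2, 3] (order may vary)
rdd.sample(False, 0.5).collect()              # nondeterministic subset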

Two-RDD transformations on RDDs containing {1, 2, 3} and {3, 4, 5}

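That table covers the set-like operations; a hedged sketch using the two datasets named in the caption:

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
rdd1.union(rdd2).collect()         # [1, 2, 3, 3, 4, 5]
rdd1.intersection(rdd2).collect()  # [3]
rdd1.subtract(rdd2).collect()      # [1, 2]
rdd1.cartesian(rdd2).collect()     # all 9 (x, y) pairs, from (1, 3) to (3, 5)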

Common Actions

sum = rdd.reduce(lambda x, y: x + y)

Basic actions on an RDD containing {1, 2, 3, 3}

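The actions table maps to the following sketch on the same dataset (commented results are what these calls should return; dict ordering may vary):

rdd = sc.parallelize([1, 2, 3, 3])
rdd.collect()                    # [1, 2, 3, 3]
rdd.count()                      # 4
rdd.countByValue()               # {1: 1, 2: 1, 3: 2}
rdd.take(2)                      # [1, 2]
rdd.top(2)                       # [3, 3]
rdd.reduce(lambda x, y: x + y)   # 9
rdd.fold(0, lambda x, y: x + y)  # 9
rdd.aggregate((0, 0),
              lambda acc, v: (acc[0] + v, acc[1] + 1),
              lambda a, b: (a[0] + b[0], a[1] + b[1]))  # (9, 4), i.e. (sum, count)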

Persistence (Caching)

Usage

RDD.cache()

The exact behavior depends on which of the persistence levels from org.apache.spark.storage.StorageLevel is set.

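In Python the levels are exposed as pyspark.StorageLevel; a minimal sketch of persisting at an explicit level (MEMORY_AND_DISK is an arbitrary choice for illustration):

from pyspark import StorageLevel

rdd = sc.parallelize(range(1000))
rdd.persist(StorageLevel.MEMORY_AND_DISK)  # spill partitions to disk when memory is tight
rdd.count()      # the first action materializes and caches the data
rdd.unpersist()  # release the storage when no longer needed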

Inspecting RDD lineage

rdd.toDebugString()
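
A small sketch, assuming a Python 3 PySpark where toDebugString() returns bytes (decode for readability):

print(rdd.toDebugString().decode("utf-8"))  # prints the RDD's lineage graph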

Passing Functions

word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s

word = rdd.filter(containsError)
1. Simple functions: lambda expressions.
     Suitable for short functions; they do not support multi-statement bodies or statements without a return value.
2. def functions
     The whole enclosing object gets passed along, so it is best not to pass a function that references object fields. If the function you pass is a member of some object, or references one of the object's fields inside the function, it will raise an error. For example:
class MyClass(object):
    def __init__(self):
        self.field = "Hello"

    def doStuff(self, rdd):
        # Error: referencing self.field inside the lambda captures the entire self object
        return rdd.map(lambda s: self.field + s)
Solution: copy the field you need into a local variable and pass that local variable instead.
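
A sketch of the corrected class, applying that local-variable approach:

class MyClass(object):
    def __init__(self):
        self.field = "Hello"

    def doStuff(self, rdd):
        field = self.field  # copy just the string; the lambda no longer captures self
        return rdd.map(lambda s: field + s)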

References

spark学习笔记总结-spark入门资料精化
Spark Python API Docs
