This website requires JavaScript.

Spark学习笔记

常见 RDD 操作

对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作

函数名目的示例结果
map()将函数应用于 RDD 中的每个元素,将返回值构成新的 RDDrdd.map(x => x + 1){2, 3, 4, 4}
flatMap()将函数应用于 RDD 中的每个元素,将返回的迭代器的所有内容构成新的 RDD。通常用来切分单词rdd.flatMap(x => x.to(3)){1, 2, 3, 2, 3, 3, 3}
filter()返回一个由通过传给 filter() 的函数的元素组成的 RDDrdd.filter(x => x != 1){2, 3, 3}
distinct()去重rdd.distinct(){1, 2, 3}
sample(withRe placement, fraction, [seed])对 RDD 采样,以及是否替换rdd.sample(false, 0.5)非确定的

对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作

函数名目的示例结果
union()生成一个包含两个 RDD 中所有元素的 RDDrdd.union(other){1, 2, 3, 3, 4, 5}
intersection()求两个 RDD 共同的元素的 RDDrdd.intersection(other){3}
subtract()移除一个 RDD 中的内容(例如移除训练数据)rdd.subtract(other){1, 2}
cartesian()与另一个 RDD 的笛卡儿积rdd.cartesian(other){(1, 3), (1, 4), ... (3, 5)}

对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作

函数名目的示例结果
collect()返回 RDD 中的所有元素rdd.collect(){1, 2, 3, 3}
count()RDD 中的元素个数rdd.count()4
countByValue()各元素在 RDD 中出现的次数rdd.countByValue(){(1, 1), (2, 1), (3, 2)}
take(num)从 RDD 中返回 num 个元素rdd.take(2){1, 2}
top(num)从RDD中返回最前面的num 个元素rdd.top(2){3, 3}
takeOrdered(num) (ordering)从 RDD 中按照提供的顺序返 回最前面的 num 个元素rdd.takeOrdered(2)(myOrdering){3, 3}
takeSample(withReplace ment, num, [seed])从 RDD 中返回任意一些元素rdd.takeSample(false, 1)非确定的
reduce(func)并行整合 RDD 中所有数据 (例如 sum)rdd.reduce((x, y) => x + y)9
fold(zero)(func)和 reduce() 一样,但是需要 提供初始值rdd.fold(0)((x, y) => x + y)9
aggregate(zeroValue) (seqOp, combOp)和 reduce() 相似,但是通常 返回不同类型的函数rdd.aggregate((0, 0)) ((x, y) =>(x._1 + y, x._2 + 1), (x, y) =>(x._1 + y._1, x._2 + y._2))(9,4) 参考Spark函数讲解:aggregate
foreach(func)对 RDD 中的每个元素使用给 定的函数rdd.foreach(func)(无)

Pair RDD的转化操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)

函数名目的示例结果
reduceByKey(func)合并具有相同键的值rdd.reduceByKey((x, y) => x + y){(1, 2), (3,10)}
groupByKey()对具有相同键的值进行分组rdd.groupByKey(){(1, [2]),(3, [4, 6])}
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)使用不同的返回类型合并具有相同键的值见例 4-12 到例 4-14。
mapValues(func)对 pair RDD 中的每个值应用一个函数而不改变键rdd.mapValues(x => x+1){(1, 3), (3,5), (3, 7)}
flatMapValues(func)对 pair RDD 中的每个值应用 一个返回迭代器的函数,然后 对返回的每个元素都生成一个 对应原键的键值对记录。通常 用于符号化rdd.flatMapValues(x => (x to 5)){(1, 2), (1,3), (1, 4), (1, 5), (3, 4), (3, 5)}
keys()返回一个仅包含键的 RDDrdd.keys{(1, 3, 3}
values()返回一个仅包含值的 RDDrdd.values(){(2, 4, 6}
sortByKey()返回一个根据键排序的 RDDrdd.sortByKey(){(1,2), (3, 4), (3, 6)}

针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})

函数名目的示例结果
subtractByKey删掉 RDD 中键与 other RDD 中的键相同的元素rdd.subtractByKey(other){(1, 2)}
join对两个 RDD 进行内连接rdd.join(other){(3, (4, 9)), (3, (6, 9))}
rightOuterJoin对两个 RDD 进行连接操作,确保第一个 RDD 的键必须存在(右外连接)rdd.rightOuterJoin(other){(3,(Some(4),9)),(3,(Some(6),9))}
leftOuterJoin对两个 RDD 进行连接操作,确保第二个 RDD 的键必须存在(左外连接)rdd.leftOuterJoin(other){(1,(2,None)), (3, (4,Some(9))), (3, (6,Some(9)))}
cogroup将两个 RDD 中拥有相同键的数据分组到一起rdd.cogroup(other){(1,([2],[])), (3, ([4, 6],[9]))}

Pair RDD的行动操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)

函数名目的示例结果
countByKey()对每个键对应的元素分别计数rdd.countByKey(){(1, 1), (3, 2)}
collectAsMap()将结果以映射表的形式返回,以便查询rdd.collectAsMap()Map{(1, 2), (3, 6)}
lookup(key)返回给定键对应的所有值rdd.lookup(3)[4, 6]

持久化

org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级 别;如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份

级别使用的空间CPU时间是否在内存中是否在磁盘上备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK中等部分部分如果数据在内存中放不下,则溢写到磁盘上
MEMORY_AND_DISK_SER部分部分如果数据在内存中放不下,则溢写到磁盘上.在内存中存放序列化后的数据
DISK_ONLY
在 Scala 中使用 persist()
import org.apache.spark.storage.StorageLevel
    val result = input.map(x => x * x)
    result.persist(StorageLevel.DISK_ONLY)
    println(result.count())
    println(result.collect().mkString(","))

RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓存中移除。

代码示例

数据排序

在 Python 中以字符串顺序对整数进行自定义排序

rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))

在 Scala 中以字符串顺序对整数进行自定义排序

val input: RDD[(Int, Venue)] = ...
    implicit val sortIntegersByString = new Ordering[Int] {
      override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
    }
    rdd.sortByKey()

数据分区

// 初始化代码;从HDFS上的一个Hadoop SequenceFile中读取用户信息
// userData中的元素会根据它们被读取时的来源,即HDFS块所在的节点来分布
// Spark此时无法获知某个特定的UserID对应的记录位于哪个节点上
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // 构造100个分区 .persist()
// 周期性调用函数来处理过去五分钟产生的事件日志
// 假设这是一个包含(UserID, LinkInfo)对的SequenceFile def processNewLogs(logFileName: String) {
      val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
      val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
      val offTopicVisits = joined.filter {
        case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
          !userInfo.topics.contains(linkInfo.topic)
}.count()
      println("Number of visits to non-subscribed topics: " + offTopicVisits)
    }

自定义分区方式 请查看Learning Spark 59页

数据读取与保存

在Scala中求每个文件的平均值

如果文件足够小,那么可以使用SparkContext.wholeTextFiles()方法,该方法返回一个pairRDD,其中键是输入文件的文件名。

val input = sc.wholeTextFiles("file:///home/holden/salesFiles")
    val result = input.mapValues{y =>
      val nums = y.split(" ").map(x => x.toDouble)
      nums.sum / nums.size.toDouble
    }

文件系统读取

val rdd = sc.textFile("file:///home/holden/happypandas.gz") val rdd = sc.textFile("s3n://bucket/my-Files/*.txt") val rdd = sc.textFile("hdfs://master:port/path")

Json读取

{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}

Python中使用SparkSQL读取JSON数据

tweets = hiveCtx.jsonFile("tweets.json")
    tweets.registerTempTable("tweets")
    results = hiveCtx.sql("SELECT user.name, text FROM tweets")

Scala中使用SparkSQL读取JSON数据

val tweets = hiveCtx.jsonFile("tweets.json")
    tweets.registerTempTable("tweets")
    val results = hiveCtx.sql("SELECT user.name, text FROM tweets")

Java数据库连接

Scala中的JdbcRDD

def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance();
        DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
    }
    def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
}
    val data = new JdbcRDD(sc,
    createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
    lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
    println(data.collect().toList)

这个函数的最后一个参数是一个可以将结果从java.sql.ResultSet 转为对操作数据有用的格式的函数。我们会得到(Int,String)对,如果这个参数空缺,Spark会自动将每行结果转为一个对象数组。

官方建议使用SPARK SQL来访访问,这比上面提到的JdbcRDD处理数据方便。

Cassandra

随着DataStax开源其用于Spark的[Cassandra连接器](https://github.com/datastax/spark-cassandra- connector) ,Spark对Cassandra的支持大大提升。

Cassandra连接器sbt依赖

"com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0-rc5",
"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.0.0-rc5"

Cassandra连接器Maven依赖

<dependency> <!-- Cassandra -->
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector</artifactId>
    <version>1.0.0-rc5</version>
</dependency>
<dependency> <!-- Cassandra -->
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java</artifactId>
    <version>1.0.0-rc5</version>
</dependency>

跟Elasticsearch很像,Cassandra连接器要读取一个作业属性来决定连接到哪个集群。我们把spark.cassandra.connection.host设置为指向Cassandra群集。如果有用户名和密码的话,则需要分别设置spark.cassandra.auth.username和spark.cassandra.auth.password。假设你只有一个Cassandra群集要连接,可以在创建SparkContext时就把这些都设定好。

在Scala中配置Cassandra属性

val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "hostname")
val sc = new SparkContext(conf)

Datastax的Cassandra连接器使用Scala中隐式转换来为SparkContext和RDD提供一些附加函数。让我们引入这些隐式转换,并尝试读取一些数据。

//为SparkContext和RDD提供附加函数的隐式转换
import com.datastax.spark.connector._

//将整张表读为一个RDD。假设你的表test的创建语句为
// CREATE TABLE test.kv(key text PRIMARY KEY, value int); val data = sc.cassandraTable("test" , "kv")
//打印出value字段的一些基本统计。
data.map(row => row.getInt("value")).stats()

在Scala中保存数据到Cassandra

val rdd = sc.parallelize(List(Seq("moremagic", 1)))
rdd.saveToCassandra("test" , "kv", SomeColumns("key", "value"))

HBase

由于org.apache.hadoop.hbase.mapreduce.TableInputFormat类的实现,Spark可以通过Hadoop输入格式访问Hbase 。这个输入格式会返回键值对数据,其中键的类型为org.apache.hadoop.hbase.io.ImmutableBytesWritable,而值得类型为org.apache.hadoop.hbase. client.Result 类型包含多种根据列获取值得方法,在其[API文档](https://hbase. apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html)中有所描述。

从HBase读取收的Scala示例

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, "tablename") //扫描哪张表

val rdd = sc.newAPIHadoopRDD(
    conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result])

TableInputFormat包含多个可以用来优化对HBase的读取的设置项,比如将扫描限制到一部分列中,以及限制扫描的时间范围.你可以在[TableInputFormat](http:// hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html)的API文档中找到这些选项,并在HBaseConfiguration中设置它们,然后在把它传给Spark.

Elasticsearch

Spark可以使用Elasticsearch-Hadoop从Elasticsearch中读写数据.Elasticsearch是一个开源的、基于Lucene的搜索系统。

在Scala中使用Elasticsearch输出

val jobConf = new JobConf(sc.hadoopConfiguration)
 jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.
 mr.EsOutputFormat")
 jobConf.setOutputCommitter(classOf[FileOutputCommitter])
 jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
 jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
 FileOutputFormat.setOutputPath(jobConf, new Path("-"))
 output.saveAsHadoopDataset(jobConf)

在Scala中使用Elasticsearch输入

def mapWritableToInput(in: MapWritable): Map[String, String] = {
    in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
    jobConf.set(ConfigurationOptions.ES_NODES, args(2))
    val currentTweets = sc.hadoopRDD(jobConf,
    classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
classOf[MapWritable])
//    map
//  MapWritable[Text, Text]  Map[String, String]
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }

就输出而言,Elasticsearch可以进行映射推断,但是偶尔会推断出部正确的数据类型,因此如果你要存储字符串以外的数据类型,最好明确指定[类型映射](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put- mapping.html)

累加器

在Python中累加空行

file = sc.textFile(inputFile)
# 创建Accumulator[Int]并出示华为0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines # 访问全局变量
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value

在Scala中累加空行

val sc = new SparkContext(...)
val file = sc.textFile("file.txt")

val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0

val callSigns = file.flatMap(line => {
    if (line == "") {
        blankLines += 1 // 累加器加1
    }
    line.split(" ")
})

累加器的用法如下所示.

  • 通过驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器.返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型.
  • Spark闭包里的执行器代码可以使用累加器+=方法增加累加器的值.
  • 驱动器程序可以调用累加器的value属性.

在Python使用累加器进行错误计数

# 创建用来验证呼叫的累加器
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)

def validateSign(sign):
    global validSignCount, invalidSignCount
    if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
        validSignCount += 1
        return True
    else:
        invalidSignCount += 1
        return False

# 对于每个呼号的联系次数进行计数
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x + y)

#强制求值计算计数
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
    contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
    print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.
value)

如果要自定义累加器,需要扩展AccumulatorParam,这在Spark API文档(http://spark.apache.org/docs/ latest/api/scala/index.html#package)中有所介绍.

广播变量

在Python中使用广播变量查询国家

# 查询RDD contactCounts中的呼叫的对应位置.将呼叫前缀
# 读取为国家代码来进行查询
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)
countryContactCounts = (contactCounts
                    .map(processSignCount)
                    .reduceByKey((lambda x, y: x+ y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

在Scala中使用广播变量查询国家

    // 查询RDD contactCounts中的呼叫的对应位置.将呼叫前缀
    // 读取为国家代码来进行查询
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
    val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

基于分区进行操作

以下示例中,使用mapPartitions函数获取输入RDD的每个分区中的元素迭代器,而需要返回的是执行结果的序列的迭代器.

在Python中使用共享连接池

def processCallSigns(signs):
    """使用连接池查询呼号"""
    # 创建一个连接池
    http = urllib3.PoolManager()
    # 与每条呼号记录相关联的URL
    urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
    # 创建请求(非阻塞)
    requests = map(lambda x: (x, http.request('GET', x)), urls)
    # 获取结果
    result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
    # 删除空的结果并返回
    return filter(lambda x: x[1] is not None, result)

def fetchCallSigns(input):
    """获取呼叫号"""
    return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))

contactsContactList = fetchCallSigns(validSigns)

在Scala中使用共享连接池与JSON解析器

val contactsContactLists = validSigns.distinct().mapPartitions{
    signs =>
    val mapper = createMapper()
    val client = new HttpClient()
    client.start()
    //创建http请求
    signs.map {sign =>
        createExchangeForSign(sign) 
        //获取响应
    }.map{ case (sign, exchange) =>
    (sign, readExchangeCallLog(mapper, exchange))
    }.filter(x => x._2 != null) // 删除空的呼叫日志
}

按分区执行的操作符

函数名调用所提供的返回的对于RDD[T]的函数签名
mapPartitions()该分区中元素的迭代器返回的元素的迭代器f: (Iterator[T]) → Iterator[U]
mapPartitionsWithIndex()分区序号,以及每个分区中的元素的迭代器返回的元素的迭代器f: (Int, Itera tor[T]) → Iter ator[U]
foreachPartitions()元素迭代器f: (Iterator[T]) → Unit

与外部程序间的管道

Spark在RDD上提供pipe()方法,可以讲数据通过管道传给用其他语言编写的程序,只要它能读写Unix标准刘就行,比如R语言脚本.

R语言的距离程序

#!/usr/bin/env Rscript
library("Imap")
f <- file("stdin")
open(f)
while(length(line <- readLines(f,n=1)) > 0) {
  # 处理行
  contents <- Map(as.numeric, strsplit(line, ","))
  mydist <- gdist(contents[[1]][1], contents[[1]][2],
                  contents[[1]][3], contents[[1]][4],
                  units="m", a=6378137.0, b=6356752.3142, verbose = FALSE)
  write(mydist, stdout())
}

执行起来./src/R/nddistance.R 应该像这样

$ ./src/R/finddistance.R
37.75889318222431,-122.42683635321838,37.7614213,-122.4240097 349.2602
coffee
NA
ctrl-d

目前为止一切顺利,可以讲stdin中的每一行数据都转为stdout中的输出了.现在需要做的事情就是让每个工作节点都能访问finddistance,并调用这个脚本来对RDD进行实际的转化操作.

在Python中使用pipe()调用finddistance.R的driver程序

# 使用一个R语言外部程序计算每次呼叫的距离
distScript = "./src/R/finddistance.R"
distScriptName = "finddistance.R"
sc.addFile(distScript)
def hasDistInfo(call):
    """验证一次呼叫是否有计算距离时必需的字段"""
    requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
    return all(map(lambda f: call[f], requiredFields))
def formatCall(call):
    """将呼叫按新的格式重新组织以使之可以被R程序解析"""
    return "{0},{1},{2},{3}".format(
          call["mylat"], call["mylong"],
          call["contactlat"], call["contactlong"])

pipeInputs = contactsContactList.values().flatMap(
      lambda calls: map(formatCall, filter(hasDistInfo, calls)))
distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
print distances.collect()

在Scala中使用pipe()调用finddistance.R的驱动器程序

// 使用一个R语言外部程序计算每次呼叫的距离
// 将脚本添加到各个节点需要在本次作业中下载的文件的列表中
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x => x.map(y =>
  s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")).pipe(Seq(
    SparkFiles.get(distScriptName)))
println(distances.collect().toList)

所有通过SparkContext.addFile(path)添加的文件存都存储在同一个目录中,所以有必要使用唯一的名字.我们可以在工作节点通过SparkFiles.getRootDirectory找到它们.我们也可以使用SparkFiles.get(Filename)来定位单个文件

数值RDD的操作

Spark的数值操作是通过流式算法实现,允许以每次一个元素的方式构建出模型.这些统计数据都会在调用stats()时通过一次遍历数据计算出来,并以StatsCounter对象返回.

StatsCounter中可用的汇总统计数据

方法含义
count()RDD中的元素个数
mean()元素的平均值
sum()总和
max()最大值
min()最小值
variance()元素的方差
sampleVariance()从采样中计算出的方差
stdev()标准差
sampleStdev()采样的标准差

用Python移除异常值

# 要把String类型RDD转为数字数据,这样才能
# 使用统计函数并移除异常值
distanceNumerics = distances.map(lambda string: float(string))
stats = distanceNumerics.stats()
stddev = stdts.stdev()
mean = stats.mean()
reasonableDistances = distanceNumerics.filter(
    lambda x: math.fabs(x - mean) < 3 * stddev)
print reasonableDistances.collect()

用Scala移除异常值

// 现在要移除一些异常值,因为有些地点可能是误报的
// 首先要获取字符串RDD并将它转换为双精度浮点型
val distanceDouble = distance.map(string => string.toDouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev
val mean = stats.mean
val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev)
println(reasonableDistance.collect().toList)

并行度

并行度会从两方面影响程序性能.首先,当并行度过低时,Spark集群会出现资源闲置的情况.比如,假设你的应用有1000个可使用的计算核心,但所有运行的步骤只有30个任务,你就应该提高并行度来充分利用更多的计算核心.而当并行度过高时,每个分区产生的间接开销累积起来就会更大.评判并行度是否过高的标准包括任务是否几乎在瞬间(毫秒级)完成的,或者是否观察到任务没有读写任何数据.

Spark提供了两种方法来对操作的并行度进行调优.第一种方法是在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度.第二种方法是对于任何已有的RDD,可以进行重新分区来获取更多或者更少的分区数.重新分区操作通过repartition()实现,该操作会把RDD随机打乱并分成设定的分区数目.如果你确定要减少RDD分区,可以使用coalesce()操作.由于没有打乱数据,该操作比repartition()更为高效.如果你认为当前的并行度过高或者过低,可以利用这些方法对数据分布进行重新调整.

举个例子,假设我们从S3上读取了大量数据,然后马上进行filter()操作筛选掉数据集中的绝大部分数据.默认情况下,filter()返回的RDD的分区数和其父节点一样,这样可能会产生很多空的分区或者只有很少数据的分区.在这样的情况下,可以通过合并得到分区更少的RDD来提高应用性能.

在PySpark shell中合并分区过多的RDD

# 以可以匹配数千个文件的通配字符串作为输入
>>> input = sc.textFile("s3n://log-files/2014/*.log")
>>> input.getNumPartitions()
35154
# 排除掉大部分数据的筛选方法
>>> lines = input.filter(lambda line: line.startswith("2014-10-17")) >>> lines.getNumPartitions()
35154
# 在缓存lines之前先对其进行合并操作
>>> lines = lines.coalesce(5).cache()
>>> lines.getNumPartitions()
4
# 可以在合并之后的RDD上进行后续分析
>>> lines.count()

Spark SQL

SchemaRDD中可以存储的数据类型

读取数据和执行查询都会返回SchemaRDD.SchemaRDD和传统数据库中的表的概念类似.

Spark SQL/HiveQL类型Scala类型Java类型Python
TINYINTByteByte/byteint/long(在-128到127之间)
SMALLINTShortShort/shortint/long(在-32768到32767之间)
INTIntInt/intint或long
BIGINTLongLong/longlong
FLOATFloatFloat/floatfloat
DOUBLEDoubleDouble/doublefloat
DECIMALScala.math.BigDecimaljava.math.BigDecimaldecimal.Decimal
STRINGStringStringstring
BINARYArray[Byte]byte[]bytearray
BOOLEANBooleanBoolean/booleanbool
TIMESTAMPjava.sql.timestampjava.sql.timestampdatetime.datetime
ARRAY<DATA_TYPE>SeqListlist、tuple或array
MAP<KEY_TYPE, VAL_TYPE>MapMapdict
STRUCT<COL1: COL1_TYPE, ...>RowRowRow

使用Row对象

Row对象表示SchemaRDD中的记录,其本质就是一个定长的字段数组.在Scala/Java中,Row对象有一系列getter方法,可以通过下表获取每个字段的值.标准的取值方法get(或Scala中的apply),读入一个列的序号然后返回一个Object类型(或Scala中的Any类型)的对象,然后由我们把对象转为正确的类型.对于Boolean、Byte、Double、Float、Int、Long、Short和String类型,都有对应的getType()方法,可以把值直接作为相应的类型返回.例如,getString(0)会把字段0的值作为字符串返回.

在Scala中访问 topTweet这个SchemaRDD中的text列(也就是第一列)

val topTweetText = topTweets.map(row => row.getString(0))

在Python中,由于没有显式的类型系统,Row对象变得稍有不同.我们使用row[i]来访问第i个元素.除此之外,Python中的Row还支持以tow.column_name的形式使用名字来访问其中的字段.

在Python中访问topTweet这个SchemaRDD中的text列

topTweetText = topTweets.map(lambda row: row.text)

使用Python从Hive读取

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT key, value FROM mytable")
keys = rows.map(lambda row: row[0])

使用Scala从Hive读取

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key, value FROM mytable")
val keys = rows.map(row => row.getInt(0))

Parquet

Python中的Parquet数据读取

# 从一个有name和favouriteAnimal字段的Parquet文件中读取数据
rows = hiveCtx.parquetFile(parquetFile)
names = rows.map(lambda row: row.name)
print "Everyone"
print names.collect()

Python中的Parquet数据查询

# 寻找熊猫爱好者
tbl = rows.registerTempTable("people")
pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"")
print "Panda friends"
print pandaFriends.map(lambda row: row.name).collect()
pandaFriends.saveAsParquetFile("hdfs://...")

Json

输入记录

{"name": "Holden"}
{"name": "Sparky The Bear", "lovesPandas":true,"knows": {"friends":["holden"]}}

在Python中使用Spark SQL读取JSON数据

input = hiveCtx.jsonFile(inputFile)

在Scala中使用Spark SQL读取JSON数据

val input = hiveCtx.jsonFile(inputFile)

printSchema()输出结构信息

用SQL查询嵌套数据以及数组元素

select hashtagEntities[0].text from tweets LIMIT 1;

基于RDD

除了读取数据,也可以基于RDD创建SchemaRDD.在Scala中带有case class的RDD可以隐式转换成SchemaRDD.

在Python中可以创建一个由Row对象组成的RDD,然后调用inferSchema(),如下所示

happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")])
happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)
happyPeopleSchemaRDD.registerTempTable("happy_people")

在Scala中基于case class 创建SchemaRDD

case class HappyPerson(handle: String, favouriteBeverage: String)
...
// 创建了一个人的对象,并且把它转成SchemaRDD
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee")))
// 注意:此处发生了隐式转换
// 该转换等价于 sqlCtx.createSchemaRDD(happyPeopleRDD) happyPeopleRDD.registerTempTable("happy_people")

JDBC/ODBC服务器

Spark SQL也提供JDBC连接支持. Spark SQL的JDBC服务器与Hive中的HiveServer2相一致.由于使用了Thrift通信协议,它也被称为"Thrift server".注意,JDBC服务器支持需要Spark在打开Hive支持的选项下编译. 服务器可以通过Spark目录中的sbin/start-thriftserver.sh启动.这个脚本接受的参数选项大多与spark-submit相同.默认情况下,服务器会在localhost:10000上进行监听,我们可以通过环境变量(HIVE_SERVER2_THRIFT_PORTHIVE_SERVER2_THRIFT_BIND_HOST)修改这些设置,也可以通过Hive配置选项(hive. server2.thrift.port和hive.server2.thrift.bind.host)来修改,你也可以通过命令行参数 --hiveconf property=value 来设置Hive选项.

启动JDBC服务器

./sbin/start-thriftserver.sh --master sparkMaster

使用Beeline连接JDBC服务器

holden@hmbp2:~/repos/spark$ ./bin/beeline -u jdbc:hive2://localhost:10000

Spark SQL中的性能选项

选项默认值用途
spark.sql.codegenfalse设为true时,Spark SQL会把每条查询语句在运行时编译为Java二进制代码.这可以提高大型查询的性能,但在进行小规模查询时会变慢
spark.sql.inMemoryColumnarStorage.compressedfalse自动对内存中的列式存储进行压缩
spark.sql.inMemoryColumnarStorage.batchSize1000列式缓存时的每个批处理的大小.把这个值调大可能会导致内存不够的异常
spark.sql.parquet.compression.codecsnappy使用哪种压缩编码器.可以选的选项包括uncompressed/snappy/gzip/lzo

在Beeline命令中打开codegen选项

  beeline> set spark.sql.codegen=true;
  SET spark.sql.codegen=true
  spark.sql.codegen=true
  Time taken: 1.196 seconds

在Scala中打开Codegen选项

conf.set("spark.sql.codegen", "true")

Spark Streaming

一个简单的例子

Scala流计算import声明

import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds

用Scala进行流式筛选,打印出包含"error"的行

// 从SparkConf创建StreamingContext并指定1秒钟的批处理大小
val ssc = new StreamingContext(conf, Seconds(1))
// 连接到本地机器7777端口上后,使用收到的数据创建Dstream
val lines = ssc.socketTextStream("localhost", 7777)
// 从Dstream中筛选出包含字符串"error"的行
val errorLines = lines.filter(_.contains("error"))
// 打印出有"error"的行
errorLines.print()

以上只是设定好了要进行的计算,系统收到数据时计算就会开始.要开始收集数据,必须显示调用StreamingContext的start()方法.这样.Spark Streaming就会开始把Spark作业不断交给下面的SparkContext去调度执行.执行会在另一个线程中进行,所以需要调用awaitTermination来等待流计算完成,来防止应用退出.

// 启动流计算环境StreamingContext并等待它"完成"
ssc.start()
// 等待作业完成
ssc.awaitTermination()

在Linux/Mac操作系统上运行流计算应用并提供数据

$ spark-submit --class com.oreilly.learningsparkexamples.scala.StreamingLogInput \
    $ASSEMBLY_JAR local[4]

$ nc localhost 7777 # 使你可以键入输入的行来发送给服务器
<此处是你的输入>

节省下来把这个例子加以扩展处理Apache 日志文件. 如果你需要生成一些假的日志可以运行以下脚本.

#!/usr/bin/env sh
rm /tmp/logdata
touch /tmp/logdata
tail -f /tmp/logdata | nc -lk 7777 &
TAIL_NC_PID=$!
cat ./files/fake_logs/log1.log >> /tmp/logdata
sleep 5
cat ./files/fake_logs/log2.log >> /tmp/logdata
sleep 1
cat ./files/fake_logs/log1.log >> /tmp/logdata
sleep 2
cat ./files/fake_logs/log1.log >> /tmp/logdata
sleep 3
sleep 20
kill $TAIL_NC_PID

转换操作

Dstream的转换操作可以分为无状态(stateless)和有状态(stateful)两种.

  • 在无状态转换操作中,每个批次的处理不依赖于之前批次的数据.例如map()、filter()、reduceByKey()等等,都是无状态转化操作.
  • 相对的,有状态转化操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据.有状态转化操作包括基于滑动窗口的转换操作和追踪状态变化的转化操作.

无状态转换操作

DStream无状态转化操作的例子(不完整列表)

函数名称目的Scala示例用来操作DStream[T]的用户自定义函数的函数签名
map()对DStream中的每个元素应用给定函数,返回由各元素输出的元素组成的Dstream.ds.map(x => x + 1)f: (T) -> U
flatMap()对DStream中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DStreamds.flatMap(x => x.split(" "))f: T -> Iterable[U]
filter()返回由给定DStream中通过筛选的元素组成的DStream.ds.filter(x => x != 1)f: T -> Boolean
repartition()改变DStream的分区数ds.repartition(10)N/A
reduceByKey()将每个批次中键相同的记录归约ds.reduceByKey((x, y) => x + y)f: T, T -> T
groupByKey()将每个批次中的记录根据键分组ds.groupByKey()N/A

在Scala中对Dstream使用Map()和reduceByKey()

// 假设ApacheAccessingLog是用来从Apache日志中解析条目的工具类
val accessLogDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line)) val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)

无状态转化操作也能在多个Dstream间整合数据,不过也是在各个时间区间内.例如,键值对DStream拥有和RDD一样的与连接相关的转化操作.也就是cogroup()、join()、lefOuterJoin()等。

在Scala中连接两个DStream

val ipBytesDStream =
    accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))
val ipBytesSumDStream =
    ipBytesDStream.reduceByKey((x, y) => x + y)
val ipBytesRequestCountDStream =
    ipCountsDStream.join(ipBytesSumDStream)

如果这些无状态转换操作不够用DStream还提供了一个叫做transform()操作符,允许你对DStream提供任意一个RDD到RDD的函数.这个函数会在数据流中的每个批次中被调用,生成一个新的流.比如你有一个 extractOutliers() 函数

在Scala中对DStream使用transform()

val outlierDStream = accessLogsDStream.transform { rdd =>
    extractOutliers(rdd)
}

有状态转化操作

DStream的有状态转化操作是跨时间区间跟踪数据的操作;也就是说,一些先前批次的数据也被用来在新的批次中计算结果.主要的两种类型是滑动窗口和updateStateBykey(),前者以一个时间阶段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化(例如构建一个代表用户会话的对象).

有状态的转化操作需要在你的StreamingContext中打开检查点

ssc.checkpoint("hdfs://...")

在Scala中使用window()对窗口进行计数

val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
val windowCounts = accessLogsWindow.count()

其他函数:

reduceByWindow()和reduceByKeyAndWindow()让我们可以对每个窗口更高效地进行归约操作. countByWindow() countByValueAndWindow() updateStateByKey()

输出操作

在Scala中将DStream保存为文本文件,它们接受一个目录作为参数来存储文件,还支持通过可选参数来设置文件和后缀名,每个批次的结果被保存在给定目录的子目录中,且文件名中含有时间和后缀名.

ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")

还有一个 saveAsHadoopFiles(),接收一个Hadoop输出格式作为参数

val writableIpAddressRequestCount = ipAddressRequestCount.map {
   (ip, count) => (new Text(ip), new LongWritable(count)) }
 writableIpAddressRequestCount.saveAsHadoopFiles[
   SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")

在Scala中使用foreachRDD()将数据存储到外部系统中

ipAddressRequestCount.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
    // 打开到存储系统的连接(比如一个数据库的连接)
    partition.foreach { item =>
    // 使用连接把item存到系统中
    }
    // 关闭连接
    }
}

输入源

文件流

1.用Scala读取目录中的文件流

val logData = ssc.textFileStream(logDirectory)

2.用Scala读取目录中的SequenceFile流

ssc.fileStream[LongWritable, IntWritable,
    SequenceFileInputFormat[LongWritable, IntWritable]](inputDirectory).map {
    case (x, y) => (x.get(), y.get())
}

3.Akka actor 流 actorStream,它可以把Akka actor作为数据流的源.

4.Apache Kafka

在Scala中用Apache Kafka订阅Panda主题

import org.apache.spark.streaming.kafka._
...
// 创建一个从主题到接收器线程数的映射表
val topics = List(("pandas", 1), ("logs", 1)).toMap
val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics)
StreamingLogInput.processLines(topicLines.map(_._2))

5.Apache Flume

参考Spark快速大数据分析 P177

驱动器程序容错

驱动器程序的容错要求我们以特殊的方式创建StreamingContext.我们需要把检查点目录提供给StreamingContext.与直接调用new StreamingContext不同,应该使用StreamingContext.getOrCreate()函数.应把之前的示例中的代码改成如例10-43和例10-44所示的那样.

def createStreamingContext() = {
    ...
    val sc = new SparkContext(conf)
    //以1秒作为批次大小创建StreamingContext
    val ssc = new StreamingContext(sc, Seconds(1))
    ssc.checkpoint(checkpointDir)
}
...
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

当这段代码第一次运行时,假设检查点目录还不存在,那么StreamingContext会在你调用工厂函数(在Scala中为createStreamingContext())时把目录创建出来.此处你需要设置检查点目录.在驱动程序失败之后,如果你重启驱动器程序并再次执行代码,getOrCCreate()会重新从检查点目录中初始化出StreamingContext,然后继续处理.

其他

++ 运算符理解

val filteredStats = cleanStats.map { x =>
  val pieces = x.split(",")
  val year = pieces(0)
  val name = pieces(2)
  val team = pieces(5)
  (year + "_" + name, Map(team -> x))
}.reduceByKey(_ ++ _).map { case (x, y) =>
  if (y.contains("TOT")) {
    y("TOT")
  } else {
    y.last._2
  }
}
filteredStats.cache()

++ 运算符是将元素相加,然后返回一个新的集合

例如上述代码中reduceByKey之前传入的是

((1986_George Johnson,Map(SEA -> (1986,130,George Johnson,C,37,SEA,41,0,6.4,0.3,0.6,.522,0.0,0.0,0,0.3,0.6,.522,.522,0.3,0.4,.688,0.6,0.8,1.5,0.3,0.1,0.9,0.3,1.1,0.9))

((1986_George Johnson,Map(WSB -> (1986,131,George Johnson,PF,29,WSB,2,0,3.5,0.5,1.5,.333,0.0,0.0,0,0.5,1.5,.333,.333,1.0,1.0,1.000,0.5,0.5,1.0,0.0,0.0,0.0,0.5,0.5,2.0)))

那么结果为

((1986_George Johnson,Map(SEA -> (1986,130,George Johnson,C,37,SEA,41,0,6.4,0.3,0.6,.522,0.0,0.0,0,0.3,0.6,.522,.522,0.3,0.4,.688,0.6,0.8,1.5,0.3,0.1,0.9,0.3,1.1,0.9), WSB -> (1986,131,George Johnson,PF,29,WSB,2,0,3.5,0.5,1.5,.333,0.0,0.0,0,0.5,1.5,.333,.333,1.0,1.0,1.000,0.5,0.5,1.0,0.0,0.0,0.0,0.5,0.5,2.0)))
0条评论
avatar