Spark Streaming + Kafka集成指南(Kafka broker version 0.8.2.1 or highe)

本文为Spark官方Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)译文,有不足之处还请指正。

本篇我们阐述如何配置Spark Streaming从Kafka接收数据。 有两种方法可以实现:老的方法是使用Receiver和Kafka的高级API实现,新方法则不需要使用Receiver(自Spark 1.3开始)。它们有不同的编程模型,性能特性和语义保证,所以请继续阅读更多细节。 这两种方法都被认为是当前版本的Spark的稳定API。

方法 1:基于Receiver的方法

此方法使用Receiver来接收数据。 Receiver是使用Kafka consumer API实现的。 与所有Receiver一样,Receiver从Kafka接收的数据并存储在Spark executor中,然后由Spark Streaming启动的作业处理数据。

然而,在默认配置下,这种方法在程序失败时会丢失数据(为了确保零数据丢失,必须启用Spark Streaming中的Write Ahead Logs(在Spark 1.2中引入)),这会同步保存所有从Kafka接收的数据写入分布式文件系统(例如HDFS),以便所有数据可以在故障时恢复。有关Write Ahead Logs更多信息请参阅streaming编程指南中的部署一节。

接着我们要讨论如何在你的Streaming程序中使用这种方法。

1.Jar包: Scala/Java程序可以使用SBT/Maven包管理,在包配置文件中加入以下artifact(更多细节请查看Spark编程指南的Linking章节)

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.0.2

对于Python应用程序,在部署应用程序时,必须添加上述库及其依赖关系。 请参阅下面的部署小节。

2.编程: 在代码中import Kafka包并且创建一个DStream。

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

您还可以使用createStream指定键和值类及其相应的解码器类。 请参阅API文档示例

注意事项:

  • Kafka主题的分区与在Spark Streaming中生成的RDD的分区无关。 因此,增加KafkaUtils.createStream()中特定主题分区的数量只会增加在单个接收器中使用的主题的线程数。 它不会增加Spark在处理数据时的并行性。有关详细信息,请参阅主文档。
  • 多个Kafka DStreams可以用不同的组和主题创建,使得多receiver可以并行接收数据。
  • 如果在HDFS这种‘副本型’文件系统中启用了Write Ahead Logs,则接收的数据已经被‘复制’因此,存储级别为StorageLevel.MEMORY_AND_DISK_SER(即,使用KafkaUtils.createStream(…,StorageLevel.MEMORY_AND_DISK_SER))。

3.部署: 与任何Spark应用程序一样,spark-submit用于启动您的应用程序。 但是,Scala/Java应用程序和Python应用程序的细节略有不同。

对于Scala和Java应用程序,如果你使用SBT或Maven进行项目管理,需要将spark-streaming-kafka-0-8_2.11及其依赖项打包到JAR中。spark-core_2.11和spark-streaming_2.11本身就在Spark包中因此不需要打包。然后运行spark-submit执行你的应用程序(请参阅主程序指南中的部署部分

对于缺少SBT / Maven项目管理的Python应用程序,spark-streaming-kafka-0-8_2.11及其依赖项可以使用–packages直接添加到spark-submit中(请参阅Application Submission Guide)。

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 ...

或者,您也可以从Maven repository下载spark-streaming-kafka-0-8-assembly的JAR包,并将其添spark-submit的-jars参数中

方法2:直接访问(无Receiver)

这种新的无Receiver的“直接”方法已经在Spark 1.3中引入,以确保更强的端到端保证。 代替使用接收器来接收数据,该方法周期性地查询Kafka以获得主题和分区中的最新偏移量,并且定义每个批量中处理的偏移范围。 当启动作业时,Kafka的Consumer API Kafka服务器中读取定义好偏移范围的数据(类似于从文件系统读取文件)。 请注意,此特性在Spark 1.3中针对Scala和Java API引入,在Spark 1.4中针对Python API引入。

这种方法相对方法1有以下优点。

  • 简化并行性:无需创建多个输入Kafka流并联合它们。 使用directStream,Spark Streaming将创建与消费的Kafka分区一样多的RDD分区,这将从Kafka并行读取数据。 因此,Kafka和RDD分区之间存在一对一映射,这更容易理解和调整。
  • 高效:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,该日志进一步复制数据。 这实际上是低效的,因为数据有效地被复制两次 一次是Kafka,另一次是Write Ahead Log。 第二种方法消除了问题,因为没有接收器,因此不需要 Write Ahead Logs。 只要您的Kafka有足够的保留时间消息可以从Kafka恢复。
  • Exactly-once语义:第一种方法使用Kafka的高级API在Zookeeper中存储consume的偏移量。这是传统上消费Kafka数据的方式。虽然这种方法(与预写日志结合)可以确保零数据丢失(即至少一次语义),但是在一些故障下一些记录可能被消耗两次的机会很小。这是因为Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移之间存在不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单的Kafka API。偏移由Spark Streaming在其检查点内跟踪。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,所以每个记录被Spark Streaming有效地接收一次,尽管失败了。为了实现输出结果的一次性语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务(请参阅Semantics of output operations 获取更多信息)。

注意,这种方法的一个缺点是它不更新Zookeeper中的偏移,因此基于Zookeeper的Kafka监控工具将不会显示进度。 但是,您可以在每个批处理中访问由此方法处理的偏移量,并自己更新Zookeeper(请参阅下文)。

接下来,我们讨论如何在应用程序中使用此方法。

1.Jar包: Scala/Java程序可以使用SBT/Maven包管理,在包配置文件中加入以下artifact(更多细节请查看Spark编程指南的Linking章节)

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.0.2

2.编程: 在代码中import Kafka包并且创建一个DStream。

import org.apache.spark.streaming.kafka._

val directKafkaStream = KafkaUtils.createDirectStream[
    [key class], [value class], [key decoder class], [value decoder class] ](
    streamingContext, [map of Kafka parameters], [set of topics to consume])

您还可以将messageHandler参数传递给createDirectStream,以访问包含有关当前消息的元数据的MessageAndMetadata,并将其转换为任何所需类型。 请参阅API文档示例

在Kafka参数中,您必须指定metadata.broker.list或bootstrap.servers。 默认情况下,它将从每个Kafka分区的最新偏移量开始消费。 如果将Kafka参数中的配置auto.offset.reset设置为最小,那么它将从最小偏移开始消费。

您还可以使用KafkaUtils.createDirectStream的其他变体开始从任何任意偏移量消费。 此外,如果要访问每个批次中消费的Kafka偏移量,可以执行以下操作。

// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
          ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}

可以用这个更新Zookeeper ,如果你想Kafka 监控工具显示流处理进程的话。

注意,HasOffsetRanges的类型转换只会在directKafkaStream调用的第一个方法中完成。 您可以使用transform()代替 foreachRDD()作为第一个调用方法,以便访问偏移量,然后调用其他Spark方法。 然而,请注意,RDD分区和Kafka分区之间的一对一映射的关系,shuffle或重新分区后不会保留。 例如 reduceByKey()或window()。

另一个要注意的是,由于此方法不使用接收器(Receiver),标准接收器相关(即spark.streaming.receiver. 相关的配置 )将不适用于通过此方法创建的DStreams。 相反,spark.streaming.kafka.中一个重要的配置项是spark.streaming.kafka.maxRatePerPartition,它是每个Kafka分区将被此直接API读取的最大速率(每秒消息数)。

3.部署:与第一种方法相同。

打赏支持:如果你觉得我的文章对你有所帮助,可以打赏我哟。