炼数成金 门户 大数据 Spark 查看内容

Spark Streaming笔记——技术点汇总

2017-8-14 22:19| 发布者: 炼数成金_小数| 查看: 19198| 评论: 0|原作者: netoxi|来自: 博客园

摘要: Spark Streaming支持实时数据流的可扩展(scalable)、高吞吐(high-throughput)、容错(fault-tolerant)的流处理(stream processing)。Spark在接收到实时输入数据流后,将数据划分成批次(divides the data int ...

存储 Hadoop Spark Kafka Streams

Spark Streaming支持实时数据流的可扩展(Scalable)、高吞吐(high-throughput)、容错(fault-tolerant)的流处理(stream processing)。

架构图

特性如下:
可线性伸缩至超过数百个节点;
实现亚秒级延迟处理;
可与Spark批处理和交互式处理无缝集成;
提供简单的API实现复杂算法
更多的流方式支持,包括Kafka、Flume、Kinesis、Twitter、ZeroMQ等。

原理
Spark在接收到实时输入数据流后,将数据划分成批次(divides the data into batches),然后转给Spark Engine处理,按批次生成最后的结果流(generate the final stream of results in batches)。 


API

DStream
DStream(Discretized Stream,离散流)是Spark Stream提供的高级抽象连续数据流。

组成:一个DStream可看作一个RDDs序列。
核心思想:将计算作为一系列较小时间间隔的、状态无关的、确定批次的任务,每个时间间隔内接收的输入数据被可靠存储在集群中,作为一个输入数据集。

特性:一个高层次的函数式编程API、强一致性以及高校的故障恢复。
应用程序模板:

模板1
1 import org.apache.spark.SparkConf
 2 import org.apache.spark.SparkContext
 3 import org.apache.spark.streaming.Seconds
 4 import org.apache.spark.streaming.StreamingContext
 5 
 6 object Test {
 7   def main(args: Array[String]): Unit = {
 8     val conf = new SparkConf().setAppName("Test")
 9     val sc = new SparkContext(conf)
10     val ssc = new StreamingContext(sc, Seconds(1))
11     
12     // ...
13   }
14 }

模板2
1 import org.apache.spark.SparkConf
 2 import org.apache.spark.streaming.Seconds
 3 import org.apache.spark.streaming.StreamingContext
 4 
 5 object Test {
 6   def main(args: Array[String]): Unit = {
 7     val conf = new SparkConf().setAppName("Test")
 8     val ssc = new StreamingContext(conf, Seconds(1))
 9     
10     // ...
11   }
12 }

WordCount示例
1 import org.apache.spark.SparkConf
 2 import org.apache.spark.storage.StorageLevel
 3 import org.apache.spark.streaming.Seconds
 4 import org.apache.spark.streaming.StreamingContext
 5 import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
 6 
 7 object Test {
 8   def main(args: Array[String]): Unit = {
 9     val conf = new SparkConf().setAppName("Test")
10     val ssc = new StreamingContext(conf, Seconds(1))
11     
12     val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
13     val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
14     wordCounts.print
15     
16     ssc.start
17     ssc.awaitTermination
18   }
19 }

Input DStream
Input DStream是一种从流式数据源获取原始数据流的DStream,分为基本输入源(文件系统、Socket、Akka Actor、自定义数据源)和高级输入源(Kafka、Flume等)。

Receiver:
每个Input DStream(文件流除外)都会对应一个单一的Receiver对象,负责从数据源接收数据并存入Spark内存进行处理。应用程序中可创建多个Input DStream并行接收多个数据流。
每个Receiver是一个长期运行在Worker或者Executor上的Task,所以会占用该应用程序的一个核(core)。如果分配给Spark Streaming应用程序的核数小于或等于Input DStream个数(即Receiver个数),则只能接收数据,却没有能力全部处理(文件流除外,因为无需Receiver)。
Spark Streaming已封装各种数据源,需要时参考官方文档。

Transformation Operation
常用Transformation

updateStateByKey(func)
updateStateByKey可对DStream中的数据按key做reduce,然后对各批次数据累加
WordCount的updateStateByKey版本
1 import org.apache.spark.SparkConf
 2 import org.apache.spark.storage.StorageLevel
 3 import org.apache.spark.streaming.Seconds
 4 import org.apache.spark.streaming.StreamingContext
 5 import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
 6 
 7 object Test {
 8   def main(args: Array[String]): Unit = {
 9     val conf = new SparkConf().setAppName("Test")
10     val ssc = new StreamingContext(conf, Seconds(1))
11     
12     // updateStateByKey前需设置checkpoint
13     ssc.checkpoint("/spark/checkpoint")
14     
15     val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
16     val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
17       // 当前批次单词的总数
18       val currCount = currValues.sum
19       // 已累加的值
20       val prevCount = prevValueState.getOrElse(0)
21       // 返回累加后的结果,是一个Option[Int]类型
22       Some(currCount + prevCount)
23     }
24     val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey[Int](addFunc)
25     wordCounts.print
26     
27     ssc.start
28     ssc.awaitTermination
29   }
30 }

transform(func)
通过对原DStream的每个RDD应用转换函数,创建一个新的DStream。
官方文档代码举例
1 val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
3 val cleanedDStream = wordCounts.transform(rdd => {
4   rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
5   ...
6 })

Window operations
窗口操作:基于window对数据transformation(个人认为与Storm的tick相似,但功能更强大)。
参数:窗口长度(window length)和滑动时间间隔(slide interval)必须是源DStream批次间隔的倍数。
举例说明:窗口长度为3,滑动时间间隔为2;上一行是原始DStream,下一行是窗口化的DStream。
常见window operation


官方文档代码举例 
1 // window operations前需设置checkpoint
2 ssc.checkpoint("/spark/checkpoint")
4 // Reduce last 30 seconds of data, every 10 seconds
5 val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

join(otherStream, [numTasks])

连接数据流
官方文档代码举例1
1 val stream1: DStream[String, String] = ...
2 val stream2: DStream[String, String] = ...
3 val joinedStream = stream1.join(stream2)

官方文档代码举例2
1 val windowedStream1 = stream1.window(Seconds(20))
2 val windowedStream2 = stream2.window(Minutes(1))
3 val joinedStream = windowedStream1.join(windowedStream2)

Output Operation


缓存与持久化
通过persist()将DStream中每个RDD存储在内存。
Window operations会自动持久化在内存,无需显示调用persist()。
通过网络接收的数据流(如Kafka、Flume、Socket、ZeroMQ、RocketMQ等)执行persist()时,默认在两个节点上持久化序列化后的数据,实现容错。

Checkpoint
用途:Spark基于容错存储系统(如HDFS、S3)进行故障恢复。

分类:
元数据检查点:保存流式计算信息用于Driver运行节点的故障恢复,包括创建应用程序的配置、应用程序定义的DStream operations、已入队但未完成的批次。

数据检查点:保存生成的RDD。由于stateful transformation需要合并多个批次的数据,即生成的RDD依赖于前几个批次RDD的数据(dependency chain),为缩短dependency chain从而减少故障恢复时间,需将中间RDD定期保存至可靠存储(如HDFS)。

使用时机:
Stateful transformation:updateStateByKey()以及window operations。
需要Driver故障恢复的应用程序。

使用方法
Stateful transformation
streamingContext.checkpoint(checkpointDirectory)
需要Driver故障恢复的应用程序(以WordCount举例):如果checkpoint目录存在,则根据checkpoint数据创建新StreamingContext;否则(如首次运行)新建StreamingContext。
1 import org.apache.spark.SparkConf
 2 import org.apache.spark.storage.StorageLevel
 3 import org.apache.spark.streaming.Seconds
 4 import org.apache.spark.streaming.StreamingContext
 5 import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
 6 
 7 object Test {
 8   def main(args: Array[String]): Unit = {
 9     val checkpointDir = "/spark/checkpoint"
10     def createContextFunc(): StreamingContext = {
11       val conf = new SparkConf().setAppName("Test")
12       val ssc = new StreamingContext(conf, Seconds(1))
13       ssc.checkpoint(checkpointDir)
14       ssc
15     }
16     val ssc = StreamingContext.getOrCreate(checkpointDir, createContextFunc _)
17     
18     val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
19     val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
20     wordCounts.print
21     
22     ssc.start
23     ssc.awaitTermination
24   }
25 }

checkpoint时间间隔
方法:
dstream.checkpoint(checkpointInterval)
原则:一般设置为滑动时间间隔的5-10倍。
分析:checkpoint会增加存储开销、增加批次处理时间。当批次间隔较小(如1秒)时,checkpoint可能会减小operation吞吐量;反之,checkpoint时间间隔较大会导致lineage和task数量增长。

性能调优

降低批次处理时间
数据接收并行度
增加DStream:接收网络数据(如Kafka、Flume、Socket等)时会对数据反序列化再存储在Spark,由于一个DStream只有Receiver对象,如果成为瓶颈可考虑增加DStream。
1 val numStreams = 5
2 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
3 val unifiedStream = streamingContext.union(kafkaStreams)
4 unifiedStream.print()

设置“spark.streaming.blockInterval”参数:接收的数据被存储在Spark内存前,会被合并成block,而block数量决定了Task数量;举例,当批次时间间隔为2秒且block时间间隔为200毫秒时,Task数量约为10;如果Task数量过低,则浪费了CPU资源;推荐的最小block时间间隔为50毫秒。
显式对Input DStream重新分区:在进行更深层次处理前,先对输入数据重新分区。
inputStream.repartition(<number of partitions>)

数据处理并行度:reduceByKey、reduceByKeyAndWindow等operation可通过设置“spark.default.parallelism”参数或显式设置并行度方法参数控制。
数据序列化:可配置更高效的Kryo序列化。

设置合理批次时间间隔
原则:处理数据的速度应大于或等于数据输入的速度,即批次处理时间大于或等于批次时间间隔。

方法:
先设置批次时间间隔为5-10秒以降低数据输入速度;
再通过查看log4j日志中的“Total delay”,逐步调整批次时间间隔,保证“Total delay”小于批次时间间隔。

内存调优
持久化级别:开启压缩,设置参数“spark.rdd.compress”。
GC策略:在Driver和Executor上开启CMS。

欢迎加入本站公开兴趣群
软件开发技术群
兴趣范围包括:Java,C/C++,Python,PHP,Ruby,shell等各种语言开发经验交流,各种框架使用,外包项目机会,学习、培训、跳槽等交流
QQ群:26931708

Hadoop源代码研究群
兴趣范围包括:Hadoop源代码解读,改进,优化,分布式系统场景定制,与Hadoop有关的各种开源项目,总之就是玩转Hadoop
QQ群:288410967 

鲜花

握手

雷人

路过

鸡蛋

最新评论

热门频道

  • 大数据
  • 商业智能
  • 量化投资
  • 科学探索
  • 创业

即将开课

热门文章

     

    GMT+8, 2020-2-29 20:15 , Processed in 0.101317 second(s), 23 queries .