Spark Streaming详解(重点窗口计算)

Spark Streaming详解(重点窗口计算)前面有几篇关于SparkStreaming的博客,那会只是作为Spark入门,快速体验Spark之用,只是照着葫芦画瓢。本文结合Spark官网上SparkStreaming的编程指南对SparkStreaming进行介绍StreamingContext如同SparkContext一样,StreamingContext也是SparkStreaming应用程序通往Spark集群的通道,它的定义…

大家好,又见面了,我是你们的朋友全栈君。

StreamingContext

如同SparkContext一样,StreamingContext也是Spark Streaming应用程序通往Spark集群的通道,它的定义如下:

Java代码  
收藏代码

  1. /** 
  2.  * Main entry point for Spark Streaming functionality. It provides methods used to create 
  3.  * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be either 
  4.  * created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf 
  5.  * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext. 
  6.  * The associated SparkContext can be accessed using `context.sparkContext`. After 
  7.  * creating and transforming DStreams, the streaming computation can be started and stopped 
  8.  * using `context.start()` and `context.stop()`, respectively. 
  9.  * `context.awaitTermination()` allows the current thread to wait for the termination 
  10.  * of the context by `stop()` or by an exception. 
  11.  */  
  12. class StreamingContext private[streaming] (  
  13.     sc_ : SparkContext,  
  14.     cp_ : Checkpoint,  
  15.     batchDur_ : Duration  
  16.   ) extends Logging {  

 通过类的文档注释,我们看到:

1. 提供了从各种输入数据源创建DStream的方法

2,参数中的batchDur_是Duration类型的对象,比如Second(10),这个参数的含义是the time interval at which streaming data will be divided into batches,也就是说,假如batchDur_为Second(10)表示Spark Streaming会把每10秒钟的数据作为一个Batch,而一个Batch就是一个RDD?是的,一个RDD的数据对应一个batchInterval累加读取到的数据

 

DStream

Java代码  
收藏代码

  1. /** 
  2.  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous 
  3.  * sequence of RDDs (of the same type) representing a continuous stream of data (see 
  4.  * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). 
  5.  * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, 
  6.  * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by 
  7.  * transforming existing DStreams using operations such as `map`, 
  8.  * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream 
  9.  * periodically generates a RDD, either from live data or by transforming the RDD generated by a 
  10.  * parent DStream. 
  11.  * 
  12.  * This class contains the basic operations available on all DStreams, such as `map`, `filter` and 
  13.  * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains 
  14.  * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and 
  15.  * `join`. These operations are automatically available on any DStream of pairs 
  16.  * (e.g., DStream[(Int, Int)] through implicit conversions when 
  17.  * `org.apache.spark.streaming.StreamingContext._` is imported. 
  18.  * 
  19.  * DStreams internally is characterized by a few basic properties: 
  20.  *  – A list of other DStreams that the DStream depends on 
  21.  *  – A time interval at which the DStream generates an RDD 
  22.  *  – A function that is used to generate an RDD after each time interval 
  23.  */  

 从文档中,我们可以看到如下几点:

1. 对DStream实施map操作,会转换成另外一个DStream

2. DStream是一组连续的RDD序列,这些RDD中的元素的类型是一样的。DStream是一个时间上连续接收数据但是接受到的数据按照指定的时间(batchInterval)间隔切片,每个batchInterval都会构造一个RDD,因此,Spark Streaming实质上是根据batchInterval切分出来的RDD串,想象成糖葫芦,每个山楂就是一个batchInterval形成的RDD

3. 对DStream实施windows或者reduceByKeyAndWindow操作,也是转换成另外一个DStream(window操作是stateful DStream Transformation)

4. DStream同RDD一样,也定义了map,filter,window等操作,同时,对于元素类型为(K,V)的pair DStream,Spark Streaming提供了一个隐式转换的类,PairStreamFunctions

5. DStream内部有如下三个特性:

-DStream也有依赖关系,一个DStream可能依赖于其它的DStream(依赖关系的产生,同RDD是一样的)

-DStream创建RDD的时间间隔,这个时间间隔是不是就是构造StreamingContext传入的第三个参数?是的!

-在时间间隔到达后,DStream创建RDD的方法

Spark Streaming详解(重点窗口计算)

 在DStream内部,DStream表现为一系列的RDD的序列,针对DStream的操作(比如map,filter)会转换到它底层的RDD的操 作,由这个图中可以看出来,0-1这段时间的数据累积构成了RDD@time1,1-2这段时间的数据累积构成了RDD@time2,。。。也就是说,在 Spark Streaming中,DStream中的每个RDD的数据是一个时间窗口的累计。

 

 

下图展示了对DStream实施转换算子flatMap操作。需要指出的是,RDD的转换操作是由Spark Engine来实现的,原因是Spark Engine接受了原始的RDD以及作用于RDD上的算子,在计算结果时才真正的对RDD实施算子操作

 
Spark Streaming详解(重点窗口计算)
 

 

 

 

按照下面这幅图所呈现出来的含义是,Spark Streaming用于将输入的数据进行分解成一个一个的RDD,每个RDD交由Spark Engine进行处理以得到最后的处理数据?是的!

 

Spark Streaming详解(重点窗口计算)
上图中,Spark Streaming模块用于将接受到数据定时的切分成RDD(上图中定义为batch of input data),这些RDD交由Spark Engine进行计算。Spark Streaming模块负责数据接收并定时转换成一系列RDD,Spark Engine对Spark Streaming送过来的RDD进行计算

 

DStream层次关系

 

 

DStream的window操作

Java代码  
收藏代码

  1. /** 
  2.  * Return a new DStream in which each RDD contains all the elements in seen in a 
  3.  * sliding window of time over this DStream. 
  4.  * @param windowDuration width of the window; must be a multiple of this DStream’s 
  5.  *                       batching interval 
  6.  * @param slideDuration  sliding interval of the window (i.e., the interval after which 
  7.  *                       the new DStream will generate RDDs); must be a multiple of this 
  8.  *                       DStream’s batching interval 
  9.  */  
  10. def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {  
  11.   new WindowedDStream(this, windowDuration, slideDuration)  
  12. }  

DStream与window相关的两个参数是windowDuration和slideDuration,这两个参数究竟表示什么含义。通过window操作,DStream转换为了WindowedDStream

windowDuration表示的是对过去的一个windowDuration时间间隔的数据进行统计计算, windowDuration是intervalBatch的整数倍,也就是说,假如windowDuration=n*intervalBatch, 那么window操作就是对过去的n个RDD进行统计计算
如下内容来自于Spark Streaming的官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html

 

Spark Streaming也提供了窗口计算(window computations)的功能,允许我们每隔一段时间(sliding duration)对过去一个时间段内(window duration)的数据进行转换操作(tranformation).

slideDruation控制着窗口计算的频度,windowDuration控制着窗口计算的时间跨度。slideDruation和windowDuration都必须是batchInterval的整数倍。假想如下一种场景:

windowDuration=3*batchInterval,

slideDuration=10*batchInterval,

表示的含义是每个10个时间间隔对之前的3个RDD进行统计计算,也意味着有7个RDD没在window窗口的统计范围内。slideDuration的默认值是batchInterval

 

 

下图展示了滑动窗口的概念
 

 

 
Spark Streaming详解(重点窗口计算)
如上图所示,一个滑动窗口时间段((sliding window length)内的所有RDD会进行合并以创建windowed DStream所对应的RDDD。每个窗口操作有两个参数:

 

  • window length – The duration of the window (3 in the figure),滑动窗口的时间跨度,指本次window操作所包含的过去的时间间隔(图中包含3个batch interval,可以理解时间单位)
  • sliding interval – The interval at which the window operation is performed (2 in the figure).(窗口操作执行的频率,即每隔多少时间计算一次)

These two parameters must be multiples of the batch interval of the source DStream (1 in the figure). 这表示,sliding window length的时间长度以及sliding interval都要是batch interval的整数倍。
batch interval是在构造StreamingContext时传入的(1 in the figure)
 

说明:

window length为什么是3?如椭圆形框,它是从第三秒开始算起(包括第三秒),第五秒结束,即包含3,4,5三个1秒,因此是3

sliding interval为什么是2?主要是看圆角矩形框的右边线,虚线的圆角矩形框的右边线在time 3结束, 实线的圆角矩形框的右边线在time 5结束,所以跨度是2。也就是看时间的最右侧即可,以右边线为基准,每个窗口操作(window length)占用了3个时间片。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
表示每隔10秒钟对过去30秒钟产生的单词进行计数。这个方法有个不合理的地方,既然要求sliding window length和sliding interval都是batch interval的整数倍,那么此处为什么不用时间单位,而使用绝对的时间长度呢?

 

 

 

Spark Streaming Sources

这是Spark Streaming的数据输入源,包括两类:基本数据源和高级数据源

 

基本数据源

  • file systems
  • socket connections
  • Akka actors

以上数据源,StreamingContext的API直接提供,

  • fileStream,

监听HDFS文件系统的新文件的创建,读取其中内容。如果文件已存在而内容有变化,是不会被监听到的,因此只能将文件内容在某个位置写好后,然后移动到Spark Streaming监听的目录,如果文件在这个目录下内容发生变化,则Spark Streaming无法监听到

 

另外需要注意的是,Spark Streaming启动后,Spark Streaming通过文件的最后修改时间(modify time)来判断一个新加入到监听目录的文件是否有效。如果一个较长时间没有更新的文件move到监听目录,Spark Streaming也不会对它进行读取进而计算

 

Java代码  
收藏代码

  1. /** 
  2.  * Create a input stream that monitors a Hadoop-compatible filesystem 
  3.  * for new files and reads them using the given key-value types and input format. 
  4.  * Files must be written to the monitored directory by “moving” them from another 
  5.  * location within the same file system. File names starting with . are ignored. 
  6.  * @param directory HDFS directory to monitor for new file 
  7.  * @tparam K Key type for reading HDFS file 
  8.  * @tparam V Value type for reading HDFS file 
  9.  * @tparam F Input format for reading HDFS file 
  10.  */  
  11. def fileStream[  
  12.   K: ClassTag,  
  13.   V: ClassTag,  
  14.   F <: NewInputFormat[K, V]: ClassTag  
  15. ] (directory: String): InputDStream[(K, V)] = {  
  16.   new FileInputDStream[K, V, F](this, directory)  
  17. }  

 

  • socket connections

 

Java代码  
收藏代码

  1. /** 
  2.  * Create an input stream from TCP source hostname:port. Data is received using 
  3.  * a TCP socket and the receive bytes it interepreted as object using the given 
  4.  * converter. 
  5.  * @param hostname      Hostname to connect to for receiving data 
  6.  * @param port          Port to connect to for receiving data 
  7.  * @param converter     Function to convert the byte stream to objects 
  8.  * @param storageLevel  Storage level to use for storing the received objects 
  9.  * @tparam T            Type of the objects received (after converting bytes to objects) 
  10.  */  
  11. def socketStream[T: ClassTag](  
  12.     hostname: String,  
  13.     port: Int,  
  14.     converter: (InputStream) => Iterator[T],  
  15.     storageLevel: StorageLevel  
  16.   ): ReceiverInputDStream[T] = {  
  17.   new SocketInputDStream[T](this, hostname, port, converter, storageLevel)  
  18. }  

 问题: converter怎么使用?把InputStream转换为Iterator[T]集合

 

 高级数据源

 

Source Artifact

Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10

 

Spark Streaming注意点:

 1. When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL where n > number of receivers to run (see Spark Properties for information on how to set the master).Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process them.

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/148736.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 最好用的免费音乐播放器_最好用的免费音乐播放器

    最好用的免费音乐播放器_最好用的免费音乐播放器不知道大家在工作的时候,是不是跟我一样,喜欢听着自己熟悉的旋律,心情也会很好。但是,原来的很多经典歌曲,要么改收费一首歌几块钱、要么是翻唱的,听起来也没有原版好,对于我们这些只是偶尔听听歌的、写写东西的人来说,的确有点不方便。今天,小莫为大家挑选了四个,截止到目前还能正常使用,并且功能十分强大的音乐播放器,歌曲都是免费的,建议低调收藏。1、音乐社一款很简洁的音乐播放器,涵盖了主流播…

  • idea 2021 mac 激活码(最新序列号破解)

    idea 2021 mac 激活码(最新序列号破解),https://javaforall.cn/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

  • python interpolate.interp1d_索引错误scipy.interpolate.interp1d「建议收藏」

    python interpolate.interp1d_索引错误scipy.interpolate.interp1d「建议收藏」我试图得到一个三次样条函数scipy.interpolate.interp1d功能。我试图让documentationpage上的示例正常工作,但每当我运行它时,都会出现以下错误:plt.plot(x,y,’o’,xnew,f(xnew),’-‘,xnew,f2(xnew),’–‘)File”/Library/Python/2.7/site-packages/scipy-0.12.0…

  • Python 支付宝转账到银行卡二维码制作步骤分享[通俗易懂]

    Python 支付宝转账到银行卡二维码制作步骤分享[通俗易懂]PS:最近有需求需要根据信息自动生成支付宝转账二维码,实现功能支付宝扫码后信息自动输入。谷歌百度知乎各种搜索教程一大堆没有一个能成功实现(有可能是我流程不对),大致的流程为一下三步:根据url生成链接url转短链短链生成二维码PS:根据此教程做出的二维码扫码会提示违规,不能实现预定目标经多次测试总结出以下流程:转账URL地址拼接:~~alipays://pl…

  • Windows10下安装linux(Utunbu)双系统「建议收藏」

    Windows10下安装linux(Utunbu)双系统「建议收藏」电脑的硬盘应该是mbr模式1.正常安装windows10系统2.打开windows10系统,安装EaSYCBD2.243.右键系统菜单,打开磁盘管理选择一个硬盘压缩100g(自己定义,不少于50G)。4.打开电源选项,关闭快速启动5.插入Untunbu启动盘,重启进入BIOS,关闭SecureBOOT,并以USB为第一启动项6.进入Untunbu,选择installUtunbu,不要联网,然后选择挂载点。10-20G的“/”,主区,etx4;200MB的“/boot”逻辑分区,etx

  • 数仓建模与分析建模_数据仓库建模与数据挖掘建模

    数仓建模与分析建模_数据仓库建模与数据挖掘建模1.数仓概述数据仓库:数据仓库是一个面向主题的、集成的、非易失的、随时间变化的数据集合。重要用于组织积累的历史数据,并且使用分析方法(OLAP、数据分析)进行分析整理,进而辅助决策,为管理者、企业系统提供数据支持,构建商业智能。面向主题:为数据分析提供服务,根据主题将原始数据集合在一起。集成的:原始数据来源于不同的数据源,要整合成最终数据,需要经过ETL(抽取、清洗、转换)的过程。非易失:保存的数据是一系列历史快照,不允许被修改,只允许通过工具进行查询、分析。时变性:数仓会定期接收、集成新的

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号