流数据_数据回流是什么意思

流数据_数据回流是什么意思恢复内容开始特征:持续到达,数据量大,注重数据整体价值,数据顺序可能颠倒,丢失,实时计算,海量,分布,实时,快速部署,可靠linkedinKafkasparkstreaming:微小批

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

————恢复内容开始————

特征:

持续到达,数据量大,注重数据整体价值,数据顺序可能颠倒,丢失,实时计算,

海量,分布,实时,快速部署,可靠

linked in Kafka

spark streaming:微小批处理,模拟流计算,秒级响应

DStream 一系列RDD 的集合

支持批处理

流数据_数据回流是什么意思

 

 流数据_数据回流是什么意思

 

 流数据_数据回流是什么意思

 

 创建文件流

流数据_数据回流是什么意思

 

 10代表每10s启动一次流计算

textFileStream 定义了一个文件流数据源

 任务: 寻找并跑demo代码 搭建环境 压力测试 产品

 

套接字流

流数据_数据回流是什么意思

 

 插播: futrue使用(为了兼容老版本python)

https://www.liaoxuefeng.com/wiki/897692888725344/923030465280480

 客户端进行刺频统计,并显示结果。

#!/usr/bin/env python3


from __future__ import print_function

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv)!=3:
        print("Usage: NetworkWordCount.py <hostname><port>",file=sys.stderr)
        exit(-1)
# this is for two arg plus itself        
    sc=SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc=StreamingContext(sc,1)
    lines=ssc.socketTextStream(sys.argv[1],int(sys.argv[2]))
    counts=lines.flatMap(lambda line:line.split(""))\
            .map(lambda word:(word,1))\
            .reduceByKey(lambda a,b:a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

客户端从服务端接收流数据:

# 用客户端向服务端发送流数据 $ /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost <端口>

 

服务端,发送

(a) 系统自带服务端 nc。

# 打开服务端 $nc -lk <端口号>

 

 #!/usr/bin/env python3
# NetworkWordCount.py

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == “__main__”:
    if len(sys.argv) != 3:
        print(“Usage: NetworkWordCount.py <hostname> <port>”, file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName = “PythonStreamingNetworkWordCount”)
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

    counts = lines.flatMap(lambda line: line.split(” “)) \
            .map(lambda word: (word, 1))\
            .reduceByKey(lambda a,b: a+b)

    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

 

 

 

 

 

import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc=SparkContext(appName=”RDDstream”)
ssc=StreamingContext(sc,2)

rddQueue = []
for i in range(5):
        rddQueue += [ssc.sparkContext.parallelize([j for j in range(1,1001)],10)]
        time.sleep(1)

inputStream = ssc.queueStream(rddQueue)
mappedStream = inputStream.map(lambda x:(x%10,1))
reducedStream=mappedStream.reduceByKey(lambda a,b:a+b)
reducedStream.pprint()
ssc.start()
ssc.stop(stopSparkContext=True,stopGraceFully=True)

 kafka作为高级数据源

1。安装

先查看spark版本,spark-shell查看

version2。4。4   scala 2。11。12

 具体参见课程64 以及

Spark2.1.0+入门:Apache Kafka作为DStream数据源(Python版)

Kafka的安装和简单实例测试

需要安装jar包到spark内

流数据_数据回流是什么意思

 

 

 Dstream(Discreted stream 离散的)无状态转换

https://www.cnblogs.com/jesse123/p/11452388.html

https://www.cnblogs.com/jesse123/p/11460101.html

只统计当前批次,不会去管历史数据

Dstream 有状态转换

流数据_数据回流是什么意思

 

 (windowLength,slideInterval)滑动窗口长度,滑动窗口间隔

流数据_数据回流是什么意思

 

 流数据_数据回流是什么意思

 

 名称一样 但function不一样 逆函数减少计算量

流数据_数据回流是什么意思

 

 流数据_数据回流是什么意思

 

 新进来的x+y,离开的x-y,当中的数据(几百万条)不动  30 (应该是秒为单位)滑动窗口大小 10秒间隔

 

有状态转换upstatebykey操作

跨批次之间维护

 

 https://www.cnblogs.com/luotianshuai/p/5206662.html#autoid-0-3-0

这篇blog很详细 kafka相关概念 集群搭建

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/167477.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Ngixn动静分离详细配置方法

    Ngixn动静分离详细配置方法目录前言:准备工作一.静态主机配置二.动态主机配置三.动静分离配置四.配置文件目录结构五.测试前言:  为了加快网站的解析速度,可以把动态页面和静态页面由不同的服务器来解析,加快解析速度。降低原来单个服务器的压力。在动静分离的tomcat的时候比较明显,因为tomcat解析静态很慢,其实这些原理的话都很好理解,简单来说,使用正则表达式匹配过滤,然后交给不同的服务器。  静态页面一般直接由Nginx来处理,动态页面则是通过反向代理,代理到后端的Tomcat,然后在做负载均衡,是选择本地静态页面,还是后

  • Msfconsole_msfconsole渗透

    Msfconsole_msfconsole渗透msfconsole理论msfconsole理论‍‍在MSF里面msfconsole可以说是最流行的一个接口程序。很多人一开始碰到msfconsole的时候就害怕了。那么多复杂的命令语句需要学习,但是msfconsole真的是一个强大的接口程序。Msfconsole提供了一个一体化的集中控制台。通过msfconsole,你可以访问和使用所有的metasploit的插件,payloa

  • chegg网站_chunked

    chegg网站_chunked服务端给浏览器发送报文时,必须告诉浏览器报文的大小,这样浏览器可以根据报文大小来判断报文的完整性以及在长连接中确定报文的截尾。但是很多服务器的报文是动态创建的,在发送之前是无法确定其大小的。服务器只有等待内容全部创建后,计算出主体的大小,才能响应客户端的请求,这样的处理方法大大延迟了响应。传输编码中的分块编码为这种困难提供了解决方案,服务器可以逐块发送主体,并说明每块的大小就可以了。HTTP协议中

    2022年10月24日
  • 私有IP地址_ipv6私有地址

    私有IP地址_ipv6私有地址私有IP地址:在ABC三类网络中,如下三段网络地址为私有IP地址,如何人都可以自行在自己的局域网中使用这些IP地址.A类私有:10.0.0.110.255.255.254B类私有:172.16

  • centos linux mysql 10060远程错误代码

    centos linux mysql 10060远程错误代码

    2021年10月19日
  • JRebel热部署

    JRebel热部署

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号