大家好,又见面了,我是你们的朋友全栈君。
?送给大数据小白的建议:
1️⃣大数据平台学习及相关技术介绍可参考:
2️⃣大数据工程师的日常工作内容:
3️⃣大数据面试资料推荐:
?大数据开发笔记系列:
➡️本文章目录
2、有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词。
3、有10个文件,每个文件1G,每个文件的每一行存放的都是用户的query,每个文件的query都可能重复。要求你按照query的频度排序。
4、 给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url。
5、腾讯面试题:给40亿个不重复的unsigned int的整数,没排过序的,然后再给一个数,如何快速判断这个数是否在那40亿个数当中?
5.order by ,sort by , distribute by , cluster by 的区别?
一、HDFS
1.基本操作:
查看所有命令 hadoop fs
1、查看所有目录及其文件 hadoop fs -ls /
2、hdfs文件系统创建目录 hadoop fs -mkdir /input(用于测试代码)
2.1、hdfs文件系统创建目录(批量)
hadoop fs -mkdir -p /inout/tmp 在input文件夹内创建tmp文件夹
3、hdfs文件系统创建文件 hadoop fs -touchz /a.txt
4、hdfs文件系统删除文件
hadoop fs -rmr /a.txt
hadoop fs -rmr -skipTrash /a.txt(跳过回收站彻底删除)
5.hdfs上传本地文件 (注意:必须先建好hdfs上目录再put)
hadoop fs -put t.txt /test 将本地文件t.txt上传至hdfs上test文件夹内;
hadoop fs -put /a.txt
6.查看hdfs文件内容
hadoop fs -cat /a.txt
hadoop fs -tail /a.txt (从尾部开始看)
hadoop fs -text /a.txt (查看二进制数据)
7、hdfs下载文件
hadoop fs -get /a.txt .
注意最后有一点,这个.代表下载到本地命令行所在目录;
8、递归删除目录
hadoop fs -rmr /input/tmp
9、查看hdfs文件的大小
hadoop fs -du -h /b.txt
-du -s或者-du -h
10、查看hdfs文件行数
hadoop fs -cat /b.txt | wc -l
最后是字母l
cat或者text 都可以
实战:
查看集群ip情况 cat /etc/hosts
查看hadoop版本
echo $HADOOP_HOME/
which $HADOOP_HOME/
运行集群脚本
sh -x run.sh
运行run脚本,最好是 -X调试模式
2.HDFS的优缺点有哪些?
(1)HDFS的优点
高容错性
①:数据自动保存多个副本。它通过增加副本的形式,提高容错性
②:某一个副本丢失以后,它可以自动恢复
适合批处理即就近原则
①:移动计算而非非数据,数据位置暴露给计算机框架
②:本地化,数据不移动,代码(任务)移动。
适合处理大数据
①:数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据
②:文件规模:能够处理百万规模以上的文件数量,数量相当之大
可构建在廉价机器上,通过多副本机制,提高可靠性
(2)HDFS的缺点
不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。寻址时间长,适合读取大文件,低延迟与高吞吐率。
不适合小文件存储
占用NameNode大量内存,寻找时间超过读取时间
不支持并发写入,文件随机修改
①:一个文件只能有一个写,不允许多个线程同时写
②:仅支持数据append(追加),不支持文件的修改
3.HDFS整体架构介绍
1)Client:就是客户端。
(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行存储;
(2)与NameNode交互,获取文件的位置信息;
(3)与DataNode交互,读取或者写入数据;
(4)Client提供一些命令来管理HDFS,比如启动或者关闭HDFS;
(5)Client可以通过一些命令来访问HDFS;
2)NameNode:就是Master,它是一个主管、管理者。
(1)管理HDFS的名称空间;namespace
(2)管理数据块(Block)映射信息;
(3)配置副本策略(默认);3
(4)处理客户端读写请求。
3) DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。
(1)存储实际的数据块;
(2)执行数据块的读/写操作。
4) SecondaryNameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。
(1)辅助NameNode,分担其工作量;
(2)定期合并Fsimage和Edits,并推送给NameNode;
(3)在紧急情况下,可辅助恢复NameNode。
DataNode的工作机制?
-
一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
-
DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。
-
心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
什么是机架感知?什么时候会使用机架感知?
通俗的来说就是NN(NameNode)通过读取我们的配置来配置各个节点所在的机架信息,数据的流水线复制和HDFS复制副本时候。
4.HDFS数据写入(上传)流程是怎样的?
一 HDFS读流程概括:
-
client跟namenode通信查询元数据,namenode通过查询元数据,找到文件块所在的datanode服务器
-
挑选一台datanode(就近原则,然后随机)服务器,请求建立socket流
-
datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验,大小为64k)
-
客户端以packet为单位接收,现在本地缓存,然后写入目标文件
详细流程
-
Client 发起文件上传请求,通过 RPC 与 NameNode 建立通讯,NameNode检查目标文件是否已存在,父目录是否存在,返回是否可以上传;
-
Client 请求第一个 block 该传输到哪些 DataNode 服务器上;
-
NameNode 根据配置文件中指定的备份数量及副本放置策略进行文件分配,返回可用的 DataNode 的地址,如A,B,C
-
Client 请求 3 台 DataNode 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline),A 收到请求会继续调用 B,然后 B 调用 C,将整个pipeline 建立完成,后逐级返回 client;
-
Client 开始往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位(默认 64K),A 收到一个 packet 就会传给 B,B 传给 C;A 每传一个 packet 会放入一个应答队列等待应答
-
数据被分割成一个个 packet 数据包在 pipeline 上依次传输,在pipeline 反方向上,逐个发送 ack(命令正确应答),最终由 pipeline中第一个 DataNode 节点 A 将 pipeline ack 发送给 client;
-
当一个 block 传输完成之后,client 再次请求 NameNode 上传第二个block 到服务器
5.HDFS数据读取(下载)流程是怎样的?
HDFS写流程概括
-
客户端跟namenode通信请求上传文件,namenode检查目标文件是否已存在,父目录是否存在,用户是否有权限等
-
namenode返回是否可以上传
-
client请求第一个 block该传输到哪些datanode服务器上
-
namenode返回3个datanode服务器ABC
-
client请求3台dn中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将整个pipeline建立完成,逐级返回客户端
-
client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答
-
当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。
写流程详细:
-
Client 发起文件读取请求通过RPC与NameNode建立通讯,nameNode检查文件位置,来确定请求文件 block 所在的位置
-
NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的 DataNode 地址;
-
这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离 Client 近的排靠前;心跳机制中超时汇报的 DN 状态为 STALE,这样的排靠后;
-
Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据
-
底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,直到这个块上的数据读取完毕;
-
当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表;
-
读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的DataNode 继续读。
-
Read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据;最终读取来所有的 block 会合并成一个完整的最终文件。
hdfs-site.xml的3个主要属性是?
-
dfs.name.dir→决定的是元数据存储的路径和DFS的存储方式(磁盘或远端)
-
dfs.data.dir→决定的是数据存储的路径
-
fs.checkpoint.dir→用于 SecondaryNameNode
NN和2NN工作机制
1. 第一阶段:NameNode启动
-
第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
-
客户端对元数据进行增删改的请求。
-
NameNode记录操作日志,更新滚动日志。
-
NameNode在内存中对数据进行增删改。
2. 第二阶段:Secondary NameNode工作
-
Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
-
Secondary NameNode请求执行CheckPoint。
-
NameNode滚动正在写的Edits日志。
-
将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
-
Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
-
生成新的镜像文件fsimage.chkpoint。
-
拷贝fsimage.chkpoint到NameNode。
-
NameNode将fsimage.chkpoint重新命名成fsimage。
NN和2NN工作机制详解:
Fsimage:NameNode内存中元数据序列化后形成的文件。
Edits:记录客户端更新元数据信息的每一步操作(可通过Edits运算出元数据)。
NameNode启动时,先滚动Edits并生成一个空的edits.inprogress,然后加载Edits和Fsimage到内存中,此时NameNode内存就持有最新的元数据信息。Client开始对NameNode发送元数据的增删改的请求,这些请求的操作首先会被记录到edits.inprogress中(查询元数据的操作不会被记录在Edits中,因为查询操作不会更改元数据信息),如果此时NameNode挂掉,重启后会从Edits中读取元数据的信息。然后,NameNode会在内存中执行元数据的增删改的操作。
由于Edits中记录的操作会越来越多,Edits文件会越来越大,导致NameNode在启动加载Edits时会很慢,所以需要对Edits和Fsimage进行合并(所谓合并,就是将Edits和Fsimage加载到内存中,照着Edits中的操作一步步执行,最终形成新的Fsimage)。SecondaryNameNode的作用就是帮助NameNode进行Edits和Fsimage的合并工作。
SecondaryNameNode首先会询问NameNode是否需要CheckPoint(触发CheckPoint需要满足两个条件中的任意一个,定时时间到和Edits中数据写满了)。直接带回NameNode是否检查结果。SecondaryNameNode执行CheckPoint操作,首先会让NameNode滚动Edits并生成一个空的edits.inprogress,滚动Edits的目的是给Edits打个标记,以后所有新的操作都写入edits.inprogress,其他未合并的Edits和Fsimage会拷贝到SecondaryNameNode的本地,然后将拷贝的Edits和Fsimage加载到内存中进行合并,生成fsimage.chkpoint,然后将fsimage.chkpoint拷贝给NameNode,重命名为Fsimage后替换掉原来的Fsimage。NameNode在启动时就只需要加载之前未合并的Edits和Fsimage即可,因为合并过的Edits中的元数据信息已经被记录在Fsimage中。
二、Mapreduce
1.MapReduce工作流程
Application运行流程:
mr程序最先启动MRAppMaster(AM),AM启动后根据本次application的信息,计算出需要的maptask实例数量,然后向RM申请机器启动相应数量的maptask进程,RM通过心跳感知目前集群container(容器)的工作繁忙情况,分配相应的container资源,相应containers的nodemanagr在各自节点上启动container。
2.Mapreduce具体流程总结:
1、inputformat:MR框架基础类之一,包含数据分割(Data Splits)和记录读取器(Record Reader)两部分。每个split包含后一个Block中开头部分的数据可以解决记录跨Block问题,每读取一条记录,调用一次map函数。
2、Map:每一个切片对应一个map,map输出的数据,放入环形溢写缓冲区,缓冲区默认100M,达到80M进行溢写,写入到本地文件。
3、Shuffle:shuffle是MapReduce计算框架的核心,包括了Partion, Sort, Spill, Meger, Combiner, Copy, Memery, Disk等分组动作;
3.1、partition对map的内容根据kv对进行分区
3.2、sort(快速排序),溢写到磁盘
3.3、数据合并combiner(①减少数据写入磁盘的数据量 ② 减少网络传输的数据量 , 数据压缩)
AM监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)。Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。
3.4、fetch (通过RM,reduce找到指定的map主动fetch数据)
3.5、溢写,排序(归并排序)
3.6、merger(数据合并①减少数据量 ② 提高执行效率)
4、reduce(汇总,聚合的过程)
5、output(hdfs)
3.Shuffle过程中涉及两次排序:
1.快速排序:
sort阶段,环形缓冲区达到80%时,对数据进行快速排序,排序按照key的索引进行字典顺序排序,然后开始进行溢写,从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 。
(1)算法步骤
1.从数列中挑出一个元素,称为 “基准”(pivot);
2.重新排序数列,所有元素比基准值小的摆放在基准前面,所有元素比基准值大的摆在基准的后面(相同的数可以到任一边)。
在这个分区退出之后,该基准就处于数列的中间位置。这个称为分区(partition)操作;
3. 递归地(recursive)把小于基准值元素的子数列和大于基准值元素的子数列排序;
2.归并排序
在小的文件merge成大文件时采用,归并排序在map端和reduce端都可能出现。
算法步骤:
1.申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列;
2.设定两个指针,最初位置分别为两个已经排序序列的起始位置;
3.比较两个指针所指向的元素,选择相对小的元素放入到合并空间,并移动指针到下一位置;
4.重复步骤 3 直到某一指针达到序列尾;
5.将另一序列剩下的所有元素直接复制到合并序列尾。
MR流程中涉及的快排、归并排序,可参考以下文章
图文详解—十大经典排序算法_珞沫的博客-CSDN博客_排序算法
三、海量数据处理实例分析
1、海量日志数据,提取出某日访问百度次数最多的那个IP。
解决方案:首先是将这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。注意到IP是32位的,最多有个2^32个IP。同样可以采用映射的方法,比如模1000,把整个大文件映射为1000个小文件,再找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率。然后再在这1000个最大的IP中,找出那个频率最大的IP,即为所求。
2、有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词。
解决方案:顺序读文件中,对于每个词x,取hash(x)%5000,然后按照该值存到5000个小文件(记为x0,x1,…x4999)中。这样每个文件大概是200k左右。如果其中的有的文件超过了1M大小,还可以按照类似的方法继续往下分,直到分解得到的小文件的大小都不超过1M。 对每个小文件,统计每个文件中出现的词以及相应的频率(可以采用trie树/hash_map等),并取出出现频率最大的100个词(可以用含100个结点的最小堆),并把100个词及相应的频率存入文件,这样又得到了5000个文件。下一步就是把这5000个文件进行归并(类似与归并排序)的过程了。
3、有10个文件,每个文件1G,每个文件的每一行存放的都是用户的query,每个文件的query都可能重复。要求你按照query的频度排序。
方案1: 顺序读取10个文件,按照hash(query)%10的结果将query写入到另外10个文件(记为)中。这样新生成的文件每个的大小大约也1G(假设hash函数是随机的)。 找一台内存在2G左右的机器,依次对用hash_map(query, query_count)来统计每个query出现的次数。利用快速/堆/归并排序按照出现次数进行排序。将排序好的query和对应的query_cout输出到文件中。这样得到了10个排好序的文件(记为)。
对这10个文件进行归并排序(内排序与外排序相结合)。
方案2: 一般query的总量是有限的,只是重复的次数比较多而已,可能对于所有的query,一次性就可以加入到内存了。这样,我们就可以采用trie树/hash_map等直接来统计每个query出现的次数,然后按出现次数做快速/堆/归并排序就可以了。
方案3: 与方案1类似,但在做完hash,分成多个文件后,可以交给多个文件来处理,采用分布式的架构来处理(比如MapReduce),最后再进行合并。
4、 给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url。
方案1:可以估计每个文件安的大小为5G×64=320G,远远大于内存限制的4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。通读文件a,对每个url求取hash(url)%1000,然后根据所取得的值将url分别存储到1000个小文件(记为a0,a1,…,a999)中。这样每个小文件的大约为300M。通读文件b,采取和a相同的方式将url分别存储到1000小文件(记为b0,b1,…,b999)。这样处理后,所有可能相同的url都在对应的小文件(a0vsb0,a1vsb1,…,a999vsb999)中,不对应的小文件不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。求每对小文件中相同的url时,可以把其中一个小文件的url存储到hash_set中。然后遍历另一个小文件的每个url,看其是否在刚才构建的hash_set中,如果是,那么就是共同的url,存到文件里面就可以了。
方案2:如果允许有一定的错误率,可以使用Bloom filter,4G内存大概可以表示340亿bit。将其中一个文件中的url使用Bloom filter映射为这340亿bit,然后挨个读取另外一个文件的url,检查是否与Bloom filter,如果是,那么该url应该是共同的url(注意会有一定的错误率)。
5、腾讯面试题:给40亿个不重复的unsigned int的整数,没排过序的,然后再给一个数,如何快速判断这个数是否在那40亿个数当中?
方案1:申请512M的内存,一个bit位代表一个unsigned int值。读入40亿个数,设置相应的bit位,读入要查询的数,查看相应bit位是否为1,为1表示存在,为0表示不存在。
方案2:因为2^32为40亿多,所以给定一个数可能在,也可能不在其中;这里我们把40亿个数中的每一个用32位的二进制来表示假设这40亿个数开始放在一个文件中。然后将这40亿个数分成两类: 1.最高位为0 2.最高位为1 并将这两类分别写入到两个文件中,其中一个文件中数的个数<=20亿,而另一个>=20亿(这相当于折半了);与要查找的数的最高位比较并接着进入相应的文件再查找再然后把这个文件为又分成两类: 1.次最高位为0 2.次最高位为1,并将这两类分别写入到两个文件中,其中一个文件中数的个数<=10亿,而另一个>=10亿(这相当于折半了); 与要查找的数的次最高位比较并接着进入相应的文件再查找。 ……. 以此类推,就可以找到了,而且时间复杂度为O(logn),方案2完。
附:这里,再简单介绍下,位图方法: 使用位图法判断整形数组是否存在重复 判断集合中存在重复是常见编程任务之一,当集合中数据量比较大时我们通常希望少进行几次扫描,这时双重循环法就不可取了。位图法比较适合于这种情况,它的做法是按照集合中最大元素max创建一个长度为max+1的新数组,然后再次扫描原数组,遇到几就给新数组的第几位置上1,如遇到5就给新数组的第六个元素置1,这样下次再遇到5想置位时发现新数组的第六个元素已经是1了,这说明这次的数据肯定和以前的数据存在着重复。这种给新数组初始化时置零其后置一的做法类似于位图的处理方法故称位图法。它的运算次数最坏的情况为2N。如果已知数组的最大值即能事先给新数组定长的话效率还能提高一倍。
四、YARN
1.YARN核心组件
1.RM(ResourceManager)资源管理器
RM是一个全局的资源管理器,负责整个系统的资源管理和分配。
它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM),通俗讲是用于管理NodeManager节点的资源,包括cup、内存等。
2.NodeManager(NM)+ (DataNode 硬盘 CPU 内存)
NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。
3.Applications Manager(应用程序管理器)new ApplicationMaster 监控所有job的任务运行结果的监控 –> 客户端
负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。是AM的AM。
4.ApplicationMaster(AM)job过程监控
ApplicationMaster 管理在YARN内运行的每个应用程序实例。每个应用程序对应一个ApplicationMaster。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配),通俗讲是管理发起的任务,随着任务创建而创建,任务的完成而结束。
5.Container(重要) –> 资源接口 –> map reduce mapContainer reduceContainer spark –> sparkContainer
Container是YARN中的资源抽象,它封装了NodeManager节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。(Container的具有优先级别,包括:队列 普通用户 VIP会员, 权重高的一定是先执行)
2.YARN工作流程
概括:
步骤1:用户将应用程序提交到 ResourceManager 上;
步骤2:ResourceManager 为应用程序 ApplicationMaster 申请资源,并与某个 NodeManager 通信启动第一个 Container,以启动ApplicationMaster;
步骤3:ApplicationMaster 与 ResourceManager 注册进行通信,为内部要执行的任务申请资源,一旦得到资源后,将于 NodeManager 通信,以启动对应的 Task;
步骤4:所有任务运行完成后,ApplicationMaster 向 ResourceManager 注销,整个应用程序运行结束。
详细流程:
– client向RM提交应用程序,其中包括启动该应用的ApplicationMaster的必须信息,例如ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等
– ResourceManager启动一个NodeManager的一个container用于运行ApplicationMaster
– 启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳
– ApplicationMaster向ResourceManager发送请求,申请相应数目的container
– 申请成功的container,由ApplicationMaster进行初始化。container的启动信息初始化后,AM与对应的NodeManager通信,要求NM启动container
– NM启动container
– container运行期间,ApplicationMaster对container进行监控。container通过RPC协议向对应的AM汇报自己的进度和状态等信息
– 应用运行结束后,ApplicationMaster向ResourceManager注销自己,并允许属于它的container被收回
3.YARN默认的调度器分别是什么,及他们区别?
YARN调度器主要分为三类:
– 1、FIFO :先进先出,同一个队列中现先提交的先执行,后面等待
– 2、Capacity Scheduler(容量调度器): 允许创建多个任务队列,每个队列使用所有资源的一部分。多个任务队列可 以同时执行。但是一个队列内部还是先进先出。
– 3、Fair Scheduler(公平调度): 第一个程序在启动时可以占用其他队列资源(100%占用),当其他队列有 任务提交时,占用资源的队列需要将资源还给该任务。还资源的时候,效率 比较慢。
五、Zookeeper
参考我的这篇文章:Zookeeper_GoAl的博客-CSDN博客
六、Hive
Hive:基于Hadoop一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的 sql 查询功能,可以将 sql 语句转换为 MapReduce 任务进行运行。适合数据仓库的统计分析。
1、Hive架构
2、三大组件
1、用户接口:包括 CLI、JDBC/ODBC、WebGUI。
-
CLI(command line interface)为 shell 命令行,进行交互式执行SQL:直接与Driver进行交互。CLI启动的时候,会同时启动一个 Hive 副本
-
JDBC/ODBC 驱动是 Hive 的 JAVA 实现,作为JAVA的API:JDBC是通过Thift Server来接入,然后发送给Driver
-
WebGUI 是通过浏览器访问 Hive。
-
HiveServer2基于Thrift, 允许远程客户端使用多种编程语言如Java、Python向Hive提交请求
2、Metastore:存储元数据
-
Hive 将元数据存储在数据库中,如MySQL、derby
-
Hive 中的元数据包括表的名字、表的列、分区及其属性、表的属性(是否为外部表等)、表的数据所在目录等。
3、Driver(驱动模块):包括解释器、编译器、优化器、执行器
-
通过该模块对输入进行解析编译,对需求的计算进行优化,然后按照指定的步骤进行(通常启动多个MR任务来执行)
-
解释器、编译器、优化器、执行器完成 HQL 查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在 HDFS 中,并在随后由 MapReduce 调用执行
3.Hive 内部表和外部表区别?:
(1)是否直接通过external
(2)删除外部表,元数据得到删除,但是数据不会真正删除,针对内部表,元数据和数据都被删除
(3)在导入数据到外部表,数据并没有移动到自己的数据仓库目录下,也就是说外部表中的数据并不是由它自己来管理的!而内部表则不一样
注意:内部表和外部表场景:
内部表:逻辑处理的中间过程生成的中间表,或者一些临时表,直接删除即可
外部表:可以用户存储一些日志信息,数据不会被删除
4.Hive与关系型数据库区别:
Hive表示纯逻辑,只有表定义,不存数据。读多写少,不支持数据的改写和删除
5.order by ,sort by , distribute by , cluster by 的区别?
1.Order by会对所给的全部数据进行全局排序,只启动一个reduce来处理。
Sort by是局部排序,它可以根据数据量的大小启动一到多个reducer来工作,并在每个reduce中单独排序。
3.Distribute by 类似于mr中的partition,采用hash算法,在map端将查询结果中hash值相同的结果分发到对应的reduce中,结合sort by使用。
4.Cluster by 可以看作是distribute by 和sort by的结合,当两者后面所跟的字段列名相同时,效果就等同于使用cluster by,但是cluster by最终的结果只能是降序,无法指定升序和降序。
注:不带count,sum这些聚合函数的,都不会走mapreduce。
6.Hive最全函数介绍:
UDF:普通函数:1对1的关系,select语句,例如:数据格式
UDAF:聚合函数:多对1的关系,结合group by联合使用
UDTF:生成函数:1对多
用 UDF 函数解析公共字段;用 UDTF 函数解析事件字段。
自定义 UDF:继承 UDF,重写 evaluate 方法
自定义 UDTF:继承自 GenericUDTF,重写 3 个方法:initialize(自定义输出的列名和类型),
process(将结果返回 forward(result),close 。
为什么要自定义 UDF/UDTF,因为自定义函数,可以自己埋点 Log 打印日志,出错或者数据异常,方便调试。
Hive窗口函数:
Hive命令行窗口查看函数定义 :desc function 函数名;
HIve窗口函数实战可参考:Hive开窗函数总结_Abysscarry的博客-CSDN博客_hive的开窗函数
Hive常用日期函数
unix_timestamp:返回当前或指定时间的时间戳
from_unixtime:将时间戳转为日期格式
current_date:当前日期
current_timestamp:当前的日期加时间
to_date:抽取日期部分
year:获取年
month:获取月
day:获取日
hour:获取时
minute:获取分
second:获取秒
weekofyear:当前时间是一年中的第几周
dayofmonth:当前时间是一个月中的第几天
months_between: 两个日期间的月份
add_months:日期加减月
datediff:两个日期相差的天数
date_add:日期加天数
date_sub:日期减天数
last_day:日期的当月的最后一天
日期处理函数实例
1)date_format函数(根据格式整理日期)
hive (gmall)> select date_format(‘2019-02-10′,’yyyy-MM’);
2019-02
2)date_add函数(加减日期)
hive (gmall)> select date_add(‘2019-02-10’,1);
2019-02-11
hive (gmall)> select date_add(‘2019-02-10’,-1);
2019-02-09
3)next_day函数
(1)取当前天的下一个周一
hive (gmall)> select next_day(‘2019-02-12′,’MO’);
2019-02-18
说明:星期一到星期日的英文(Monday,Tuesday、Wednesday、Thursday、Friday、Saturday、Sunday)
(2)取当前周的周一
hive (gmall)> select date_add(next_day(‘2019-02-12′,’MO’),-7);
2019-02-11
4)last_day函数(求当月最后一天日期)
hive (gmall)> select last_day(‘2019-02-10’);
2019-02-28
常用取整函数
round: 四舍五入
ceil: 向上取整
floor: 向下取整
常用字符串操作函数
upper: 转大写
lower: 转小写
length: 长度
trim: 前后去空格
lpad: 向左补齐,到指定长度
rpad: 向右补齐,到指定长度
regexp_replace: SELECT regexp_replace(‘100-200’, ‘(\\d+)’, ‘num’) ;
使用正则表达式匹配目标字符串,匹配成功后替换!
集合操作
size: 集合中元素的个数
map_keys: 返回map中的key
map_values: 返回map中的value
array_contains: 判断array中是否包含某个元素
sort_array: 将array中的元素排序
7.Hive的数据管理:
1)内表外表(2)Partition辅助查询,缩小查询范围,加快数据检索速度(3)Bucket 控制reduce数量
hive分区
参考我的这篇文章:CSDN
hive分桶:
分桶是将整个数据内容按照某列属性值去hash值进行区分,对取得的hash再做模运算(columnValue.hashCode % 桶数),具有相同结果的数据进入同一个文件中。
8.Hive实战数据分析
参考我的这两篇文章:Hive实战_GoAl的博客-CSDN博客
七、Sqoop实战
1 Mysql数据导入HDFS上.
1. 全量导入:
将mysql表中全部数据都导入HDFS,如果HDFS中存在这个目录的话就会报错,默认存储的HDFS目录是 /user/root/XXX.
bin/sqoop import (在sqoop的安装目录内,import表名是导入)
–connect jdbc:mysql://192.168.52.130:3306/userdb (连接:协议:数据库类型://ip地址:端口号/数据库)
–username root (用户名 root)
–password 123456 (密码 123456)
–table emp (表 emp)
–m 1 (–num-mappers:使用几个mapper,写1就可以)
若要导入到HDFS指定目录下,并指定字段之间的分隔符:
使用参数 –target-dir 来指定导出目的地,
使用参数 –delete-target-dir 来判断导出目录是否存在,如果存在就删掉.
使用参数 –fields-terminated-by ‘\t’ 使用”\t”来切割字段,sqoop默认是使用’,’逗号进行分割的.
bin/sqoop import (在sqoop的安装目录内,import表名是导入)
–connect jdbc:mysql://192.168.52.130:3306/userdb (连接:协议:数据库类型://ip地址:端口号/数据库)
–username root (用户名 root)
–password 123456 (密码 123456)
–table emp (表 emp)
–delete-target-dir (如果指定目录存在就删除它)
–target-dir /sqoop/emp (导入到指定目录)
–fields-terminated-by ‘\t’ (指定字段分割符为\t)
–m 1 (–num-mappers:使用几个mapper,写1就可以)
2.增量导入:
将数据库中某一字段,增加的导入,在HDFS上单独形成一个文件.
注意:增量导入的时候,一定不能加参数–delete-target-dir否则会报错
bin/sqoop import
–connect jdbc:mysql://192.168.52.130:3306/myhive
–username root
–password 123456
–table emp
–incremental append (表明增量导入)
–check-column id (检查哪个字段,这里检查的是mysql数据库表中的id字段)
–last-value 4 (id字段最后一个id是4,那增量导入的话就是从id=5开始往后导入)
–m 1
3.减量导入:
设置where条件,通过条件可以判断减少的数据或增加的数据,控制更加灵活一些,例如可以通过表创建时间来判断数据是哪一天生成的等,每个表均设置3个字段,create_time(表创建时间),update_time(表更新时间),is_delete(是否删除)
注意:where条件的地方需要使用“”双引号引起来,否则where条件失效
bin/sqoop import \
–connect jdbc:mysql://192.168.52.130:3306/userdb \
–username root \
–password admin \
–table emp \
–incremental append \
–where “create_time > ‘2019-02-14 00:00:00′ and is_delete=’1’ and create_time < ‘2019-02-14 23:59:59′” \
–target-dir /sqoop/incement \
–check-column id \
–m 1
4.SQL语句查找导入HDFS
我们还可以通过 –query参数来指定我们的sql语句,通过sql语句来过滤我们的数据进行导入
bin/sqoop import
–connect jdbc:mysql://192.168.52.130:3306/userdb
–username root
–password 123456
–delete-target-dir
–query ‘select phno from emp_conn where 1=1 and $CONDITIONS’
–target-dir /sqoop/emp_conn
–m 1
注意事项:
使用sql语句来进行查找是不能加参数–table
并且必须要添加where条件,
并且where条件后面必须带一个$CONDITIONS 这个字符串,
并且这个sql语句必须用单引号,不能用双引号.
2. Mysql数据导入Hive上.
1.将我们mysql表当中的数据直接导入到hive表中的话,需要将hive的一个叫做hive-exec-1.1.0-cdh5.14.0.jar的jar包拷贝到sqoop的lib目录下
cp /export/servers/hive-1.1.0-cdh5.14.0/lib/hive-exec-1.1.0-cdh5.14.0.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/
2.将我们mysql当中的数据导入到hive表当中来,需要准备hive数据库与表
hive (default)> create database sqooptohive;
hive (default)> use sqooptohive;
hive (sqooptohive)> create external table emp_hive(id int,name string,deg string,salary int ,dept string) row format delimited fields terminated by ‘\t’;
3.导入语句
bin/sqoop import
–connect jdbc:mysql://192.168.52.130:3306/userdb
–username root
–password 123456
–table emp
–fields-terminated-by ‘\t’ (字段之间的分隔符)
–hive-import (将数据从mysql数据库中导入到hive表中)
–hive-table qooptohive.emp_hive (后面接要创建的hive表,数据库中的某个表,使用”.”连接)
–hive-overwrite (覆盖掉在hive表中已经存在的数据)
–delete-target-dir
–m 1
注意:我们还可以导入关系表到hive并自动创建hive表,导入
bin/sqoop import
–connect jdbc:mysql://192.168.52.130:3306/userdb
–username root
–password 123456
–table emp_conn
–hive-import
–hive-database sqooptohive (–hive-database 后面直接接数据库名)
–m 1
3.Sqoop的数据导出
将数据从HDFS把文件导出到mysql数据库,导出前,目标表必须存在于目标数据库中。
数据是在HDFS当中的如下目录/sqoop/emp,数据内容如下
1201,gopal,manager,50000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
1202,manisha,Proof reader,50000,TP,2018-06-15 18:54:32.0,2018-06-17 20:26:08.0,1
1203,khalil,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
1204,prasanth,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 21:05:52.0,0
1205,kranthi,admin,20000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
1.创建mysql表
CREATE TABLE `emp_out` (
`id` INT(11) DEFAULT NULL,
`name` VARCHAR(100) DEFAULT NULL,
`deg` VARCHAR(100) DEFAULT NULL,
`salary` INT(11) DEFAULT NULL,
`dept` VARCHAR(10) DEFAULT NULL,
`create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`is_delete` BIGINT(20) DEFAULT ‘1’
) ENGINE=INNODB DEFAULT CHARSET=utf8;
2.执行导出命令:通过export来实现数据的导出,将hdfs的数据导出到mysql当中去
bin/sqoop export
–connect jdbc:mysql://192.168.52.130:3306/userdb
–username root
–password 123456
–table emp_out
–export-dir /sqoop/emp
–input-fields-terminated-by “,”
3.验证mysql表数据
八、Hbase–分布式列存储NOSQL数据库
1、Hbase数据存储在hdfs,少量存内存
2、hbase适合海量稀疏数据存储
hbase属于nosql数据库,列存储
3、与传统关系型数据库对比:
行存储:传统关系型数据mysql、oracle
优点:保证数据完整性,写入检查
缺点:读的过程会产生冗余信息
列存储:Nosql数据库
优点:读的过程不会产生冗余
缺点:写入效率差,不保证完整性
4、Hbase优点:
(1)存储海量数据
(2)快速随机访问
(3)进行大量的改写操作
Hbase的优点及应用场景:
- 半结构化或非结构化数据:
对于数据结构字段不够确定或杂乱无章非常难按一个概念去进行抽取的数据适合用HBase,因为HBase支持动态添加列。
- 记录很稀疏:
RDBMS的行有多少列是固定的。为null的列浪费了存储空间。HBase为null的Column不会被存储,这样既节省了空间又提高了读性能。
- 多版本号数据:
依据Row key和Column key定位到的Value能够有随意数量的版本号值,因此对于须要存储变动历史记录的数据,用HBase是很方便的。比方某个用户的Address变更,用户的Address变更记录也许也是具有研究意义的。
- 仅要求最终一致性:
对于数据存储事务的要求不像金融行业和财务系统这么高,只要保证最终一致性就行。(比如HBase+elasticsearch时,可能出现数据不一致)
- 高可用和海量数据以及很大的瞬间写入量:
WAL解决高可用,支持PB级数据,put性能高
- 适用于插入比查询操作更频繁的情况。比如,对于历史记录表和日志文件。(HBase的写操作更加高效)
- 业务场景简单:
不需要太多的关系型数据库特性,列入交叉列,交叉表,事务,连接等。
Hbase的缺点:
- 单一RowKey固有的局限性决定了它不可能有效地支持多条件查询[2]
- 不适合于大范围扫描查询
- 不直接支持 SQL 的语句查询
5、Hbase结构:rowkey -> Column Family -> Column Qualifer列族具体列
rowkey行键
table的主键,table中的记录按照rowkey 的字典序进行排序
Column Family列族
hbase表中的每个列,都归属与某个列族。列族是表的schema的一部分(而列不是),必须在使用表之前定义。
Timestamp时间戳
每次数据操作对应的时间戳,可以看作是数据的version number版本号
Column列
列族下面的具体列
属于某一个ColumnFamily,类似于我们mysql当中创建的具体的列
cell单元格
由{row key, column( =<family> + <label>), version} 唯一确定的单元
cell中的数据是没有类型的,全部是以字节数组进行存储
6、Hbase逻辑模型:三维有序
Rowkey -> Column Family -> Column Qualifier -> Timestamp
rowkey行(正序, 从小到大)、column列(正序从小到大)、timestamp时间(倒叙从大到小)
面试点:为什么说hbase表的列族不宜超过3个?
a、列族数量决定store, 一个store至少有一个memstore,而memstore占内存
b、如果列族越多的话,造成更多的flush会产生更多IO
flush的最小单位是region, 一个region中的某个列族做flush , 其余的列族也会做flush
频繁的flush产生更多的storeFile,storeFile增多就会产生更多compaction操作
compaction操作和flush都是重IO操作
c、列族过多,split操作会出现数据不均匀的情况
散列原则:
前提:服务器的配置不是很好并且对查询速度要求不是很高
rowkey设计为:random+时间
目的:防止某一个或某几个regionserver成为热点
有序原则:
前提:服务器本身的配置要高一些, 会出现一个或是多个region热点效应
rowkey设计为:时间+random
Hbase shell 基础
list_namespace 查看所有数据,类似于show database;
scan ‘hbase:meta’ 查看元数据信息
–创建表 ‘cf1′,’cf2’ 表示列族
create ‘badou_20_a’,’cf1′,’cf2′
— 查看表的结构
describe ‘badou_20_a’
— 删除cf1列族
alter ‘badou_20_a’,{NAME=>’cf1′,METHOD=>’delete’}
— 查看存在哪些表
list
exists ‘badou_20_a’
— 保留两个版本的数据, IN_MEMORY数据保存到内存中
alter ‘badou_20_a’,{NAME=>’cf2′,VERSIONS=>2,IN_MEMORY=>true}
— 删除表
disable ‘badou_20_a’ : 将表转换为去激活的状态
drop ‘badou_20_a’ : 删除表
— 激活表
enable ‘badou_20_a’
— 插入记录
put ‘badou_20′,’1003′,’cf2:name’,’root’
put ‘badou_20′,’1004′,’cf2:name’,’scott’
— 获取记录
scan ‘badou_20’ 注意 hbase表的数据量特别大的时候, scan 慎用
— 根据rowkey 查询
get ‘badou_20′,’1001’
— 根据列族获取
get ‘badou_20′,’1001′,{COLUMN=>’cf2:name’}
— 根据列族和指定的时间戳进行获取
get ‘badou_20′,’1001′,{COLUMN=>’cf2:name’,TIMESTAMP=>1615465406738}
— 查询表的记录
count ‘badou_20’
— 强制刷出内存的数据到HDFS
flush ‘badou_20’
— 清除表的数据,保留表的结构
truncate ‘order’
9、hbase shell 进阶
— 修改 badou_20 版本为2
put ‘badou_20′,’1001′,’cf2:name’,’max’
put ‘badou_20′,’1001′,’cf2:name’,’avg’
alter ‘badou_20′,{NAME=>’cf2’,VERSIONS=>2}
如何显示两个版本?
scan ‘badou_20’,{VERSIONS=>2}
get ‘badou_20′,’1001′,{COLUMN=>’cf2:name’,VERSIONS=>2}
get ‘badou_20′,’1001′,{COLUMN=>’cf2’,VERSIONS=>2}
— 修改表的版本
alter ‘badou_20′,{NAME=>’cf2’,VERSIONS=>3}
alter ‘badou_20′,{NAME=>’cf2’,VERSIONS=>4}
— TTL 按照规定的时间对数据进行超时间设置
TTL => ‘FOREVER’ 表示数据永不过期
TTL => ’60 SECONDS 表示一分钟之前的数据会过期
create ‘tt_table’,{NAME=>’cf1′,TTL=>60}
1616311193758
put ‘tt_table’,’rowkey001′,’cf1:age’,’30’,1616311993900
九、Flume实战
1、采集目录到HDFS
采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去
根据需求,首先定义以下3大要素
采集源,即source——监控文件目录 : spooldir
下沉目标,即sink——HDFS文件系统 : hdfs sink
source和sink之间的传递通道——channel,可用file channel 也可以用内存channel
配置文件编写:
#定义三大组件的名称
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# 配置source组件
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hadoop/logs/
agent1.sources.source1.fileHeader = false
#配置拦截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# 配置sink组件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Channel参数解释:
capacity:默认该通道中最大的可以存储的event数量
trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
keep-alive:event添加到通道中或者移出的允许时间
2、采集文件到HDFS
采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs
根据需求,首先定义以下3大要素
采集源,即source——监控文件内容更新 : exec ‘tail -F file’
下沉目标,即sink——HDFS文件系统 : hdfs sink
Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel
配置文件编写:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1
#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
更多source和sink组件:
Flume支持众多的source和sink类型,详细手册可参考官方文档
Flume 1.9.0 User Guide — Apache Flume
十、Kafka介绍
1.Apache Kafka简介
Kakfa最初由Linkedin公司开发,使用 Scala 编写,拥有高吞吐、可持久化、可水平扩展的基于发布/订阅模式的分布式消息队列,支持分区策略、多副本策略,基于zookeeper协调的分布式消息系统,主要应用于大数据的实时或离线数据处理、日志收集以及实时指标监控等领域。
2.Kafka常用术语
消息:message,消息是kafka的基本数据单元,代表着一条一条的数据,为了提高网络和存储的利用率,生产者会批量发送消息到Kafka,并在发送之前对消息进行压缩。
主题:topic,主题是kafka对消息的分类,是一个逻辑概念,可以看作消息集合,用于接收不同业务的消息。
分区:partition,类似数据库的分区表,通常topic下会多个分区,每个分区内的数据是有序的,同一个topic的多个分区kafka不保证消息的顺序性,一个分区在逻辑上对应一个Log,对应磁盘上的一个文件夹。
偏移量:offset,偏移量是表示消息在分区中的位置,kafka存储的文件是按照offset.log的格式来命名的,便于快速查找。
副本:replicas,副本是针对分区而言的,kafka对消息做了冗余备份,目的就是为了容灾,每个分区可以指定多个副本来冗余数据,分为leader partition和follower partition,只有leader partition对外提供服务,follower partition单纯是从leader partition同步数据,因此会存在多份相同的数据。
生产者:producer,生产者是kafka集群的上游,顾名思义就是往kafka输入数据的客户端。
消费者:comsumer,消费者是kafka集群的下游,与生产者相辅相成,kafka类似一个仓库,生产者负责生产消息往仓库放,自然得有消费者从仓库里拿消息,不然仓库容易爆满。
消费者组:Comsumer Group,简称CG,这个比较容易理解,就是将多个消费者捆绑起来,组团消费消息,一个Consumer只能属于一个Consumer Group,Kafka还通过Consumer Group实现了消费者的水平扩展和故障转移。
节点:broker,一个broker就是一个kafka server实例,多个broker组成kafka集群,主要用于接收生产者发送过来的消息,写入磁盘,同时可以接收消费者和其他broker的请求。
重新负载均衡:rebalance,当消费者组的消费者实例出现变化时,例如新增消费者或者减少消费者,都会触发kafka的Rebalance机制,这个过程比较耗性能,要尽量避免这个过程被触发。
Kafka架构
我们把架构分主从架构和对等架构,主从架构就是分为管理节点和工作节点,职责不同,如HDFS 、spark、flink;对等架构则不区分节点属性,所有的实例职责都是一样的,kafka的架构有点类似于对等架构,但又不完全是。Kafka的设计理念之一就是同时提供离线处理和实时处理。
Kafka ACK消息确认机制有三个值,分别为1,0和-1,默认是1,对应不同的状态:
-
ack=1,意味着producer要等待leader成功收到数据并得到确认,才发送下一条message,安全性较高但是性能相对较低。
-
ack=0,意思就是说,我只管发送消息,不用你给我回复,成就成,不成我也不管,这种策略的性能是最高的,但是容易丢失数据。
-
ack=-1,这种情况下,生产者只有收到所有副本写入成功的通知后,才会认为消息写入成功,安全性最高,但是性能是三者里面最差的。
kafka分区和副本机制
kafka分区机制:
实现kafka高吞吐量的重要手段,实现流量分发和负载均衡,试想一下,如果所有的消息都往同一个数据写,对于服务器来说会造成极高的负载,特别是出现热点数据的时候容易崩溃,对于多个生产者和多个消费者来说,只有一个分区可以用于生产和消费,这显然是非常受限的。
kafka提供了三种分区策略:
-
轮询策略:Round-robin,轮询策略是Kafka默认的分区策略,根据主题分区数量从头到尾进行轮询,目的就是为了将消息均匀地分布在分区中,保证消息最大限度地被平均分配到所有分区上。
-
随机策略:Range Strategy,所谓随机就是我们随意地将消息放置到任意一个分区上,随机分发,这样有可能会造成消息分发不均匀,相比之下,轮询策略显得更加合理,旧版本默认是用随机策略,新版本默认用的是轮询策略。
-
按key分发策略:顾名思义就是根据消息的key指定分区写入,这种方式主观性比较强,相对比较灵活。
除此之外,kafka支持自定义分区器,实现更多复杂的逻辑处理消息。
kafka副本机制:
为了提供数据冗余、数据备份的安全策略,等同于备份,实际上,基本所有的分布式消息队列都会存在副本机制,不光是消息队列,HDFS也是如此。
前面在kafka常用术语中说到,Kafka 是有主题概念的,主题下划分成若干个分区,副本是分区的逻辑概念,分区可以指定多个副本。本质上副本就是一个只能不断追加的日志文件,在实际的生产中,为了保障数据安全,通常会配置多个副本,根据算法分散在不同的broker上,一份数据(leader和副本)不会同时出现在一台服务器上,这样当服务器出现故障时,能够最大程度保证数据不丢失,如下图。
其中leader partition和 follower partition的工作原理如下,正常情况下,只有leader partition对外提供服务,follower partition负责从leader partition拉取数据,当leader发送故障时,follower拥有被选举为新leader的权利。
3.Kafka的优势
-
支持数据离线和实时处理
-
能保证消息的可靠性传递
-
支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错
-
高吞吐率,每秒处理百万级的消息量
-
高并发,支持数千个客户端同时读写
-
支持在线水平扩展
kafka为什么能实现读写都这么快呢?
答:离不开kafka顺序读写机制和零拷贝数据传输,减少了寻址的时间消耗,降低了读写延迟,同时有利于快到定位消息偏移量,零拷贝机制可以提高数据传输的效率,减少IO资源的占用。
十一、Spark
1.Spark介绍
1.1 什么是spark
- 基于内存的分布式计算框架
- 只负责算 不负责存
- spark 在离线计算 功能上 类似于mapreduce的作用
1.2 为什么用spark
-
MapReduce的缺点
- 运行速度慢 (没有充分利用内存)
- 接口比较简单,仅支持Map Reduce
- 功能比较单一 只能做离线计算
- 不适合迭代计算(如机器学习、图计算等等),交互式处理(数据挖掘)
- 不适合流式处理(点击日志分析)
-
需要一种灵活的框架可同时进行批处理、流式计算、交互式计算
- 内存计算引擎,提供cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
- DAG引擎,减少多次计算之间中间结果写到HDFS的开销
- 使用多线程模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO
- spark的缺点是:吃内存,不太稳定
-
Spark优势
- 速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)spark中的job中间结果可以不落地,可以存放在内存中。 mapreduce中map和reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中。
- 易用性(可以通过java/scala/python/R开发spark应用程序)
- 通用性(可以使用spark sql/spark streaming/mlib/Graphx)
- 兼容性(spark程序可以运行在standalone/yarn/mesos)
2. RDD 的概念
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.
-
Dataset:一个数据集,简单的理解为集合,用于存放数据的
-
Distributed:它的数据是分布式存储,并且可以做分布式的计算
-
Resilient:弹性的
- 它表示的是数据可以保存在磁盘,也可以保存在内存中
- 数据分布式也是弹性的
- 弹性:并不是指他可以动态扩展,而是容错机制。
- RDD会在多个节点上存储,就和hdfs的分布式道理是一样的。hdfs文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同的节点上
- spark读取hdfs的场景下,spark把hdfs的block读到内存就会抽象为spark的partition。
- spark计算结束,一般会把数据做持久化到hive,hbase,hdfs等等。我们就拿hdfs举例,将RDD持久化到hdfs上,RDD的每个partition就会存成一个文件,如果文件小于128M,就可以理解为一个partition对应hdfs的一个block。反之,如果大于128M,就会被且分为多个block,这样,一个partition就会对应多个block。
-
所有spark中对数据的操作最终都会转换成RDD的操作
- spark sql
- spark streaming
- spark ml 、spark mllib
-
RDD是不可变的
- 父RDD 生成一个子 RDD 父RDD的状态不会变化
- 从容错的角度去做这样的设计
2.1 RDD的创建
-
创建RDD之前先要有spark context
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
-
通过内存中的数据创建RDD
- data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
- data = [1, 2, 3, 4, 5]
-
创建RDD时可以指定 partition的数量(RDD会分成几份)一个partition会对应一个task,根据CPU的内核数来指定partition (1核对应2~4个partition)
-
从文件创建RDD 可以是HDFS支持的任何一种存储介质
- 可以从 hdfs、 数据库(mysql) 、本地文件系统、 hbase 这些地方加载数据创建RDD
- rdd = sc.textFile(‘file:///root/tmp/test.txt’)
2.2 RDD的三类算子
-
transformation
-
所有的transformation 都是延迟执行的,只要不调用action 不会执行,只是记录过程
-
transformation 这一类算子返回值还是 rdd
-
rdd.transformation 还会得到新的rdd
-
map(func) 将func函数作用到数据集的每一个元素上,生成一个新的RDD返回
-
filter(func) 选出所有func返回值为true的元素,生成一个新的RDD返回
-
flatMap(func) 会先执行map的操作,再将所有对象合并为一个对象
-
union() 取并集
-
intersection() 交集
-
groupByKey() 以元组中的第0个元素作为key,进行分组,返回一个新的RDD 结果中 value是一个Iterable
-
reducebykey(func) 将key相同的键值对,按照Function进行计算
-
sortbykey(ascending=True, numPartitions=None, keyfunc=<function RDD.>)Sorts this RDD, which is assumed to consist of (key, value) pairs.按照关键词排序,排完后函数操作
-
-
action
- 会触发之前的rdd所有的transformation
- 获取最终的结果
- collect 所有的结果都会加载到内存中
- reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
- fitst 第一个
- take(num) 前n个
- count()
-
persist
- 数据存储,可以存到内存,也可以是磁盘
3. spark-core 实战
详情见spark-core 实战 通过spark实现ip地址查询
4. spark集群架构
-
Application
用户自己写的Spark应用程序,批处理作业的集合。Application的main方法为应用程序的入口,用户通过Spark的API,定义了RDD和对RDD的操作。
-
Client:客户端进程,负责提交作业到Master。
-
Master(类比与ResourceManager)
- Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。
-
Worker(类比于NodeManager)
- Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor。
-
Driver(类比于ApplicationMaster)
- 一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。
- DAGScheduler: 实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。
- TaskScheduler:实现Task分配到Executor上执行。
- Stage:一个Spark作业一般包含一到多个Stage。
- Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。
-
Executor(类比于Container):即真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Driver的命令Launch Task,一个Executor可以执行一到多个Task。
Spark考点总结
参考:大数据面试杀招——Spark高频考点,必知必会!_Alice菌的博客-CSDN博客
一、你是怎么理解Spark,它的特点是什么?
Spark是一个基于内存的,用于大规模数据处理(离线计算、实时计算、快速查询(交互式查询))的统一分析引擎。
它内部的组成模块,包含SparkCore,SparkSQL,SparkStreaming,SparkMLlib,SparkGraghx等…
它的特点:
- 快
Spark计算速度是MapReduce计算速度的10-100倍
- 易用
MR支持1种计算模型,Spsark支持更多的计算模型(算法多)
- 通用
Spark 能够进行离线计算、交互式查询(快速查询)、实时计算、机器学习、图计算
- 兼容性
Spark支持大数据中的Yarn调度,支持mesos。可以处理hadoop计算的数据。
1) Local:运行在一台机器上,通常是练手或者测试环境。
2)Standalone:构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统。
3)Yarn: Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
4)Mesos:国内大环境比较少用。
因为我们Spark任务是采用的Shell脚本进行提交,所以一定会涉及到几个重要的参数,而这个也是在面试的时候容易被考察到的“细节”。
<span style="color:#000000"><span style="background-color:#f5f7ff"><code class="language-shell">executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个
num-executors —— 启动executors的数量,默认为2
executor-memory —— executor内存大小,默认1G
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M
</code></span></span>
Spark的任务提交方式实际上有两种,分别是YarnClient模式和YarnCluster模式。大家在回答这个问题的时候,也需要分类去介绍。千万不要被冗长的步骤吓到,一定要学会总结差异,发现规律,通过图形去增强记忆。
- YarnClient 运行模式介绍
在YARN Client模式下,Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存。
ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
- YarnCluster 模式介绍
在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
五、你是如何理解Spark中血统(RDD)的概念?它的作用是什么?
- 概念
RDD是弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算 的集合。
- 作用
提供了一个抽象的数据模型,将具体的应用逻辑表达为一系列转换操作(函数)。另外不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy…)
如果还想锦上添花,可以添上这一句:
RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies,用来解决数据容错时的高效性以及划分任务时候起到重要作用
六、简述Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数?
Spark的宽窄依赖问题是SparkCore部分的重点考察内容,多数出现在笔试中,大家需要注意。
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)
那Stage是如何划分的呢?
根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
每个stage又根据什么决定task个数?
Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。
这里为了方便大家理解,贴上一张过程图
七、列举Spark常用的transformation和action算子,有哪些算子会导致Shuffle?
我们在Spark开发过程中,避不开与各种算子打交道,其中Spark 算子分为transformation 和 action 算子,下面列出一些常用的算子,具体的功能还需要小伙伴们自行去了解。
-
transformation
-
map
-
mapRartition
-
flatMap
-
filter
-
…
-
action
-
reduce
-
collect
-
first
-
take
如果面试官问你,那小伙叽,有哪些会引起Shuffle过程的Spark算子呢?
你只管自信的回答:
- reduceByKey
- groupByKey
- …ByKey
八、reduceByKey与groupByKey的区别,哪一种更具优势?
既然你上面都提到 reduceByKey 和groupByKey ,那哪一种更具优势,你能简单分析一下吗?
能问这样的问题,已经暗示面试官的水平不低了,那么我们该如何回答呢:
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
groupByKey:按照key进行分组,直接进行shuffle
所以,在实际开发过程中,reduceByKey比groupByKey,更建议使用。但是需要注意是否会影响业务逻辑。
九、Repartition和Coalesce 的关系与区别,能简单说说吗?
这道题就已经开始参和有“源码”的味道了,为什么呢?
1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)区别:
repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。
十、简述下Spark中的缓存(cache和persist)与checkpoint机制,并指出两者的区别和联系
关于Spark缓存和检查点的区别,大致可以从这3个角度去回答:
- 位置
Persist 和 Cache将数据保存在内存,Checkpoint将数据保存在HDFS
- 生命周期
Persist 和 Cache 程序结束后会被清除或手动调用unpersist方法,Checkpoint永久存储不会被删除。
- RDD依赖关系
Persist 和 Cache,不会丢掉RDD间的依赖链/依赖关系,CheckPoint会斩断依赖链。
十一、简述Spark中共享变量(广播变量和累加器)的基本原理与用途
关于Spark中的广播变量和累加器的基本原理和用途,答案较为固定,大家无需刻意去记忆。
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
广播变量是在每个机器上缓存一份,不可变,只读的,相同的变量,该节点每个任务都能访问,起到节省资源和优化的作用。它通常用来高效分发较大的对象。
十二、当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?
嗯,有点“调优”的味道,感觉真正的“风暴”即将到来,这道题还是很好回答的,我们只需要减少连接数据库的次数即可。
使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。
资源参数调优
- num-executors:设置Spark作业总共要用多少个Executor进程来执行
- executor-memory:设置每个Executor进程的内存
- executor-cores:设置每个Executor进程的CPU core数量
- driver-memory:设置Driver进程的内存
- spark.default.parallelism:设置每个stage的默认task数量
- …
开发调优
- 避免创建重复的RDD
- 尽可能复用同一个RDD
- 对多次使用的RDD进行持久化
- 尽量避免使用shuffle类算子
- 使用map-side预聚合的shuffle操作
- 使用高性能的算子
①使用reduceByKey/aggregateByKey替代groupByKey
②使用mapPartitions替代普通map
③使用foreachPartitions替代foreach
④使用filter之后进行coalesce操作
⑤使用repartitionAndSortWithinPartitions替代repartition与sort类操作
- 广播大变量
在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能。
- 使用Kryo优化序列化性能
- 优化数据结构
在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。
十四、如何使用Spark实现TopN的获取(描述思路或使用伪代码)?
能让你使用伪代码来描述这已经非常“苛刻”了,但是不慌,这里提供3种思路供大家参考:
- 方法1:
(1)按照key对数据进行聚合(groupByKey)
(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)
注意:当数据量太大时,会导致OOM
- 方法2:
(1)取出所有的key
(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
- 方法3:
(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序
Spark面试干货
Spark面试干货总结!(8千字长文、27个知识点、21张图)
Spark实战
参考:Spark实践_GoAl的博客-CSDN博客_外国spark实践
十二、Flink
Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。Flink提供了诸多高抽象层的API以便用户编写分布式任务:
DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。
DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。
Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。
此外,Flink 还针对特定的应用领域提供了领域库,例如: Flink ML,Flink 的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法, Gelly,Flink 的图计算库,提供了图计算的相关API及多种图计算算法实现。
2、Flink相比传统的Spark Streaming区别?
这个问题是一个非常宏观的问题,因为两个框架的不同点非常之多。但是在面试时有非常重要的一点一定要回答出来:Flink 是标准的实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型 。
下面我们就分几个方面介绍两个框架的主要区别:
- 架构模型Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink 在运行时主要包含:Jobmanager、Taskmanager和Slot。
- 任务调度Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,Spark Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler。Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度。
- 时间机制Spark Streaming 支持的时间机制有限,只支持处理时间。 Flink 支持了流处理程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持 watermark 机制来处理滞后数据。
- 容错机制对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。Flink 则使用两阶段提交协议来解决这个问题。
根据 Flink 官网描述,Flink 是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。
自下而上,每一层分别代表:Deploy 层:该层主要涉及了Flink的部署模式,在上图中我们可以看出,Flink 支持包括local、Standalone、Cluster、Cloud等多种部署模式 。
Runtime 层:Runtime层提供了支持 Flink 计算的核心实现,比如:支持分布式 Stream 处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务 。
API层: API 层主要实现了面向流(Stream)处理和批(Batch)处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API,后续版本,Flink有计划将DataStream和DataSet API进行统一 。
Libraries层: 该层称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。
Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop 组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。
大家注意,这个问题看起来是问你实际应用中的Flink集群规模,其实还隐藏着另一个问题:Flink可以支持多少节点的集群规模?在回答这个问题时候,可以将自己生产环节中的集群规模、节点、内存情况说明,同时说明部署模式(一般是Flink on Yarn),除此之外,用户也可以同时在小集群(少于5个节点)和拥有 TB 级别状态的上千个节点上运行 Flink 任务。
上图是来自Flink官网的运行流程图。通过上图我们可以得知,Flink 程序的基本构建是数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个Sink接收器中结束。数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。执行时,Flink程序映射到 streaming dataflows,由流(streams)和转换操作(transformation operators)组成。
Flink 程序在运行时主要有 TaskManager,JobManager,Client 三种角色。
其中JobManager扮演着集群中的管理者Master的角色,它是整个集群的协调者,负责接收Flink Job,协调检查点,Failover 故障恢复等,同时管理Flink集群中从节点TaskManager。
TaskManager是实际负责执行计算的Worker,在其上执行Flink Job的一组Task,每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。
Client是Flink程序提交的客户端,当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。
8、说说 Flink 资源管理中 Task Slot 的概念
在Flink架构角色中我们提到,TaskManager是实际负责执行计算的Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task或多个subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。简单的说,TaskManager会将自己节点上管理的资源分为不同的Slot:固定大小的资源子集。这样就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,Slot只会做内存的隔离。没有做CPU的隔离。
Flink 最常用的常用算子包括:Map:DataStream → DataStream
,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。Filter
:过滤掉指定条件的数据。KeyBy
:按照指定的key进行分组。Reduce
:用来进行结果汇总合并。Window
:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)
要搞懂什么是分区策略,需要清楚分区策略是用来决定数据如何发送至下游。目前 Flink 支持了8种分区策略的实现。
上图是整个Flink实现的分区策略继承图:GlobalPartitioner 数据会被分发到下游算子的第一个实例中进行处理。ShufflePartitioner 数据会被随机分发到下游算子的每一个实例中进行处理。RebalancePartitioner 数据会被循环发送到下游的每一个实例中进行处理。RescalePartitioner 这种分区器会根据上下游算子的并行度,循环的方式输出到下游算子的每个实例。这里有点难以理解,假设上游并行度为2,编号为A和B。下游并行度为4,编号为1,2,3,4。那么A则把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游并行度为4,编号为A,B,C,D。下游并行度为2,编号为1,2。那么A和B则把数据发送给1,C和D则把数据发送给2。BroadcastPartitioner 广播分区会将上游数据输出到下游算子的每个实例中。适合于大数据集和小数据集做Jion的场景。ForwardPartitioner ForwardPartitioner 用于将记录输出到下游本地的算子实例。它要求上下游算子并行度一样。简单的说,ForwardPartitioner用来做数据的控制台打印。KeyGroupStreamPartitioner Hash分区器。会将数据按 Key 的 Hash 值输出到下游算子实例中。CustomPartitionerWrapper 用户自定义分区器。需要用户自己实现Partitioner接口,来定义自己的分区逻辑。
11、Flink的并行度了解吗?Flink的并行度设置是怎样的?
Flink中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。我们在实际生产环境中可以从四个不同层面设置并行度:
操作算子层面(Operator Level)
执行环境层面(Execution Environment Level)
客户端层面(Client Level)
系统层面(System Level)
需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。
12、 Flink的Slot和parallelism有什么区别?
官网上十分经典的图:
slot是指 taskmanager 的并发执行能力,假设我们将 taskmanager.numberOfTaskSlots 配置为3 那么每一个 taskmanager 中分配3个 TaskSlot, 3个 taskmanager 一共有9个TaskSlot。
parallelism是指taskmanager实际使用的并发能力。假设我们把 parallelism.default 设置为1,那么9个 TaskSlot 只能用1个,有8个空闲。
Flink 实现了多种重启策略。
- 固定延迟重启策略(Fixed Delay Restart Strategy)
- 故障率重启策略(Failure Rate Restart Strategy)
- 没有重启策略(No Restart Strategy)
- Fallback重启策略(Fallback Restart Strategy)
Flink实现的分布式缓存和Hadoop有异曲同工之妙。目的是在本地读取文件,并把他放在 taskmanager 节点中,防止task重复拉取。
<span style="color:#000000"><span style="background-color:#f5f7ff"><code class="language-java">val env <span style="color:#ac9739">=</span> ExecutionEnvironment<span style="color:#999999">.</span>getExecutionEnvironment
<span style="color:#6b7394">// register a file from HDFS</span>
env<span style="color:#999999">.</span><span style="color:#3d8fd1">registerCachedFile</span><span style="color:#999999">(</span><span style="color:#ac9739">"hdfs:///path/to/your/file"</span><span style="color:#999999">,</span> <span style="color:#ac9739">"hdfsFile"</span><span style="color:#999999">)</span>
<span style="color:#6b7394">// register a local executable file (script, executable, ...)</span>
env<span style="color:#999999">.</span><span style="color:#3d8fd1">registerCachedFile</span><span style="color:#999999">(</span><span style="color:#ac9739">"file:///path/to/exec/file"</span><span style="color:#999999">,</span> <span style="color:#ac9739">"localExecFile"</span><span style="color:#999999">,</span> <span style="color:#c76b29">true</span><span style="color:#999999">)</span>
<span style="color:#6b7394">// define your program and execute</span>
<span style="color:#999999">.</span><span style="color:#999999">.</span><span style="color:#999999">.</span>
val input<span style="color:#ac9739">:</span> DataSet<span style="color:#999999">[</span>String<span style="color:#999999">]</span> <span style="color:#ac9739">=</span> <span style="color:#999999">.</span><span style="color:#999999">.</span><span style="color:#999999">.</span>
val result<span style="color:#ac9739">:</span> DataSet<span style="color:#999999">[</span>Integer<span style="color:#999999">]</span> <span style="color:#ac9739">=</span> input<span style="color:#999999">.</span><span style="color:#3d8fd1">map</span><span style="color:#999999">(</span><span style="color:#6679cc">new</span> MyMapper<span style="color:#999999">(</span><span style="color:#999999">)</span><span style="color:#999999">)</span>
<span style="color:#999999">.</span><span style="color:#999999">.</span><span style="color:#999999">.</span>
env<span style="color:#999999">.</span><span style="color:#3d8fd1">execute</span><span style="color:#999999">(</span><span style="color:#999999">)</span>
</code></span></span>
我们知道Flink是并行的,计算过程可能不在一个 Slot 中进行,那么有一种情况即:当我们需要访问同一份数据。那么Flink中的广播变量就是为了解决这种情况。我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。
说说Flink中的窗口?
Flink 支持两种划分窗口的方式,按照time和count。如果根据时间划分窗口,那么它就是一个time-window 。如果根据数据划分窗口,那么它就是一个 count-window。flink支持窗口的两个重要属性(size和interval)如果size=interval,那么就会形成tumbling-window(无重叠数据) ;如果size>interval,那么就会形成sliding-window(有重叠数据) 如果size< interval, 那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。通过组合可以得出四种基本窗口:
- time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))
- time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))
- count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)
- count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)
Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。
Flink提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
Flink 中的时间和其他流式计算系统的时间一样分为三类:事件时间,摄入时间,处理时间三种。如果以 EventTime
为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime。如果以IngesingtTime
为基准来定义时间窗口将形成 IngestingTimeWindow,以 source 的systemTime为准。如果以 ProcessingTime
基准来定义时间窗口将形成 ProcessingTimeWindow,以 operator 的systemTime 为准。
Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 一般来讲Watermark经常和Window一起被用来处理乱序事件。
20、Flink Table & SQL 熟悉吗?TableEnvironment这个类有什么作用
TableEnvironment是Table API和SQL集成的核心概念。这个类主要用来:
- 在内部 catalog 中注册表
- 注册外部 catalog
- 执行SQL查询
- 注册用户定义(标量,表或聚合)函数
- 将DataStream或DataSet转换为表
- 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
21、Flink SQL的实现原理是什么?是如何实现 SQL 解析的呢?
首先大家要知道 Flink 的SQL解析是基于Apache Calcite这个开源框架。
基于此,一次完整的SQL解析过程如下:
- 用户使用对外提供Stream SQL的语法开发业务应用
- 用calcite对StreamSQL进行语法检验,语法检验通过后,转换成calcite的逻辑树节点;最终形成calcite的逻辑计划
- 采用Flink自定义的优化规则和calcite火山模型、启发式模型共同对逻辑树进行优化,生成最优的Flink物理计划
- 对物理计划采用janino codegen生成代码,生成用低阶API DataStream 描述的流应用,提交到Flink平台执行
Flink中级
本道面试题考察的其实就是一句话:Flink的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink 使用一个引擎支持了DataSet API 和 DataStream API。
在一个Flink Job中,数据需要在不同的task中进行交换,整个数据交换是由 TaskManager 负责的,TaskManager 的网络组件首先从缓冲buffer中收集records,然后再发送。Records 并不是一个一个被发送的,二是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。
Flink实现容错主要靠强大的CheckPoint机制和State机制。Checkpoint 负责定时制作分布式快照、对程序中的状态进行备份;State 用来存储计算过程中的中间状态。
Flink的分布式快照是根据Chandy-Lamport算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。
核心思想是在 input source 端插入 barrier,控制 barrier 的同步来实现 snapshot 的备份和 exactly-once 语义。
Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。 分为以下几个步骤:
- 开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面
- 预提交(preCommit)将内存中缓存的数据写入文件并关闭
- 预提交(preCommit)将内存中缓存的数据写入文件并关闭
- 预提交(preCommit)将内存中缓存的数据写入文件并关闭
- 若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据
Flink源码中有一个独立的connector模块,所有的其他connector都依赖于此模块,Flink 在1.9版本发布的全新kafka连接器,摒弃了之前连接不同版本的kafka集群需要依赖不同版本的connector这种做法,只需要依赖一个connector即可。
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。理论上Flink的内存管理分为三部分:
Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改。
Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。
User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。
Java本身自带的序列化和反序列化的功能,但是辅助信息占用空间比较大,在序列化对象时记录了过多的类信息。Apache Flink摒弃了Java原生的序列化方法,以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。TypeInformation 是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器。TypeInformation 支持以下几种类型:
BasicTypeInfo: 任意Java 基本类型或 String 类型
BasicArrayTypeInfo: 任意Java基本类型数组或 String 数组
WritableTypeInfo: 任意 Hadoop Writable 接口的实现类
WritableTypeInfo: 任意 Hadoop Writable 接口的实现类
CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
GenericTypeInfo: 任意无法匹配之前几种类型的类
针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。
29、Flink中的Window出现了数据倾斜,你有什么解决办法?
window产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:
- 在数据进入窗口前做预聚合
- 重新设计窗口聚合的key
30、Flink中在使用聚合函数 GroupBy、Distinct、KeyBy 等函数时出现数据热点该如何解决?
数据倾斜和数据热点是所有大数据框架绕不过去的问题。处理这类问题主要从3个方面入手:
- 在业务上规避这类问题
例如一个假设订单场景,北京和上海两个城市订单量增长几十倍,其余城市的数据量不变。这时候我们在进行聚合的时候,北京和上海就会出现数据堆积,我们可以单独数据北京和上海的数据。
- Key的设计上
把热key进行拆分,比如上个例子中的北京和上海,可以把北京和上海按照地区进行拆分聚合。
- 参数设置
Flink 1.9.0 SQL(Blink Planner) 性能优化中一项重要的改进就是升级了微批模型,即 MiniBatch。原理是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐和减少数据的输出量。
在Flink的后台任务管理中,我们可以看到Flink的哪个算子和task出现了反压。最主要的手段是资源调优和算子调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。
Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。
Storm 是通过监控 Bolt 中的接收队列负载情况,如果超过高水位值就会将反压信息写到 Zookeeper ,Zookeeper 上的 watch 会通知该拓扑的所有 Worker 都进入反压状态,最后 Spout 停止发送 tuple。Flink中的反压使用了高效有界的分布式阻塞队列,下游消费变慢会导致发送端阻塞。二者最大的区别是Flink是逐级反压,而Storm是直接从源头降速。
34、 Operator Chains(算子链)这个概念你了解吗?
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。
- 支持hive读写,支持UDF
- Flink SQL TopN和GroupBy等优化
- Checkpoint跟savepoint针对实际业务场景做了优化
- Flink state查询
可以在处理前加一个fliter算子,将不符合规则的数据过滤出去。
Flink高级
用户提交的Flink Job会被转化成一个DAG任务运行,分别是:StreamGraph、JobGraph、ExecutionGraph,Flink中JobManager与TaskManager,JobManager与Client的交互是基于Akka工具包的,是通过消息驱动。整个Flink Job的提交还包含着ActorSystem的创建,JobManager的启动,TaskManager的启动和注册。
一个Flink任务的DAG生成计算图大致经历以下三个过程:
StreamGraph 最接近代码所表达的逻辑层面的计算拓扑结构,按照用户代码的执行顺序向StreamExecutionEnvironment添加StreamTransformation构成流式图。
JobGraph 从StreamGraph生成,将可以串联合并的节点进行合并,设置节点之间的边,安排资源共享slot槽位和放置相关联的节点,上传任务所需的文件,设置检查点配置等。相当于经过部分初始化和优化处理的任务图。
ExecutionGraph 由JobGraph转换而来,包含了任务具体执行所需的内容,是最贴近底层实现的执行图。
JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 TaskSlot 资源并命令 TaskManager 启动从客户端中获取的应用。JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。同时在任务执行的过程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完成 Checkpoint 操作,所有的 Checkpoint 协调过程都是在 Fink JobManager 中完成。当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager 中的资源以供下一次提交任务使用。
JobManager的职责主要是接收Flink作业,调度Task,收集作业状态和管理TaskManager。它包含一个Actor,并且做如下操作:
RegisterTaskManager: 它由想要注册到JobManager的TaskManager发送。注册成功会通过AcknowledgeRegistration消息进行Ack。
SubmitJob: 由提交作业到系统的Client发送。提交的信息是JobGraph形式的作业描述信息。
CancelJob: 请求取消指定id的作业。成功会返回CancellationSuccess,否则返回CancellationFailure。
UpdateTaskExecutionState: 由TaskManager发送,用来更新执行节点(ExecutionVertex)的状态。成功则返回true,否则返回false。
RequestNextInputSplit: TaskManager上的Task请求下一个输入split,成功则返回NextInputSplit,否则返回null。
JobStatusChanged: 它意味着作业的状态(RUNNING, CANCELING, FINISHED,等)发生变化。这个消息由ExecutionGraph发送。
TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的 TaskManager节点,然后启动并运行任务。TaskManager 从 JobManager 接收需要部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。可以看出,Flink 的任务运行其实是采用多线程的方式,这和 MapReduce 多 JVM 进行的方式有很大的区别,Flink 能够极大提高 CPU 使用效率,在多个任务和 Task 之间通过 TaskSlot 方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有效管理。
TaskManager的启动流程较为简单:
启动类:org.apache.flink.runtime.taskmanager.TaskManager
核心启动方法 : selectNetworkInterfaceAndRunTaskManager
启动后直接向JobManager
注册自己,注册完成后,进行部分模块的初始化。
TaskManager中最细粒度的资源是Task slot,代表了一个固定大小的资源子集,每个TaskManager会将其所占有的资源平分给它的slot。
通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。
通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。
Flink 为了避免JVM的固有缺陷例如java对象存储密度低,FGC影响吞吐和响应等,实现了自主管理内存。MemorySegment就是Flink的内存抽象。默认情况下,一个MemorySegment可以被看做是一个32kb大的内存块的抽象。这块内存既可以是JVM里的一个byte[],也可以是堆外内存(DirectByteBuffer)。在MemorySegment这个抽象之上,Flink在数据从operator内的数据对象在向TaskManager上转移,预备被发给下个节点的过程中,使用的抽象或者说内存对象是Buffer。对接从Java对象转为Buffer的中间对象是另一个抽象StreamRecord。
Flink的容错机制的核心部分是制作分布式数据流和操作算子状态的一致性快照。 这些快照充当一致性checkpoint,系统可以在发生故障时回滚。 Flink用于制作这些快照的机制在“分布式数据流的轻量级异步快照”中进行了描述。 它受到分布式快照的标准Chandy-Lamport算法的启发,专门针对Flink的执行模型而定制。
barriers在数据流源处被注入并行数据流中。快照n的barriers被插入的位置(我们称之为Sn)是快照所包含的数据在数据源中最大位置。例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。 将该位置Sn报告给checkpoint协调器(Flink的JobManager)。然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。 一旦sink操作算子(流式DAG的末端)从其所有输入流接收到barriers n,它就向checkpoint协调器确认快照n完成。在所有sink确认快照后,意味快照着已完成。一旦完成快照n,job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。
Flink 将 SQL 校验、SQL 解析以及 SQL 优化交给了Apache Calcite。Calcite 在其他很多开源项目里也都应用到了,譬如 Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架构中处于核心的地位,如下图所示。
构建抽象语法树的事情交给了 Calcite 去做。SQL query 会经过 Calcite 解析器转变成 SQL 节点树,通过验证后构建成 Calcite 的抽象语法树(也就是图中的 Logical Plan)。另一边,Table API 上的调用会构建成 Table API 的抽象语法树,并通过 Calcite 提供的 RelBuilder 转变成 Calcite 的抽象语法树。然后依次被转换成逻辑执行计划和物理执行计划。在提交任务后会分发到各个 TaskManager 中运行,在运行时会使用 Janino 编译器编译代码后运行。
本段参考文章:大数据面试杀招 | Flink,大数据时代的“王者”_Alice菌的博客
Flink面试:
史上最全干货!Flink面试大全总结(全文6万字、110个知识点、160张图)史上最全Flink面试大全总结https://mp.weixin.qq.com/s/fCXwf9r3wP4rplDhM_fuvA
十二、大数据优秀博客总结:
Alice菌的博客_大数据梦想家_CSDN博客-Hadoop,Scala,大数据实战项目领域博主
yuan_more的博客_五分钟学大数据_CSDN博客-大数据,hive,大数据面试领域博主
王傲旗的大数据之路_大数据面试宝典_CSDN博客-大数据面试,大数据,Scala领域博主
是故事啊~关注我~_大数据学习僧_CSDN博客-Hive,Mysql,Hadoop领域博主
珞沫的博客_CSDN博客-leetcode,数据结构与算法,Hadoop领域博主
不温卜火_CSDN博客-Hadoop,Spark,Hive领域博主
我的祖传代码_大数据私房菜_CSDN博客-数据仓库,Hive,Spark领域博主
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/136169.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...