大家好,又见面了,我是你们的朋友全栈君。
大数据教程:Transformation和Action算子演示
一、Transformation算子演示
val conf = new SparkConf().setAppName(“Test”).setMaster(“local”)
//通过并行化生成rdd
val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,10))
//map:对rdd里面每一个元乘以2然后排序
val rdd2: RDD[Int] = rdd.map(_ * 2)
//collect以数组的形式返回数据集的所有元素(是Action算子)
println(rdd2.collect().toBuffer)
//filter:该RDD由经过func函数计算后返回值为true的输入元素组成
val rdd3: RDD[Int] = rdd2.filter(_ > 10) val rdd4 = sc.parallelize(Array(“a b c”,”b c d”))
//flatMap:将rdd4中的元素进行切分后压平
val rdd5: RDD[String] = rdd4.flatMap(_.split(” “))
//假如: List(List(” a,b” ,”b c”),List(“e c”,” i o”))
//压平 flatMap(_.flatMap(_.split(” “)))
//sample随机抽样
//withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样
//fraction抽样比例例如30% 即0.3 但是这个值是一个浮动的值不准确
//seed用于指定随机数生成器种子 默认参数不传
val rdd5_1 = sc.parallelize(1 to 10)
//union:求并集
val rdd6 = sc.parallelize(List(5,6,7,8))
//intersection:求交集
val rdd9 = rdd6 intersection rdd7
//distinct:去重出重复
println(rdd8.distinct.collect.toBuffer)
//join相同的key会被合并
val rdd10_1 = sc.parallelize(List((“tom”,1),(“jerry” ,3),(“kitty”,2)))
//左连接和右连接
//除基准值外是Option类型,因为可能存在空值所以使用Option
val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2
//以左边为基准没有是null
val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2
//以右边为基准没有是null
println(rdd10_4.collect().toList) val rdd11_1 = sc.parallelize(List((“tom”,1),(“jerry” ,3),(“kitty”,2)))
//笛卡尔积
val rdd11_3 = rdd11_1 cartesian rdd11_2
//根据传入的参数进行分组
val rdd11_5_1 = rdd11_4.groupBy(_._1)
//按照相同key进行分组,并且可以制定分区
val rdd11_5_2 = rdd11_4.groupByKey
//根据相同key进行分组[分组的话需要二元组]
//cogroup 和 groupBykey的区别
//cogroup不需要对数据先进行合并就以进行分组 得到的结果是 同一个key 和不同数据集中的数据集合
//groupByKey是需要先进行合并然后在根据相同key进行分组
val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2 |
二、Action算子演示
val conf = new SparkConf().setAppName(“Test”).setMaster(“local[*]”)
/* Action 算子*/
//集合函数
val rdd1 = sc.parallelize(List(2,1,3,6,5),2)
//以数组的形式返回数据集的所有元素
println(rdd1.collect().toBuffer)
//返回RDD的元素个数
println(rdd1.count())
//取出对应数量的值 默认降序, 若输入0 会返回一个空数组
println(rdd1.top(3).toBuffer)
//顺序取出对应数量的值
println(rdd1.take(3).toBuffer)
//顺序取出对应数量的值 默认生序
println(rdd1.takeOrdered(3).toBuffer)
//获取第一个值 等价于 take(1)
println(rdd1.first())
//将处理过后的数据写成文件(存储在HDFS或本地文件系统)
//rdd1.saveAsTextFile(“dir/file1”)
//统计key的个数并生成map k是key名 v是key的个数
val rdd2 = sc.parallelize(List((“key1”,2),(“key2”,1),(“key3”,3),(“key4”,6),(“key5”,5)),2)
//遍历数据
rdd1.foreach(x => println(x))
/*其他算子*/
//统计value的个数 但是会将集合中的一个元素看做是一个vluae
val value: collection.Map[(String, Int), Long] = rdd2.countByValue
//filterByRange:对RDD中的元素进行过滤,返回指定范围内的数据
val rdd3 = sc.parallelize(List((“e”,5),(“c”,3),(“d”,4),(“c”,2),(“a”,1)))
//包括开始和结束的
println(rdd3_1.collect.toList)
//flatMapValues对参数进行扁平化操作,是value的值
val rdd3_2 = sc.parallelize(List((“a”,”1 2″),(“b”,”3 4″)))
//foreachPartition 循环的是分区数据
// foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储
val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
//keyBy 以传入的函数返回值作为key ,RDD中的元素为value 新的元组
val rdd5 = sc.parallelize(List(“dog”,”cat”,”pig”,”wolf”,”bee”),3)
//keys获取所有的key values 获取所有的values
println(rdd5_1.keys.collect.toList)
//collectAsMap 将需要的二元组转换成Map
val map: collection.Map[String, Int] = rdd2.collectAsMap() |
转载于:https://juejin.im/post/5d07547d6fb9a07ef7107428
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/106770.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...