Spark Streaming Join「建议收藏」

Spark Streaming Join「建议收藏」多数据源Join思路多数据源Join大致有以下三种思路:数据源端Join,如Android/IOS客户端在上报用户行为数据时就获取并带上用户基础信息。计算引擎上Join,如用SparkStreaming、Flink做Join。结果端Join,如用HBase/ES做Join,Join键做Rowkey/_id,各字段分别写入列簇、列或field。三种思路各有优劣,使用时注意…

大家好,又见面了,我是你们的朋友全栈君。

多数据源Join思路

多数据源Join大致有以下三种思路:

  • 数据源端Join,如Android/IOS客户端在上报用户行为数据时就获取并带上用户基础信息。

  • 计算引擎上Join,如用Spark Streaming、Flink做Join。

  • 结果端Join,如用HBase/ES做Join,Join键做Rowkey/_id,各字段分别写入列簇、列或field。

三种思路各有优劣,使用时注意一下。这里总结在计算引擎Spark Streaming上做Join。

Stream-Static Join

流与完全静态数据Join

流与完全静态数据Join。有两种方式,一种是RDD Join方式,另一种是Broadcast Join(也叫Map-Side Join)方式。

RDD Join 方式

思路:RDD Join RDD 。

package com.bigData.spark
import com.alibaba.fastjson.{ 
JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{ 
Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ 
ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{ 
Durations, StreamingContext}
/** * Author: Wang Pei * License: Copyright(c) Pei.Wang * Summary: * * Stream-Static Join * * spark 2.2.2 * */
case class UserInfo(userID:String,userName:String,userAddress:String)
object StreamStaicJoin { 

def main(args: Array[String]): Unit = { 

//设置日志等级
Logger.getLogger("org").setLevel(Level.WARN)
//Kafka 参数
val kafkaParams= Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean),
"group.id" -> "testTopic3_consumer_v1")
//spark环境
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$","")).setMaster("local[3]")
val ssc = new StreamingContext(sparkConf,Durations.seconds(10))
/** 1) 静态数据: 用户基础信息*/
val userInfo=ssc.sparkContext.parallelize(Array(
UserInfo("user_1","name_1","address_1"),
UserInfo("user_2","name_2","address_2"),
UserInfo("user_3","name_3","address_3"),
UserInfo("user_4","name_4","address_4"),
UserInfo("user_5","name_5","address_5")
)).map(item=>(item.userID,item))
/** 2) 流式数据: 用户发的tweet数据*/
/** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */
val kafkaDStream=KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](Set("testTopic3"),kafkaParams)
).map(item=>parseJson(item.value())).map(item=>{ 

val userID = item.getString("userID")
val eventTime = item.getString("eventTime")
val language= item.getString("language")
val favoriteCount = item.getInteger("favoriteCount")
val retweetCount = item.getInteger("retweetCount")
(userID,(userID,eventTime,language,favoriteCount,retweetCount))
})
/** 3) 流与静态数据做Join (RDD Join 方式)*/
kafkaDStream.foreachRDD(_.join(userInfo).foreach(println))
ssc.start()
ssc.awaitTermination()
}
/**json解析*/
def parseJson(log:String):JSONObject={ 

var ret:JSONObject=null
try{ 

ret=JSON.parseObject(log)
}catch { 

//异常json数据处理
case e:JSONException => println(log)
}
ret
}
}

stream_static_rdd_join.png

Broadcast Join 方式

思路:RDD遍历每一条数据,去匹配广播变量中的值。

package com.bigData.spark
import com.alibaba.fastjson.{ 
JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{ 
Level, Logger}
import org.apache.spark.{ 
SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{ 
ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{ 
Durations, StreamingContext}
/** * Author: Wang Pei * License: Copyright(c) Pei.Wang * Summary: * * Stream-Static Join * * spark 2.2.2 * */
case class UserInfo(userID:String,userName:String,userAddress:String)
object StreamStaticJoin2 { 

def main(args: Array[String]): Unit = { 

//设置日志等级
Logger.getLogger("org").setLevel(Level.WARN)
//Kafka 参数
val kafkaParams= Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean),
"group.id" -> "testTopic3_consumer_v1")
//spark环境
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$","")).setMaster("local[3]")
val ssc = new StreamingContext(sparkConf,Durations.seconds(10))
/** 1) 静态数据: 用户基础信息。 将用户基础信息广播出去。*/
val broadcastUserInfo=ssc.sparkContext.broadcast(
Map(
"user_1"->UserInfo("user_1","name_1","address_1"),
"user_2"->UserInfo("user_2","name_2","address_2"),
"user_3"->UserInfo("user_3","name_3","address_3"),
"user_4"->UserInfo("user_4","name_4","address_4"),
"user_5"->UserInfo("user_5","name_5","address_5")
))
/** 2) 流式数据: 用户发的tweet数据*/
/** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */
val kafkaDStream=KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](List("testTopic3"),kafkaParams)
).map(item=>parseJson(item.value())).map(item=>{ 

val userID = item.getString("userID")
val eventTime = item.getString("eventTime")
val language= item.getString("language")
val favoriteCount = item.getInteger("favoriteCount")
val retweetCount = item.getInteger("retweetCount")
(userID,(userID,eventTime,language,favoriteCount,retweetCount))
})
/** 3) 流与静态数据做Join (Broadcast Join 方式)*/
val result=kafkaDStream.mapPartitions(part=>{ 

val userInfo = broadcastUserInfo.value
part.map(item=>{ 

(item._1,(item._2,userInfo.getOrElse(item._1,null)))})
})
result.foreachRDD(_.foreach(println))
ssc.start()
ssc.awaitTermination()
}
/**json解析*/
def parseJson(log:String):JSONObject={ 

var ret:JSONObject=null
try{ 

ret=JSON.parseObject(log)
}catch { 

//异常json数据处理
case e:JSONException => println(log)
}
ret
}
}

stream_static_rdd_join2.png

流与半静态数据Join

半静态数据指的是放在Redis等的数据,会被更新。

思路:RDD 每个Partition连接一次Redis,遍历Partition中每条数据,根据k,去Redis中查找v。

package com.bigData.spark
import com.alibaba.fastjson.{ 
JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{ 
Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ 
ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{ 
Durations, StreamingContext}
import redis.clients.jedis.Jedis
/** * Author: Wang Pei * License: Copyright(c) Pei.Wang * Summary: * * Stream-Static Join * * spark 2.2.2 * */
object StreamStaicJoin3 { 

def main(args: Array[String]): Unit = { 

//设置日志等级
Logger.getLogger("org").setLevel(Level.WARN)
//Kafka 参数
val kafkaParams= Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean),
"group.id" -> "testTopic3_consumer_v1")
//spark环境
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$","")).setMaster("local[3]")
val ssc = new StreamingContext(sparkConf,Durations.seconds(10))
/** 1) 半静态数据: 用户基础信息,在Redis中*/
/** HMSET user_1 userID "user_1" name "name_1" address "address_1" */
/** HMSET user_2 userID "user_2" name "name_2" address "address_2" */
/** 2) 流式数据: 用户发的tweet数据*/
/** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */
val kafkaDStream=KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](Set("testTopic3"),kafkaParams)
).map(item=>parseJson(item.value())).map(item=>{ 

val userID = item.getString("userID")
val eventTime = item.getString("eventTime")
val language= item.getString("language")
val favoriteCount = item.getInteger("favoriteCount")
val retweetCount = item.getInteger("retweetCount")
(userID,(userID,eventTime,language,favoriteCount,retweetCount))
})
/** 3) 流与半静态数据做Join (RDD Join 方式)*/
val result=kafkaDStream.mapPartitions(part=>{ 

val redisCli=connToRedis("localhost",6379,3000,10)
part.map(item=>{ 

(item._1,(item._2,redisCli.hmget(item._1,"userID","name","address")))
})
})
result.foreachRDD(_.foreach(println))
ssc.start()
ssc.awaitTermination()
}
/**json解析*/
def parseJson(log:String):JSONObject={ 

var ret:JSONObject=null
try{ 

ret=JSON.parseObject(log)
}catch { 

//异常json数据处理
case e:JSONException => println(log)
}
ret
}
/**连接到redis*/
def connToRedis(redisHost:String,redisPort:Int,timeout:Int,dbNum:Int): Jedis ={ 

val redisCli=new Jedis(redisHost,redisPort,timeout)
redisCli.connect()
redisCli.select(dbNum)
redisCli
}
}

stream_static_join3.png

Stream-Stream Join

流与流Join。

思路:DStream Join DStream。

package com.bigData.spark
import com.alibaba.fastjson.{ 
JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{ 
Level, Logger}
import org.apache.spark.{ 
SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{ 
ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{ 
Durations, StreamingContext}
/** * Author: Wang Pei * License: Copyright(c) Pei.Wang * Summary: * * Stream-Stream Join * * spark 2.2.2 * */
object StreamStreamJoin { 

def main(args: Array[String]): Unit = { 

//设置日志等级
Logger.getLogger("org").setLevel(Level.WARN)
//Kafka 参数
val kafkaParams1= Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean),
"group.id" -> "testTopic3_consumer_v1")
val kafkaParams2= Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean),
"group.id" -> "testTopic4_consumer_v1")
//spark环境
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$","")).setMaster("local[3]")
val ssc = new StreamingContext(sparkConf,Durations.seconds(10))
/** 1) 流式数据: 用户发的tweet数据*/
/** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */
val kafkaDStream1=KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](List("testTopic3"),kafkaParams1)
).map(item=>parseJson(item.value())).map(item=>{ 

val userID = item.getString("userID")
val eventTime = item.getString("eventTime")
val language= item.getString("language")
val favoriteCount = item.getInteger("favoriteCount")
val retweetCount = item.getInteger("retweetCount")
(userID,(userID,eventTime,language,favoriteCount,retweetCount))
})
/** 2) 流式数据: 用户发的tweet数据*/
/** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */
val kafkaDStream2=KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](List("testTopic4"),kafkaParams2)
).map(item=>parseJson(item.value())).map(item=>{ 

val userID = item.getString("userID")
val eventTime = item.getString("eventTime")
val language= item.getString("language")
val favoriteCount = item.getInteger("favoriteCount")
val retweetCount = item.getInteger("retweetCount")
(userID,(userID,eventTime,language,favoriteCount,retweetCount))
})
/** 3) Stream-Stream Join*/
val joinedDStream = kafkaDStream1.leftOuterJoin(kafkaDStream2)
joinedDStream.foreachRDD(_.foreach(println))
ssc.start()
ssc.awaitTermination()
}
/**json解析*/
def parseJson(log:String):JSONObject={ 

var ret:JSONObject=null
try{ 

ret=JSON.parseObject(log)
}catch { 

//异常json数据处理
case e:JSONException => println(log)
}
ret
}
}

stream_stream_join.png

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/147403.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • python_sklearn库的使用

    python_sklearn库的使用sklearn库的使用

    2022年10月17日
  • 超分辨率重建总结(超分辨率重建算法程序)

    1.SRCNN:—2,3改进开山之作,三个卷积层,输入图像是低分辨率图像经过双三次(bicubic)插值和高分辨率一个尺寸后输入CNN。图像块的提取和特征表示,特征非线性映射和最终的重建。使用均方误差(MSE)作为损失函数。2.FSRCNN特征提取:低分辨率图像,选取的核9×9设置为5×5。收缩:1×1的卷积核进行降维。非线性映射:用两个串联的3×3的卷积核可以替代一个5×5…

  • 多线程模型下的无锁编程「建议收藏」

    多线程模型下的无锁编程「建议收藏」多线程模式是比较流行的一种并发编程模型,多线程编程的一个特点就是线程间共享内存空间;这可以降低线程间通信的开销,但却引来了另外的一个难缠的问题:竟态条件!,因此,甚至有人对多线程模型提出了质疑,看这里。在多线程编程模型下,解决竟态条件的传统方法就是加锁保护临界区,但这存在影响系统性能、优先级反转等问题.因此又有人提出了,多线程模型下无锁编程的一些方式:1.线程内通信框架:Di

  • java请求C# asmx接口[通俗易懂]

    java请求C# asmx接口[通俗易懂]packagecom.example.demo.controller;importorg.apache.axis.client.Call;importorg.apache.axis.client.Service;importorg.springframework.stereotype.Controller;importorg.springframework.web.bind….

  • 桌面太单调?一起用Python做个自定义动画挂件,好玩又有趣!

    桌面太单调?一起用Python做个自定义动画挂件,好玩又有趣!前言前段时间,写了篇博客关于Python自制一款炫酷音乐播放器。有粉丝问我,音乐播放器为什么要用PyQt5,效果是不是比Tkinter赞?PyQt5真的可以实现这些炫酷的UI画面吗?之前没接触过PyQt5,能不能多分享一些这方面的开发案例?今天就带大家,一起用Python的PyQt5开发一个有趣的自定义桌面动画挂件,看看实现的动画挂件效果!下面,我们开始介绍这个自定义桌面动画挂件的制作过程。一、核心功能设计总体来说,我们需要实现将自己喜欢的动态图gif或者视频转成一个桌面动画挂件,并且可以通过鼠

  • C# TransactionScope「建议收藏」

    C# TransactionScope「建议收藏」TransactionScopeTransactionScope事务处理经常用到,老是用了又忘,做点记录。TransactionScope的定义跟使用介绍。TransactionScopeOptionTransactionScopeOption枚举型用来决定一个TransactionScope是用已有的事务,还是定义TransactionScope的新做一个事务,还是完全不用事务。默认是Required,Required表示如果已有事务,就加入该事务,否则新建一个事务。TransactionOp

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号