golang 大数据平台_golang 如何处理大数据[通俗易懂]

golang 大数据平台_golang 如何处理大数据[通俗易懂]Golang被证明非常适合并发编程,goroutine比异步编程更易读、优雅、高效。本文提出一个适合由Golang实现的Pipeline执行模型,适合批量处理大量数据(ETL)的情景。想象这样的应用情景:(推荐学习:go)从数据库A(Cassandra)加载用户评论(量巨大,例如10亿条);根据每条评论的用户ID、从数据库B(MySQL)关联用户资料;调用…

大家好,又见面了,我是你们的朋友全栈君。

golang 大数据平台_golang 如何处理大数据[通俗易懂]

Golang被证明非常适合并发编程,goroutine比异步编程更易读、优雅、高效。本文提出一个适合由Golang实现的Pipeline执行模型,适合批量处理大量数据(ETL)的情景。

想象这样的应用情景: (推荐学习:go)

从数据库A(Cassandra)加载用户评论(量巨大,例如10亿条);根据每条评论的用户ID、从数据库B(MySQL)关联用户资料;调用NLP服务(自然语言处理),处理每条评论;将处理结果写入数据库C(ElasticSearch)。

由于应用中遇到的各种问题,归纳出这些需求:

需求一:应分批处理数据,例如规定每批100条。出现问题时(例如任意一个数据库故障)则中断,下次程序启动时使用checkpoint从中断处恢复。

需求二:每个流程设置合理的并发数、让数据库和NLP服务有合理的负载(不影响其它业务的基础上,尽可能占用更多资源以提高ETL性能)。例如,步骤(1)-(4)分别设置并发数1、4、8、2。

这就是一个典型的Pipeline(流水线)执行模型。把每一批数据(例如100条)看作流水线上的产品,4个步骤对应流水线上4个处理工序,每个工序处理完毕后就把半成品交给下一个工序。每个工序可以同时处理的产品数各不相同。

你可能首先想到启用1+4+8+2个goroutine,使用channel来传递数据。我也曾经这么干,结论就是这么干会让程序员疯掉:流程并发控制代码非常复杂,特别是你得处理异常、执行时间超出预期、可控中断等问题,你不得不加入一堆channel,直到你自己都不记得有什么用。

可重用的Pipeline模块

为了更高效完成ETL工作,我将Pipeline抽象成模块。我先把代码粘贴出来,再解析含义。模块可以直接使用,主要使用的接口是:NewPipeline、Async、Wait。

使用这个Pipeline组件,我们的ETL程序将会简单、高效、可靠,让程序员从繁琐的并发流程控制中解放出来:package main

import “log”

func main() {

//恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。

checkpoint := loadCheckpoint()

//工序(1)在pipeline外执行,最后一个工序是保存checkpoint

pipeline := NewPipeline(4, 8, 2, 1)

for {

//(1)

//加载100条数据,并修改变量checkpoint

//data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。

data, err := extractReviewsFromA(&checkpoint, 100)

if err != nil {

log.Print(err)

break

}

//这里有个Golang著名的坑。

//“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。

//这里创建一个副本curCheckpoint,储存本次循环的checkpoint。

curCheckpoint := checkpoint

ok := pipeline.Async(func() error {

//(2)

return joinUserFromB(data)

}, func() error {

//(3)

return nlp(data)

}, func() error {

//(4)

return loadDataToC(data)

}, func() error {

//(5)保存checkpoint

log.Print(“done:”, curCheckpoint)

return saveCheckpoint(curCheckpoint)

})

if !ok { break }

if len(data) < 100 { break } //处理完毕

}

err := pipeline.Wait()

if err != nil { log.Print(err) }

}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/142596.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • windows10 Linux子系统(wsl)文件目录

    windows10 Linux子系统(wsl)文件目录简介使用window中的Linux子系统创建的文件究竟放在什么地方,既然作为子系统文件肯定是可以互相访问的目录ubuntuLinux子系统的目录是在这个目录下C:\Users\用户名\AppData\Local\Packages\CanonicalGroupLimited.UbuntuonWindows_79rhkp1fndgsc\LocalState\rootfs现在在…

  • docker离线安装mysql镜像_安装rabbitmq

    docker离线安装mysql镜像_安装rabbitmqDocker离线安装RabbitMQ1、进入dockerhub,搜索rabbit镜像https://registry.hub.docker.com/_/rabbitmq/2、进入官方的镜像,我们选择带有“management”的版本(包含web管理界面)https://hub.docker.com/_/rabbitmq?tab=tags#通过以下方式无法查询到,问题未知[root@localhost~]#dockersearch3.7.7-managementErrorres

  • Vim编辑器的安装与使用[通俗易懂]

    Vim编辑器的安装与使用[通俗易懂]1、导读下面给大家讲解一下Linux/ununtu系统下的文本编辑神器——vim编辑器的安装及基本使用方法。2、安装vim编辑器如果没有安装vim编辑器的,可在终端下输入一下命令进行安装:sudoapt-getinstallvim安装过程中提示:[y/n]?时,回复“y”即可!3、简单理解vim编辑器的工作模式vim编辑器可以简单分为“命令…

    2022年10月28日
  • 协同过滤推荐算法(一)原理与实现

    协同过滤推荐算法(一)原理与实现一、协同过滤算法原理协同过滤推荐算法是诞生最早,并且较为著名的推荐算法。主要的功能是预测和推荐。算法通过对用户历史行为数据的挖掘发现用户的偏好,基于不同的偏好对用户进行群组划分并推荐品味相似的商品。协同过滤推荐算法分为两类,分别是基于用户的协同过滤算法(user-basedcollaboratIvefiltering),和基于物品的协同过滤算法(item-basedcollaborati…

  • centos systemctl_正在不使用中

    centos systemctl_正在不使用中CentOS7.x开始,CentOS开始使用systemd服务来代替daemon,原来管理系统启动和管理系统服务的相关命令全部由systemctl命令来代替。1、原来的service命令与systemctl命令对比daemon命令systemctl命令说明service[服务]startsystemctlstart[unit…

  • 【组合数求模】 转自AekdyCoin

    【组合数求模】 转自AekdyCoin大家都在中学阶段学习了组合数的定义:这个表示的是从n个元素中选取m个元素的方案数。(PS.组合数求模似乎只用在信息学竞赛和ACM竞赛等计算机编程设计大赛中……,求在现实中的运用) 可以知道当n,m 取得比较大的时候,组合数可能很大很大(天文数字?无法度量?)例如C(100,50)=100891344545564193334812497256, 于是计算机的64

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号