luigi框架–关于python运行spark程序

luigi框架–关于python运行spark程序

大家好,又见面了,我是全栈君。

首先,目标是写个python脚本,跑spark程序来统计hdfs中的一些数据。参考了别人的代码,故用了luigi框架。

至于luigi的原理 底层的一些东西Google就好。本文主要就是聚焦快速使用,知其然不知其所以然。

python写Spark或mapreduce还有其他的方法,google上很多,这里用luigi只是刚好有参考的代码,而且理解起来还是简单,就用了。

上代码:

import luigi, sys
from datetime import datetime, timedelta
from luigi.contrib.spark import PySparkTask

class luigiBase(PySparkTask):
date = luigi.DateParameter(default=datetime.now())
def main(self, sc, *args):
log_rdd = sc.textFile(self.input()[0].path)
#要做的spark操作
  log_rdd.repartition(1).saveAsTextFile(self.output().path)
@property
  def name(self):
return "luigi_test_{}_username".format(format_date(self.date))
def requires(self):
return [HdfsFiles(date=self.date)]
def output(self):
return luigi.hdfs.HdfsTarget(Files().path,format=luigi.hdfs.PlainDir)

class luigiStats(luigi.Task):
now = datetime.now()
date = luigi.DateParameter(default=datetime(now.year, now.month, now.day) )
def requires(self):
return luigiBase(self.date)

if __name__ == '__main__':
luigi.run(main_task_cls=luigiStats)

1.对于普通的luigi任务,关键是要按需实现requires、output和run三个函数;对于luigi封装好的spark任务,关键是要按需实现requires、output和main三个函数

2.base类继承PySparkTask类,该类还有很多参数可以设置,但作为最最简单的luigi例子,就都剔除了,只要在意requires、output和main三个函数就好。可以把requires理解成输入,output输出,main是要实现的逻辑。name函数之所以也写出来,是因为在将代码pushonline的时候,每个Job都要取名字,而公司对job的名字是有规定的,如果name结尾不是你的用户名,Spark程序是会报错的,就是不让你跑的意思。

3.代码有两个类,base和stats类,执行逻辑是这样的:主函数调用stats,然后发现stats类requires(依赖于)base类,就看看这个依赖的输出存不存在,如果存在就作为自己的输入,然后执行自己类中的代码。如果不存在就执行base类。上面代码中我的stats类中不需要执行上面,就没写main,只是用来检查下base执行了没,没执行就执行base去。

3.该base类中requires和ouput都是hdfs文件,逻辑和stats类一样。base类需要继承PySparkTask类,而luigi.run()的参数需要时继承了luigi.Task的类,所以才分开写成两个类了,我自己是这样理解的。

4.requires函数的返回值不能是个target对象,这里具体的理解就是不能是一个直接读取的hdfs文件,可以封装到一个类中去,这个类可以有个属性是path,是用来返回一个hdfs文件的地址的。依赖不仅限一个,可以是多个,生成一个列表返回。

5.如果不是在自己的电脑上安装的Spark,要注意:由于PySparkTask调用的spark集群不在本地,好像不支持对本地文件的一些操作,开始的时候想把结果写在本地,一直找不到输出结果。

6.一般公司都有相对应得网页可以查看spark和hadoop程序的运行的情况,可以查看日志什么的。

7.base类中可以设置下queue 参数,选择你程序的运行队列,有时候默认的队列好像特别慢,可以设置个其他的。

转载于:https://www.cnblogs.com/qingjiaowoyc/p/6995097.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/108420.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 字典树模板及例题_模板计算公式

    字典树模板及例题_模板计算公式转载:Trie树的常见应用大总结(面试+附代码实现)(一)Trie的简介Trie树,又称字典树,单词查找树或者前缀树,是一种用于快速检索的多叉树结构,如英文字母的字典树是一个26叉树,数字的字典树是一个10叉树。他的核心思想是空间换时间,空间消耗大但是插入和查询有着很优秀的时间复杂度。(二)Trie的定义Trie树的键不是直接保存在节点中,而是由节点在树中的位置决定。一个节点的所有子…

  • navicat premium 15 for mac 激活码【中文破解版】[通俗易懂]

    (navicat premium 15 for mac 激活码)本文适用于JetBrains家族所有ide,包括IntelliJidea,phpstorm,webstorm,pycharm,datagrip等。IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.cn/100143.html…

  • 子网划分介绍以及如何划分子网(例题详解)

    子网划分介绍以及如何划分子网(例题详解)子网划分这项技术用来把一个单一的IP网络地址划分成多个更小的子网(subnet)。这种技术可使一个较大的分类IP地址能够被进一步划分为几个子网。这样就可以让使用一个大的分类地址(classfuladdress)的企业能给该企业中处于不同地理位置的分公司分配不同的子网,对外整个企业是一个网络地址,而在内部,不同分公司则有不同的子网地址,因而不需要为每个站点都分别申请一个网络地址。子网划分通常是把IP地址中主机标识部分划出一定的位数用作本网的各个子网,剩余的主机标识作为相应子网的主机标识部分。

  • 数字电路实验(一)——译码器

    数字电路实验(一)——译码器1、实验步骤:异或门过程1、 新建,编写源代码。(1).选择保存项和芯片类型:【File】-【newprojectwizard】-【next】(设置文件路径+设置projectname为【C:\Users\lenovo\Desktop\笔记\大二上\数字电路\实验课\实验一\异或门】)-【next】(设置文件名【gg】)-【next】(设置芯片类型为【cyclone-EP1CT144C…

  • idea激活码2021.12【2021最新】

    (idea激活码2021.12)最近有小伙伴私信我,问我这边有没有免费的intellijIdea的激活码,然后我将全栈君台教程分享给他了。激活成功之后他一直表示感谢,哈哈~IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.cn/100143.htmlFN…

  • 控制误差_自动控制原理校正

    控制误差_自动控制原理校正计算机实时控制加工误差的时滞问题.pdf第18卷薯4月J.Hu中azho理ngUU工nniv杰·ofSS学cci.·&TechhVAOpIr.i18IN19o9.021990档计算机实时控制加工误差的时滞问题薯宾鸿赞(机械工程一系)提要本文从计算机控制的原理分析八手…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号