精进Quartz源码—Quartz中JobStore保存JonDetail和Trigger源码分析(一)

我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!要使用定时器,并讲任务持久到数据库,我们一定明白JobDetail和Trigger是如何操作进入数据库,如何注册到Scheduler中!前面我分析了Scheduler的start()和QuartzSchedulerThread的run()!这些都是要在下面讲的这个流程的基础上!那么下面就开始分析源码,

大家好,又见面了,我是全栈君。

=================================================

对人工智能感兴趣的伙伴,分享一个我朋友的人工智能教程。零基础!通俗易懂!风趣幽默!大家可以看看是否对自己有帮助,点击这里查看教程

=================================================

欢迎关注我的公众号: Java编程技术乐园。分享技术,一起精进Quartz!

做一个积极的人

编码、改bug、提升自己

我有一个乐园,面向编程,春暖花开!

我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!

说明:

  • {0} :表的前缀 ,如表qrtz_trigger ,{0}== qrtz_

  • {1}:quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName

如果要使用定时器,那么正常企业级开发中一定是要将任务持久到数据库,那么我们一定明白JobDetail和Trigger是如何操作进入数据库,如何注册到Scheduler中!

一、从一个小Demo开始

那么下面就开始分析源码,分析源码前首先加一个小的demo,方面解释!

//1.创建Scheduler的工厂
SchedulerFactory sf = new StdSchedulerFactory();
//2.从工厂中获取调度器实例
Scheduler scheduler = sf.getScheduler();
//3.创建JobDetail
JobDetail jb = JobBuilder.newJob(RAMJob.class)
    .withDescription("this is a ram job") //job的描述
    .withIdentity("ramJob", "ramGroup") //job 的name和group
    .build();

//任务运行的时间,SimpleSchedle类型触发器有效
long time=  System.currentTimeMillis() + 3*1000L; //3秒后启动任务
Date statTime = new Date(time);

//4.创建Trigger
//使用SimpleScheduleBuilder或者CronScheduleBuilder
Trigger t = TriggerBuilder.newTrigger()
    .withDescription("")
    .withIdentity("ramTrigger", "ramTriggerGroup")
    //.withSchedule(SimpleScheduleBuilder.simpleSchedule())
    .startAt(statTime)  //默认当前时间启动
    .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) //两秒执行一次
    .build();

//5.注册任务和定时器
scheduler.scheduleJob(jb, t);//源码分析

//6.启动 调度器
scheduler.start();

二、开始源码分析

主要分析入口 scheduleJob ,从这里进去,看看源码里面到底是怎么进行处理的!

//5.注册任务和定时器
scheduler.scheduleJob(jb, t);//源码分析

2.1、scheduler.scheduleJob(jb, t);

开始一个任务调度 。scheduler.scheduleJob(jb, t);//源码分析

/** * <p> * Calls the equivalent method on the 'proxied' <code>QuartzScheduler</code>. * 调用“代理”<QuartzScheduler>上的等效方法。 * * 实现 在 StdScheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) * </p> */
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
    throws SchedulerException { 
   
    //这里的sched 是 QuartzScheduler 对象,Quartz和核心类,Quartz调度器
    return sched.scheduleJob(jobDetail, trigger);//保存JobDetail和Trigger
}

2.2、ched.scheduleJob(jobDetail, trigger);

保存JobDetail和Trigger。ched.scheduleJob(jobDetail, trigger);//保存JobDetail和Trigger

/** * <p> * Add the <code>{@link org.quartz.Job}</code> identified by the given * <code>{@link org.quartz.JobDetail}</code> to the Scheduler, and * associate the given <code>{@link org.quartz.Trigger}</code> with it. * </p> * * <p> * If the given Trigger does not reference any <code>Job</code>, then it * will be set to reference the Job passed with it into this method. * </p> * * @throws SchedulerException * if the Job or Trigger cannot be added to the Scheduler, or * there is an internal Scheduler error. * 将给定org.quartz.JobDetail标识的org.quartz.Job添加到Scheduler, * 并将给定的org.quartz.Trigger与其关联。 * 如果给定的触发器不引用任何作业,则它将被设置为引用与其一起传递的作业到此方法中。 * * 实现在 QuartzScheduler.scheduleJob(JobDetail jobDetail, * Trigger trigger) */
    public Date scheduleJob(JobDetail jobDetail,
            Trigger trigger) throws SchedulerException { 
   
        validateState();//验证调度器是否关闭,关闭抛出异常

		//检查 jobDetail和trigger 
        if (jobDetail == null) { 
   
            throw new SchedulerException("JobDetail cannot be null");
        }
        
        if (trigger == null) { 
   
            throw new SchedulerException("Trigger cannot be null");
        }
        
        if (jobDetail.getKey() == null) { 
   
            throw new SchedulerException("Job's key cannot be null");
        }

        if (jobDetail.getJobClass() == null) { 
   
            throw new SchedulerException("Job's class cannot be null");
        }
        
        OperableTrigger trig = (OperableTrigger)trigger;
        
		//getJobKey 获取 getJobName(), getJobGroup() 
        if (trigger.getJobKey() == null) { 
   
            trig.setJobKey(jobDetail.getKey());
        } else if (!trigger.getJobKey().equals(jobDetail.getKey())) { 
   
            throw new SchedulerException(
                "Trigger does not reference given job!");
        }
		//验证trigger
        trig.validate();

        Calendar cal = null;
        if (trigger.getCalendarName() != null) { 
   
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());//检索Calendar
           "SELECT * FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"
        }
        //在触发器首次添加到调度程序时由调度程序调用,以便让触发器基于任何关联的日历计算
        //其第一次触发时间。调用此方法后,getNextFireTime()应返回有效的答案。
        Date ft = trig.computeFirstFireTime(cal);

        if (ft == null) { 
   
            throw new SchedulerException(
                    "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
        }

		//存储给定的org.quartz.JobDetail和org.quartz.Trigger。
        resources.getJobStore().storeJobAndTrigger(jobDetail, trig);// 主要看这一行
        notifySchedulerListenersJobAdded(jobDetail);
        notifySchedulerThread(trigger.getNextFireTime().getTime());
        notifySchedulerListenersSchduled(trigger);

        return ft;
    }

2.3、 resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

存储给定的org.quartz.JobDetail和org.quartz.Trigger。 resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

public void storeJobAndTrigger(final JobDetail newJob,
            final OperableTrigger newTrigger) 
        throws JobPersistenceException { 
   
        executeInLock(
            (isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
            new VoidTransactionCallback() { 
   
                public void executeVoid(Connection conn) throws JobPersistenceException { 
   
	                //(1)保存JobDetail
                    storeJob(conn, newJob, false);
                    //(2)保存Trigger
                    storeTrigger(conn, newTrigger, newJob, false,
                            Constants.STATE_WAITING, false, false);
                }
            });
    }

2.3.1 storeJob(conn, newJob, false);

保存JobDetail 。storeJob(conn, newJob, false);

首先判断JobDetail是否已经存在,存在则更新,否则插入!

protected void storeJob(Connection conn, 
                        JobDetail newJob, boolean replaceExisting)
    throws JobPersistenceException { 
   

    //判断JobDetail是否已经存在,根据name和group
    boolean existingJob = jobExists(conn, newJob.getKey());
    try { 
   
        if (existingJob) { 
   
            if (!replaceExisting) { 
    
                throw new ObjectAlreadyExistsException(newJob); 
            }
            //更新JobDetail
            getDelegate().updateJobDetail(conn, newJob);
            "UPDATE {0}JOB_DETAILS SET DESCRIPTION = ?, JOB_CLASS_NAME = ?, IS_DURABLE = ?, IS_NONCONCURRENT = ?, IS_UPDATE_DATA = ?, REQUESTS_RECOVERY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
        } else { 
   
            //插入JobDetail
            getDelegate().insertJobDetail(conn, newJob);
            "INSERT INTO {0}JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP, DESCRIPTION, JOB_CLASS_NAME, IS_DURABLE, IS_NONCONCURRENT, IS_UPDATE_DATA, REQUESTS_RECOVERY, JOB_DATA) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
        }
    } catch (IOException e) { 
   
        throw new JobPersistenceException("Couldn't store job: "
                                          + e.getMessage(), e);
    } catch (SQLException e) { 
   
        throw new JobPersistenceException("Couldn't store job: "
                                          + e.getMessage(), e);
    }
}

2.3.2 storeTrigger(conn, newTrigger, newJob, false,Constants.STATE_WAITING, false, false);

保存Trigger。 storeTrigger(conn, newTrigger, newJob, false,Constants.STATE_WAITING, false, false);

protected void storeTrigger(Connection conn,
                            OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
                            boolean forceState, boolean recovering)
    throws JobPersistenceException { 
   

    //判断Trigger是否已经存在,根据name和group
    boolean existingTrigger = triggerExists(conn, newTrigger.getKey());

    if ((existingTrigger) && (!replaceExisting)) { 
    
        throw new ObjectAlreadyExistsException(newTrigger); 
    }

    try { 
   

        boolean shouldBepaused;
        //进行一些状态的判断
        if (!forceState) { 
   
            shouldBepaused = getDelegate().isTriggerGroupPaused(
                conn, newTrigger.getKey().getGroup());

            if(!shouldBepaused) { 
   
                shouldBepaused = getDelegate().isTriggerGroupPaused(conn,
                                                                    ALL_GROUPS_PAUSED);

                if (shouldBepaused) { 
   
                    getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
                }
            }

            if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) { 
   
                state = STATE_PAUSED;
            }
        }
        //若job为null,重新获取!
        if(job == null) { 
   
            job = getDelegate().selectJobDetail(conn, newTrigger.getJobKey(), getClassLoadHelper());

            "SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
        }
        if (job == null) { 
   
            throw new JobPersistenceException("The job ("
                                              + newTrigger.getJobKey()
                                              + ") referenced by the trigger does not exist.");
        }
        //判断是否有DisallowConcurrentExecution注解,recovering恢复
        if (job.isConcurrentExectionDisallowed() && !recovering) { 
    
            state = checkBlockedState(conn, job.getKey(), state);
            "if (jobName != null) { 
   
                ps = conn.prepareStatement(rtp(SELECT_FIRED_TRIGGERS_OF_JOB));
            "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                ps.setString(1, jobName);
            ps.setString(2, groupName);
        } else { 
   
            ps = conn
                .prepareStatement(rtp(SELECT_FIRED_TRIGGERS_OF_JOB_GROUP));
            "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_GROUP = ?"
                ps.setString(1, groupName);
        }"
    }

    if (existingTrigger) { 
   
        //更新trigger
        getDelegate().updateTrigger(conn, newTrigger, state, job);
        "  大概贴出一些,具体想看完整的进入源码看
            if(updateJobData) { 
   
                ps = conn.prepareStatement(rtp(UPDATE_TRIGGER));
                "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
            } else { 
   
                ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_SKIP_DATA));
                "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
            }
        "
    } else { 
   
        //插入trigger
        getDelegate().insertTrigger(conn, newTrigger, state, job);
        "INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    }
} catch (Exception e) { 
   
    throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" 
                                      + newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
}
}


到这里,保存就结束了!将任务保存成功后,就可以启动了,期待下一篇源码的分析:Scheduler的start()和QuartzSchedulerThread的run()!

三、参考资料

quartz2.2源码分析4-JobStore

Quartz专栏系列

1.精进Quartz——Quartz大致介绍(一)
2.精进Quartz——Quartz简单入门Demo(二)
3.精进Quartz——Spring和Quartz集成详解
4.精进Quartz——SSMM(Spring+SpringMVC+Mybatis+Mysql)和Quartz集成详解(四)
5.精进Quartz源码——JobStore保存JonDetail和Trigger源码分析(一)
6.精进Quartz源码——scheduler.start()启动源码分析(二)
7.精进Quartz源码——QuartzSchedulerThread.run() 源码分析(三)
8.精进Quartz源码——Quartz调度器的Misfire处理规则(四)


谢谢你的阅读,如果您觉得这篇博文对你有帮助,请点赞或者喜欢,让更多的人看到!祝你每天开心愉快!


不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!

博客首页 : http://blog.csdn.net/u010648555

愿你我在人生的路上能都变成最好的自己,能够成为一个独挡一面的人
精进Quartz源码—Quartz中JobStore保存JonDetail和Trigger源码分析(一)

© 每天都在变得更好的阿飞云

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/121201.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 谷歌学术搜索文献_谷歌学术论文翻译

    谷歌学术搜索文献_谷歌学术论文翻译谷歌学术文献信息爬取及文献下载

    2022年10月11日
  • 失去焦点和获得焦点发生事件(js)「建议收藏」

    失去焦点和获得焦点发生事件(js)「建议收藏」失去焦点:onblur=”hanshu(this)”获得焦点:onfocus=”hanshu(this)”{     alert(‘请确认您输入格式是否正确!’);   }//函数名:chksafe//功能介绍:检查是否含有,//,///参数说明:要检查的字符串//返回值:0:是 1:不是functionchksafe(a)

  • 提高系统可用性

    提高系统可用性计算机网络发展史首先前解释一下什么是计算机网络,计算机网络是指将地理位置不同的具有独立功能的多台计算机及其外部设备,通过通信线路连接起来,在网络操作系统,网络管理软件及网络通信协议的管理和协调下,实现资源共享和信息传递的计算机系统。计算机网络也称计算机通信网。关于计算机网络的最简单定义是:一些相互…

  • 纯HTML CSS制作导航栏 下拉菜单

    纯HTML CSS制作导航栏 下拉菜单纯HTMLCSS制作导航栏下拉菜单

  • c++实现简单的web服务器搭建

    c++实现简单的web服务器搭建本文使用c++socket编程进行简单的web服务器搭建,出来了GET请求

  • Python 标识符与关键字[通俗易懂]

    Python 标识符与关键字[通俗易懂]Python标识符与关键字标识符是编程语言中允许作为名字的有效字符串集合。其中有一部分是关键字,构成语言的标识符。这种标识符是不能做它用的标识符的,否则会引起语法错误(SyntaxError异常)。标识符就是一个名字,作为变量、函数、类、模块以及其他对象的名称。1.Python标识符第一个字符必须是字母(A~Z和a~z)或下划线(_),剩下的字符可以是字母和数字或下划线,大小写敏感。标识符由字母、下划线和数字(0~9)组成,且不能以数字开头,Python中的标识符是区分大

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号