精进Quartz源码—scheduler.start()启动源码分析(二)「建议收藏」

scheduler.start()是Quartz的启动方式!下面进行分析,方便自己查看! 我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!

大家好,又见面了,我是全栈君。

=================================================

对人工智能感兴趣的伙伴,分享一个我朋友的人工智能教程。零基础!通俗易懂!风趣幽默!大家可以看看是否对自己有帮助,点击这里查看教程

=================================================

欢迎关注我的公众号: Java编程技术乐园。分享技术,一起精进Quartz!

做一个积极的人

编码、改bug、提升自己

我有一个乐园,面向编程,春暖花开!

scheduler.start()是Quartz的启动方式!下面开始源码的分析!
说明:我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!


源码中相关sql中内容解释:

{0} : 表的前缀 ,如表qrtz_trigger ,{0}== qrtz_

{1} :quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName


scheduler.start() 调用 .QuartzScheduler.start();

Quartz 的启动要调用start()方法进行线程的启动,并执行需要出发的Trigger,start方法里面进行的操作:

  1. 启动的初始化
  2. 判断是否集群,对应不同的操作
  3. 若是非集群,首先有恢复机制,恢复任何失败或misfire的作业,并根据需要清理数据存储。
  4. 初始化线程管理,唤醒所有等待的线程!

线程中启动线程是调用start()方法,但是真正执行线程任务的操作在run()中!

开启 scheduler调用start();

QuartzScheduler.start();下面就是简单的源码分析:

public void start() throws SchedulerException { 
   

    if (shuttingDown|| closed) { 
   
        throw new SchedulerException(
            "The Scheduler cannot be restarted after shutdown() has been called.");
    }

    // QTZ-212 : calling new schedulerStarting() method on the listeners
    // right after entering start()
    notifySchedulerListenersStarting();

    if (initialStart == null) { 
   //初始化标识为null,进行初始化操作
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();//1 主要分析的地方 
        startPlugins();
    } else { 
   

        resources.getJobStore().schedulerResumed();//2 如果已经初始化过,则恢复jobStore
    }

    schedThread.togglePause(false);//3 唤醒所有等待的线程

    getLog().info(
        "Scheduler " + resources.getUniqueIdentifier() + " started.");

    notifySchedulerListenersStarted();
}

一、启动调度任务

this.resources.getJobStore().schedulerStarted() ;主要分析的地方,实际上 是调用 QuartzSchedulerResources中的JobStore进行启动!看下面代码:

public void schedulerStarted() throws SchedulerException { 
   
    //是集群
    if (isClustered()) { 
   
        clusterManagementThread = new ClusterManager();
        if(initializersLoader != null)
            clusterManagementThread.setContextClassLoader(initializersLoader);
        clusterManagementThread.initialize();
    } else { 
   //不是集群
        try { 
   
            recoverJobs();/1、恢复job 
        } catch (SchedulerException se) { 
   
            throw new SchedulerConfigException(
                "Failure occured during job recovery.", se);
        }
    }

    misfireHandler = new MisfireHandler();
    if(initializersLoader != null)
        misfireHandler.setContextClassLoader(initializersLoader);
    misfireHandler.initialize();//2、 获取ThreadExecutor 线程管理
    schedulerRunning = true;

    getLog().debug("JobStore background threads started (as scheduler was started).");
}	

1 、恢复job recoverJobs();

//启动的时候 有一个恢复机制:
//recoverJobs ----- 将恢复任何失败或misfire的作业,并根据需要清理数据存储。
protected void recoverJobs() throws JobPersistenceException { 
   
    executeInNonManagedTXLock(
        LOCK_TRIGGER_ACCESS,
        new VoidTransactionCallback() { 
   
            public void executeVoid(Connection conn) throws JobPersistenceException { 
   
                recoverJobs(conn);//恢复job
            }
        }, null);
}

protected void recoverJobs(Connection conn) throws JobPersistenceException { 
   
    try { 
   1//更新不一致的作业状态 先修改状态,将 ACQUIRED 和 BLOCKED ---> WAITING
            int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
                                                                        STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);

        rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
                                                                 STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);

        //----更新sql--- 
        //"UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ?)"

        getLog().info(
            "Freed " + rows
            + " triggers from 'acquired' / 'blocked' state.");

        // clean up misfired jobs
        //1.1 清理misfire的jobs
        recoverMisfiredJobs(conn, true);

        // recover jobs marked for recovery that were not fully executed
        //1.2 恢复未完全执行的标记为恢复的作业 --查询 qrtz_fire_trigger
        List<OperableTrigger> recoveringJobTriggers = getDelegate()
            .selectTriggersForRecoveringJobs(conn);
        getLog()
            .info(
            "Recovering "
            + recoveringJobTriggers.size()
            + " jobs that were in-progress at the time of the last shut-down.");

        for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) { 
   
            if (jobExists(conn, recoveringJobTrigger.getJobKey())) { 
   
                recoveringJobTrigger.computeFirstFireTime(null);
                storeTrigger(conn, recoveringJobTrigger, null, false,
                             STATE_WAITING, false, true);
            }
        }
        getLog().info("Recovery complete.");

        // remove lingering 'complete' triggers...
        //1.3 移除state == complete 
        List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
        for(TriggerKey ct: cts) { 
   
            removeTrigger(conn, ct);
        }
        getLog().info(
            "Removed " + cts.size() + " 'complete' triggers.");

        // clean up any fired trigger entries
        //1.4 清理任何已触发的触发器条目
        int n = getDelegate().deleteFiredTriggers(conn);
        getLog().info("Removed " + n + " stale fired job entries.");
    } catch (JobPersistenceException e) { 
   
        throw e;
    } catch (Exception e) { 
   
        throw new JobPersistenceException("Couldn't recover jobs: "
                                          + e.getMessage(), e);
    }
}

1.1 清理misfire的jobs recoverMisfiredJobs(conn, true);

//是否有misfire的Trigger
//我们必须仍然寻找MISFIRED状态,以防触发器被遗忘
//在此状态下升级到此版本不支持
(a1)hasMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime(),    
                               maxMisfiresToHandleAtATime, misfiredTriggers);   
getMisfireTime() 当前时间 -(减去) 一分钟 ,maxMisfiresToHandleAtATime == -1 ,misfiredTriggers== null 

"SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"

    上面sql查询出来结果是个list
				(aa1)若resultList.size() == count 返回 TRUE!! 否则 返回false(aa2)不等于 count ,封装数据,到resultList中,triggername  TriggerGroup

		//查询出来有misfire 的 Trigger 
	(b2) misfiredTriggers.size() > 0
    (bb1)输出日志信息   :getLog().info(
    "Handling " + misfiredTriggers.size() + 
    " trigger(s) that missed their scheduled fire-time.");

(bb2)循环 misfiredTriggers List集合
			for (TriggerKey triggerKey: misfiredTriggers) { 
   
				//retrieveTrigger ,检索Trigger,检索到进行数据封装
				OperableTrigger trig = 
					retrieveTrigger(conn, triggerKey);
					 //retrieveTrigger 执行的操作1"SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"2)关联Trigger对应的类型,如simpleTrigger等

				if (trig == null) { 
   
					continue;
				}

				//do 更新misfire的触发器
				doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering); //recovering===TRUE1)cal = retrieveCalendar(conn, trig.getCalendarName()); 搞这个表,qrtz_calendar
					(2)trig.updateAfterMisfire(cal); //simpleTrigger默认的misfire 机制 
						 setNextFireTime(new Date()); //设置下次执行的时间(next_fire_time)为当前时间!这里比较重要!!!3) getNextFireTime != null
						 if (trig.getNextFireTime() == null) { 
   
							storeTrigger(conn, trig,
								null, true, STATE_COMPLETE, forceState, recovering);
							schedSignaler.notifySchedulerListenersFinalized(trig);
						} else { 
   
							storeTrigger(conn, trig, null, true, newStateIfNotComplete,
									forceState, false);

							// job == null replaceExisting ==true state==waitting forceState==false recovering==false
							storeTrigger(Connection conn,
								OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
								boolean forceState, boolean recovering)

								//Insert or update a trigger.
							   boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
								if (existingTrigger) { 
   

									//state == waitting 
									getDelegate().updateTrigger(conn, newTrigger, state, job);

									//更新sql
									/* "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"*/


								} else { 
   
									getDelegate().insertTrigger(conn, newTrigger, state, job);
									//插入sql
									/*"INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"*/
								}
						}

	(c3) long earliestNewTime = Long.MAX_VALUE; // long earliestNewTime = Long.MAX_VALUE; === 9223372036854775807
		  if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime){ 
   
				earliestNewTime = trig.getNextFireTime().getTime();
		  }

1.2、 恢复未完全执行的标记为恢复的作业

List<OperableTrigger> recoveringJobTriggers = getDelegate()
    .selectTriggersForRecoveringJobs(conn);					
// INSTANCE_NAME == dufy_test REQUESTS_RECOVERY == true 实际封装到数据库查询是 REQUESTS_RECOVERY== 1
"SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ? AND REQUESTS_RECOVERY = ?"
    //具体怎么是 true是怎么转换成为 1的见附1图片!

  Recovery complete.恢复完成!!	

附1:true 是如何转换为 1 的:
这里写图片描述

1.3 移除state == complete

List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
-----------------------------------------------------------------------------
    "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?"
    -----------------------------------------------------------------------------
    for(TriggerKey ct: cts) { 
   
        removeTrigger(conn, ct);
        ---------------------------------------------------------------------
            (a)删除前,先查询jobDetail
            JobDetail job = getDelegate().selectJobForTrigger(conn,getClassLoadHelper(), key, false);
        "SELECT J.JOB_NAME, J.JOB_GROUP, J.IS_DURABLE, J.JOB_CLASS_NAME, J.REQUESTS_RECOVERY FROM {0}TRIGGERS T, {0}JOB_DETAILS J WHERE T.SCHED_NAME = {1} AND J.SCHED_NAME = {1} AND T.TRIGGER_NAME = ? AND T.TRIGGER_GROUP = ? AND T.JOB_NAME = J.JOB_NAME AND T.JOB_GROUP = J.JOB_GROUP"

            (b)删除触发器,其侦听器及其Simple / Cron / BLOB子表条目。
            boolean removedTrigger = deleteTriggerAndChildren(conn, key);
        deleteTrigger(Connection conn, TriggerKey triggerKey)
            (b1)deleteTriggerExtension
            "DELETE FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
            "DELETE FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"

            (b2)"DELETE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"

            (c)是否删除jobdetail ,判断 isDurable 默认 为falseif (null != job && !job.isDurable()) { 
   
                int numTriggers = getDelegate().selectNumTriggersForJob(conn,
                                                                        job.getKey());
                ---------------------------------------------------------
                    "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                    ---------------------------------------------------------
                    if (numTriggers == 0) { 
   
                        // Don't call removeJob() because we don't want to check for
                        // triggers again.
                        //不要调用removeJob(),因为我们不想再次检查触发器。
                        deleteJobAndChildren(conn, job.getKey()); //删除作业及其侦听器。
                        -----------------------------------------------------
                            //deleteJobDetail(Connection conn, JobKey jobKey) 删除给定作业的作业明细记录。
                            "DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                            -----------------------------------------------------
                    }
            }
    }

1.4 清理任何已触发的触发器条目

 int n = getDelegate().deleteFiredTriggers(conn);
 ----------------------------------------------------------------------------
 "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}"
 ----------------------------------------------------------------------------

2、获取ThreadExecutor 线程管理 misfireHandler.initialize();

public void initialize() {
		ThreadExecutor executor = getThreadExecutor();
		//getThreadExecutor ==  private ThreadExecutor threadExecutor = new DefaultThreadExecutor();
		executor.execute(MisfireHandler.this); //启动线程执行 对应job的 execute方法
		//MisfireHandler  ==  class MisfireHandler extends Thread  继承了Thread
}

二、 如果已经初始化过,则恢复

jobStoreresources.getJobStore().schedulerResumed();.如果已经初始化过,则恢复调度器运行 !

private volatile boolean schedulerRunning = false;//默认schedulerRunning = false
public void schedulerResumed() { 
   
    schedulerRunning = true;
}

三、 唤醒所有等待的线程

schedThread.togglePause(false);

schedThread.togglePause(false);
//指示主处理循环在下一个可能的点暂停。
void togglePause(boolean pause) { 
   
    synchronized (sigLock) { 
   
        paused = pause;

        if (paused) { 
   
            signalSchedulingChange(0);
            ------------------------------------------
                //发信号通知主要处理循环,已经进行了调度的改变 - 以便中断在等待misfire时间到达时可能发生的任何睡眠。
                public void signalSchedulingChange(long candidateNewNextFireTime) { 
   
                synchronized(sigLock) { 
   
                    signaled = true;
                    signaledNextFireTime = candidateNewNextFireTime;
                    sigLock.notifyAll();  // private final Object sigLock = new Object();
                }
            }
            ------------------------------------------
        } else { 
   
            sigLock.notifyAll();//唤醒所有等待的线程
        }
    }
}	

四、总结

在看源码的时候最好要自己能够结合源码,通过文章在debug一下,可以加深印象和理解!

Quartz专栏系列

1.精进Quartz——Quartz大致介绍(一)
2.精进Quartz——Quartz简单入门Demo(二)
3.精进Quartz——Spring和Quartz集成详解
4.精进Quartz——SSMM(Spring+SpringMVC+Mybatis+Mysql)和Quartz集成详解(四)
5.精进Quartz源码——JobStore保存JonDetail和Trigger源码分析(一)
6.精进Quartz源码——scheduler.start()启动源码分析(二)
7.精进Quartz源码——QuartzSchedulerThread.run() 源码分析(三)
8.精进Quartz源码——Quartz调度器的Misfire处理规则(四)http://blog.csdn.net/u010648555/article/details/53672738)


谢谢你的阅读,如果您觉得这篇博文对你有帮助,请点赞或者喜欢,让更多的人看到!祝你每天开心愉快!


不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!

博客首页 : http://blog.csdn.net/u010648555

愿你我在人生的路上能都变成最好的自己,能够成为一个独挡一面的人
精进Quartz源码—scheduler.start()启动源码分析(二)「建议收藏」

© 每天都在变得更好的阿飞云

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/121204.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Jlink或者stlink用于SWD接口下载程序

    Jlink或者stlink用于SWD接口下载程序最近要使用stm32f103c8t6最小系统板,直接ISP串口下载程序太麻烦,就想着使用swd接口来调试。结果:通过SWD接口下载程序成功,但调试失败,还不知原因,会的的人麻烦交流一下。SWD接口:3.3VDIO(数据)CLK(时钟)GND1.首先声明jlink和stlink都有jtag和swd调试功能。jlink接口如下:如图,我使用的就是VCC…

  • 判断端口通不通的几种方法「建议收藏」

    判断端口通不通的几种方法「建议收藏」判断端口通不通的几种方法

  • 二级域名

    二级域名

    2021年10月21日
  • vue项目引入外部原生js文件_php引入文件的四个方法

    vue项目引入外部原生js文件_php引入文件的四个方法在开发Vue项目的时候,有时需要使用一些非ES6格式的没有export的js库,可以有如下方法实现:1.在index.html页面使用script标签引入当然也可以使用cdn的地址。这样引入后的内容是全局的,可以在所有地方使用。<!DOCTYPEhtml><html> <head> <title>Map</title> <metacharset=”utf-8″> <meta

  • php开源在线客服系统_源码屋

    php开源在线客服系统_源码屋另外,客#服源码也不太适用于小规模的企业通讯场景,用客#服源码往企业通讯场景上套就有点“杀鸡用牛刀”,因为企业通讯更强调功能丰富而非极限性能,与客#服源码的目标不符,所以二者的上层设计也不同。另外,在产品对比章节下,我们也提到了客#服源码项目的缺点供您参考。1、(业务功能完善性)客#服源码支持几乎所有商用即时通讯产品所支持的即时通讯相关功能(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)、消息冷热分离存储等高级IM功能。……….

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号