调度服务 ScheduledExecutorService 经常卡顿问题的排查及解决方法

调度服务 ScheduledExecutorService 经常卡顿问题的排查及解决方法如上述代码所示,启动10个调度线程,延迟10秒,开始执行定时逻辑,然后每隔2秒执行一次定时任务。定时任务类为`TaskWorker`,其要做的事就是根据`offset`和`rows`参数,到数据库捞取指定范围的待处理记录,然后送到`TaskService`的`processTask`方法中进行处理。从逻辑上来看,该定时没有什么毛病,但是在执行定时任务的时候,却经常出现卡顿的问题,表现出来的现象就是:**定时任务不执行了**。

大家好,又见面了,我是你们的朋友全栈君。

问题描述

首先,给出调度服务的 Java 代码示例:

@Slf4j
@Component
public class TaskProcessSchedule { 
   

    // 核心线程数
    private static final int THREAD_COUNT = 10;

    // 查询数据步长
    private static final int ROWS_STEP = 30;

    @Resource
    private TaskDao taskDao;

    @Resource
    private TaskService taskService;

    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(THREAD_COUNT);

    public TaskProcessSchedule() { 
   
        for (int i = 0; i < THREAD_COUNT; i++) { 
   
            scheduledExecutorService.scheduleAtFixedRate(
                    new TaskWorker(i * ROWS_STEP, ROWS_STEP),
                    10,
                    2,
                    TimeUnit.SECONDS
            );
        }
        log.info("TaskProcessSchedule scheduleAtFixedRate start success.");
    }
 
    class TaskWorker implements Runnable { 
   
        private int offset;
        private int rows;

        TaskWorker(int offset, int rows) { 
   
            this.offset = offset;
            this.rows = rows;
        }

        @Override
        public void run() { 
   
            List<Task> taskList = taskDao.selectProcessingTaskByLimitRange(offset, rows);
            if (CollectionUtils.isEmpty(taskList)) { 
   
                return;
            }
            log.info("TaskWorker: current schedule thread name is {}, taskList is {}", Thread.currentThread().getName(), JsonUtil.toJson(taskList));
            taskService.processTask(taskList);         
        }
    }
}

如上述代码所示,启动 10 个调度线程,延迟 10 秒,开始执行定时逻辑,然后每隔 2 秒执行一次定时任务。定时任务类为TaskWorker,其要做的事就是根据offsetrows参数,到数据库捞取指定范围的待处理记录,然后送到TaskServiceprocessTask方法中进行处理。从逻辑上来看,该定时没有什么毛病,但是在执行定时任务的时候,却经常出现卡顿的问题,表现出来的现象就是:定时任务不执行了

问题定位

既然已经知道问题的现象了,现在我们就来看看如果定位问题。

  • 使用jps命令,查询当前服务器运行的 Java 进程PID

当然,也可以直接使用jps | grep "ServerName"查询指定服务的PID,其中ServerName为服务名称。

  • 使用jstack PID | grep "schedule"命令,查询调度线程的状态

jstack-schedule

如上图所示,发现我们启动的 10 个调度线程均处于WAITING状态。

  • 使用jstack PID | grep "schedule-task-10" -A 50命令,查询指定线程的详细信息

schedule-task-10

如上图所示,我们可以知道调度线程在执行DelayedWorkQueuetake()方法的时候被卡主了。

深入分析

通过上面的问题定位,我们已经知道了代码卡在了这里:

at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)

那么接下来,我们就详细分析一下出问题的代码。

        public RunnableScheduledFuture<?> take() throws InterruptedException { 
   
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try { 
   
                for (;;) { 
   
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else { 
   
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await(); // 1088 行代码
                        else { 
   
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try { 
   
                                available.awaitNanos(delay);
                            } finally { 
   
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally { 
   
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

由于上述代码可知,当延迟队列的任务为空,或者当任务不为空且leader线程不为null的时候,都会调用await方法;而且,就算leadernull,后续也会调用awaitNanos方法进行延迟设置。下面, 我们再来看看提交任务的方法scheduleAtFixedRate

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) { 
   
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

scheduleAtFixedRate方法中会调用decorateTask方法装饰任务t,然后再将该任务扔到delayedExecute方法中进行处理。

    private void delayedExecute(RunnableScheduledFuture<?> task) { 
   
        if (isShutdown())
            reject(task);
        else { 
   
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

delayedExecute方法中,主要是检查线程池中是否可以创建线程,如果不可以,则拒绝任务;否则,向任务队列中添加任务并调用ensurePrestart方法。

    void ensurePrestart() { 
   
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

ensurePrestart方法中,主要就是判断工作线程数量是否大于核心线程数,然后根据判断的结果,使用不同的参数调用addWorker方法。

    private boolean addWorker(Runnable firstTask, boolean core) { 
   
        retry:
        for (;;) { 
   
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) { 
   
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try { 
   
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) { 
   
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try { 
   
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { 
   
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally { 
   
                    mainLock.unlock();
                }
                if (workerAdded) { 
   
                    t.start();
                    workerStarted = true;
                }
            }
        } finally { 
   
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker方法中,主要目的就是将任务添加到workers工作线程池并启动工作线程。接下来,我们再来看看Worker的执行逻辑,也就是run方法:

        public void run() { 
   
            runWorker(this);
        }

run方法中,主要就是将调用转发到外部的runWorker方法:

    final void runWorker(Worker w) { 
   
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try { 
   
            while (task != null || (task = getTask()) != null) { 
   
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try { 
   
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try { 
   
                        task.run(); // 执行调度任务
                    } catch (RuntimeException x) { 
   
                        thrown = x; throw x;
                    } catch (Error x) { 
   
                        thrown = x; throw x;
                    } catch (Throwable x) { 
   
                        thrown = x; throw new Error(x);
                    } finally { 
   
                        afterExecute(task, thrown);
                    }
                } finally { 
   
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally { 
   
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法中,核心操作就是调用task.run(),其中taskRunnable类型,其实现类为ScheduledFutureTask,而ScheduledFutureTask继承了FutureTask类。对于FutureTask类,如果在执行run方法的过程中抛出异常,则这个异常并不会显示抛出,而是需要我们调用FutureTaskget方法来获取,因此如果我们在执行调度任务的时候没有进行异常处理,则异常会被吞噬。

特别地,在FutureTask类中,大量操作了sun.misc.Unsafe LockSupport类,而这个类的park方法,正是上面我们排查问题时定位到调度任务卡住的地方。除此之外,如果我们详细阅读了ScheduledExecutorServicescheduleAtFixedRate的 doc 文档,如下所示:

/** * Creates and executes a periodic action that becomes enabled first * after the given initial delay, and subsequently with the given * period; that is executions will commence after * {@code initialDelay} then {@code initialDelay+period}, then * {@code initialDelay + 2 * period}, and so on. * If any execution of the task * encounters an exception, subsequent executions are suppressed. * Otherwise, the task will only terminate via cancellation or * termination of the executor. If any execution of this task * takes longer than its period, then subsequent executions * may start late, but will not concurrently execute. * * @param command the task to execute * @param initialDelay the time to delay first execution * @param period the period between successive executions * @param unit the time unit of the initialDelay and period parameters * @return a ScheduledFuture representing pending completion of * the task, and whose {@code get()} method will throw an * exception upon cancellation * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if command is null * @throws IllegalArgumentException if period less than or equal to zero */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

我们会发现这样一句话:

If any execution of the task encounters an exception, subsequent executions are suppressed.

翻译过来,就是:

如果任务的任何执行遇到异常,则禁止后续的执行

说白了,就是在执行调度任务的时候,如果遇到了(未捕获)的异常,则后续的任务都不会执行了。

解决方法

到这里,我们已经知道了问题产生的原因。下面,我们就修改开篇的示例代码,进行优化:

@Slf4j
@Component
public class TaskProcessSchedule { 
   

    // 核心线程数
    private static final int THREAD_COUNT = 10;

    // 查询数据步长
    private static final int ROWS_STEP = 30;

    @Resource
    private TaskDao taskDao;

    @Resource
    private TaskService taskService;

    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(THREAD_COUNT);

    public TaskProcessSchedule() { 
   
        for (int i = 0; i < THREAD_COUNT; i++) { 
   
            scheduledExecutorService.scheduleAtFixedRate(
                    new TaskWorker(i * ROWS_STEP, ROWS_STEP),
                    10,
                    2,
                    TimeUnit.SECONDS
            );
        }
        log.info("TaskProcessSchedule scheduleAtFixedRate start success.");
    }
 
    class TaskWorker implements Runnable { 
   
        private int offset;
        private int rows;

        TaskWorker(int offset, int rows) { 
   
            this.offset = offset;
            this.rows = rows;
        }

        @Override
        public void run() { 
   
            List<Task> taskList = taskDao.selectProcessingTaskByLimitRange(offset, rows);
            if (CollectionUtils.isEmpty(taskList)) { 
   
                return;
            }
            log.info("TaskWorker: current schedule thread name is {}, taskList is {}", Thread.currentThread().getName(), JsonUtil.toJson(taskList));
            try { 
    // 新增异常处理
            	taskService.processTask(taskList);         
            } catch (Throwable e) { 
   
                log.error("TaskWorker come across a error {}", e);
            }
        }
    }
}

如上述代码所示,我们对任务的核心逻辑进行了try-catch处理,这样当任务再抛出异常的时候,仅会忽略抛出异常的任务,而不会影响后续的任务。这也说明一件事,那就是:我们在编码的时候,要特别注意对异常情况的处理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/137015.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Spring事务管理[通俗易懂]

    Spring事务管理[通俗易懂]1、Spring的事务管理主要包括3个接口TransactionDefinition:封装事务的隔离级别,超时时间,是否为只读事务和事务的传播规则等事务属性,可通过XML配置具体信息。Platfo

  • Java——数组转换为List集合

    Java——数组转换为List集合packageday04;importjava.util.ArrayList;importjava.util.Arrays;importjava.util.List;/***数组转换为List集合*不能转换为Set集合,原因在于Set集合不能存放重复元素*@authorAdministrator**/publicclassToListDemo…

  • html5教程单摆,Flash动画—单摆的制作教程

    html5教程单摆,Flash动画—单摆的制作教程想起当初作这个动画时,真是不知如何下手,所以,这是一篇献给初学者的教程的单摆动画的制作,应该要解决两个方面的问题:一、单摆本身的制作,这一点只要用好flash的绘图工具即可二、单摆振动,这一点将是教程的重点也是难点下面就先解决第一个问题,制作单摆(这一步的制作注意注册点的选择)首先要弄清,单摆有三部分组成:摆线、摆球、悬挂点(天花板)(一)、摆线:1、选取工具区的线条工具,线条粗细默认,在主场景按…

  • mysql数据库的字段数据类型有哪些_mysql数据类型详解

    mysql数据库的字段数据类型有哪些_mysql数据类型详解mysql数据类型有:1、数值类型;2、日期和时间类型;3、字符串类型是CHAR、VARCHAR、BINARY【二进制数据类型】、BLOB、TEXT【文本类型】、ENUM【枚举类型】和SET【数据集合】。mysql数据类型有:MySQL支持的数据类型在几类:数值类型,日期和时间类型和字符串(字符)类型。数据类型:注意:在定义字段是我们经常使用numint(10)unsigned这种写法…

  • Java环境变量配置详细步骤

    Java环境变量配置详细步骤引言很多初学Java的小伙伴可能都会听别人说想要编译运行Java程序需要配置环境变量,所以在这里我就手把手教给你如何配置Java环境变量;再多说一句,可能会有小伙伴想:我编译运行Java程序干嘛要配置环境变量呢,直接用IDEA等开发工具不好嘛;其实对于Java初学者,学习Java最好开始不要使用这些开发工具,因为这些工具功能实在是太强大了,并不适合开始学习Java,不利于打好基础;所以开始最好还是老老实实用DOS编译运行Java程序吧!;注:电脑系统是win10下载JDK至于什么是JDK还有到底有

  • centos7 输入 ifconfig 不显示 ip 地址 连接不上的解决方法(亲测成功)「建议收藏」

    centos7 输入 ifconfig 不显示 ip 地址 连接不上的解决方法(亲测成功)「建议收藏」最近又把自己的虚拟机打开了玩玩集群,遇到一个小问题,我发现虚拟机的内存不够了,就把虚拟机关机加大了内存,谁知道开机后,ifconfig或者ipaddr显示没有ip地址,只显示一个lo,没有ens33,没有ip地址就没法用xshell连接,很蛋疼,网上也有很多解决方案,但都写的乱七八糟的,而且很多都不好使,今天就来介绍一下我最后解决的方法.我说一下我的虚拟机的情况,我三台虚拟机,之前是mas………

    2022年10月23日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号