关于java.util.concurrent.RejectedExecutionException: event executor terminated-程序员宅基地

技术标签: java  开发语言  

多线程报了个java.util.concurrent.RejectedExecutionException: event executor terminated 

线程池的拒绝策略

ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略

  • CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
  • AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
  • DiscardPolicy:直接抛弃任务,不做任何处理
  • DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交

线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

当你的线程跑满,并且等待队列也满了以后,再执行任务,就会报错,你也可以抛弃掉新增的任务
你的线程池最大是100,队列是200,所以你任务跑多了,就会报这个错误

拒绝策略的源码

CallerRunsPolicy

 /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     * 用于拒绝任务的处理程序,
     * 可以直接在{@code execute}方法的调用线程中运行被拒绝的任务
     * 除非执行器已被关闭,否则将丢弃该任务。
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         * 创建一个{@code CallerRunsPolicy}。
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * 除非执行器已关闭,否则在调用者线程中执行任务,
         * r 在这种情况下,该任务将被丢弃。
         *
         * @param r the runnable task requested to be executed
         *          r 请求执行的可运行任务
         * @param e the executor attempting to execute this task
         *          e 尝试执行此任务的执行者
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

分析:

CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)

这样生产者虽然没有被阻塞,但提交任务也会被暂停。

但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

AbortPolicy

 /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     * 抛出{@code RejectedExecutionException}的拒绝任务处理程序。
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         * 总是抛出RejectedExecutionException
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

分析:

该策略是默认饱和策略。

使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

DiscardPolicy

   /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     * 拒绝任务的处理程序,默认丢弃拒绝任务。
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         * 不执行任何操作,这具有丢弃任务 r 的作用。
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

分析:

如代码所示,不做任何处理直接抛弃任务

DiscardOldestPolicy

   /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     * 处理被拒绝任务的处理程序,它丢弃最旧的未处理请求,
     * 然后重试{@code execute},
     * 除非执行器*被关闭,在这种情况下,该任务将被丢弃。
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         * 获取并忽略执行者*会立即执行的下一个任务(如果一个任务立即可用),
         * 然后重试任务r的执行,除非执行者*被关闭,在这种情况下,任务r会被丢弃。
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

分析:

如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

自定义策略

看完发现默认的几个拒绝策略并不是特别的友好,那么可不可以咱们自己搞个呢?

可以发现,所有的拒绝策略都是实现了 RejectedExecutionHandler 接口

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

这个接口只有一个 rejectedExecution 方法。

r 为待执行任务;executor 为线程池;方法可能会抛出拒绝异常。

那么咱们就可以通过实现 RejectedExecutionHandler 接口扩展

两个栗子:一

netty自己实现的线程池里面私有的一个拒绝策略。单独启动一个新的临时线程来执行任务。

  private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e);
            }
        }
    }

两个栗子:二

dubbo的一个例子,它直接继承的 AbortPolicy ,加强了日志输出,并且输出dump文件

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }
}

自己玩

参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

 new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        // should not be interrupted
                    }
                }
            }
        };

这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。

总结

四种线程池拒绝策略,具体使用哪种策略,还得根据实际业务场景才能做出抉择。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/baidu_37366055/article/details/125535868

智能推荐

sqli-labs:less-32(宽字节注入)_sqli labs less32 宽字节注入-程序员宅基地

文章浏览阅读3k次。宽字节注入产生原因:1、mysql 在使用 GBK 编码的时候,会认为两个字符为一个汉字,例如%aa%5c 就是一个汉字(前一个 ascii 码大于 128 才能到汉字的范围)2、mysqli_real_escape_string() 函数转义在 SQL 语句中使用的字符串中的特殊字符。mysqli_real_escape_string(*connection,escapestring*)connection 必需。规定要使用的 MySQL 连接 escapestr..._sqli labs less32 宽字节注入

HTML5 游戏开发快速提升_html5游戏开发 slg游戏-程序员宅基地

文章浏览阅读3.1k次。带你快速掌握html5游戏开发基本功,助你快速进阶游戏开发_html5游戏开发 slg游戏

Windows下kafka生成消费者报错:Missing required argument “[zookeeper]“_missing required argument "[zookeeper]-程序员宅基地

文章浏览阅读4.3k次。.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic kafka-test-topic --from-beginning上面的语句就是在Windos下 kafka生成消费者的命令行,这命令行是我从百度上找的,但一直是报错。仔细看报错是缺少一个参数–zookeeper,然后我W3Cshool关于kafka的教程上看到Linux系统下生成消费者的命令行。如下:bin/kafka-console-consumer.sh._missing required argument "[zookeeper]

Qt Creator中,include路径包含过程(或如何找到对应的头文件)_qt include路径-程序员宅基地

文章浏览阅读5.9w次,点赞11次,收藏29次。Qt Creator中,include路径包含过程(或如何找到对应的头文件)利用Qt Creator开发程序时,需要包含利用#include来添加头文件。大家都知道,#include 对于后者,路径比较直观,容易理解。如#include "lyc/daniel.h",路径在当前目录的lyc文件夹下。(文件包含是可以嵌套的。)下面重点追溯一下Qt的标准库头文件的路径包含情况。_qt include路径

Fortify代码扫描问题及修复_input validation and representation-程序员宅基地

文章浏览阅读1.6w次,点赞8次,收藏39次。静态代码扫描常见问题及修复风险类型原因Code Correctness: Erroneous String Compare字符串的对比使用错误方法Cross-Site ScriptingWeb浏览器发送非法数据,导致浏览器执行恶意代码Dead Code: Expression is Always true表达式的判断总是trueDead Code: Unused Method没有使用的方法HTTP Response Splitting含有未验证的数据_input validation and representation

探索Svelte SVG Patterns:创新SVG图案生成器-程序员宅基地

文章浏览阅读278次,点赞4次,收藏9次。探索Svelte SVG Patterns:创新SVG图案生成器项目地址:https://gitcode.com/catchspider2002/svelte-svg-patterns该项目[[链接]][1]是一个基于Svelte框架构建的SVG图案生成工具,由开发者catchspider2002维护。它允许用户通过简单的交互式界面创建独特且可自定义的SVG图案,这些图案可以用于网页背景、图标...

随便推点

nohup后台执行不打印_nohub有些打印不出来-程序员宅基地

文章浏览阅读4.3k次。①后台执行,但是打印log在nohup.log在你当前目录下,随着执行这个文件越来越大,直到执行完nohup 你要执行的命令 &amp;例如:nohup bash -x test.sh &amp;②后台执行,不打印lognohup 你要执行的命令 &gt;/dev/null 2&gt;log &amp;例如:nohup bash -x test.sh &gt;/dev/..._nohub有些打印不出来

RadioGroup动态添加RadioButton,并且获得事件_radiogroup 动态-程序员宅基地

文章浏览阅读3.6w次,点赞5次,收藏20次。由于有许多的RadioButton是动态的,不是固定的一些,所以需要在代码中,动态的添加到RadioGroup中,下面是我的实现方法。1、添加RadioButton到RadioGroup中RadioGroup group;for(int i=0; i<10; i++){ RadioButton tempButton = new RadioButton(this);_radiogroup 动态

IDEA 1_intellij idea 2018 xms xmx-程序员宅基地

文章浏览阅读246次。"D:\IDEAINSTALL\IntelliJ IDEA 2018.2.1\bin\idea64.exe.vmoptions" -Xms128m-Xmx750m-XX:ReservedCodeCacheSize=240m-XX:+UseConcMarkSweepGC-XX:SoftRefLRUPolicyMSPerMB=50-ea-Dsun.io.useCanonCache..._intellij idea 2018 xms xmx

项目管理中冲突的六种解决方法_项目 冲突 解决的案例-程序员宅基地

文章浏览阅读2.8w次,点赞3次,收藏17次。解决项目冲突的主要责任在于项目经理,项目经理可以使用以下六种方法来解决冲突:面对/解决问题(confronting/problem solving):通过审查备选方案,把冲突当作需要解决的问题来处理;需要以“取舍”的态度进行公开对话。问题解决就是冲突各方一起积极地定义问题、收集问题的信息、制定解决方案,最后直到选择一个最合适的方案来解决冲突,此时为双赢或多赢。但在这个过程中,需要公开地协商_项目 冲突 解决的案例

VM虚拟机安装Windows XP Service Pack 3 (x86)_windows_xp_professional_with_service_pack_3_x86-程序员宅基地

文章浏览阅读2.9k次,点赞2次,收藏3次。Windows版本  Windows XP发行于2001年10月25日,于2014年4月8日停止支持,XP取自Experience。Windows 7发行于2009年10月22日。  Windows XP Service Pack 3 (x86):SP(service pack)是指服务补丁包,01年发布的XP不带补丁,第一次补丁版本叫SP2,14年最后一个补丁版本叫SP3。下载ISO  操作系统站:https://msdn.itellyou.cn/。值得一提,MSDN是原装系统,跟ghost系统_windows_xp_professional_with_service_pack_3_x86

vue-cli指定版本安装_vue-cli-service 选哪个版本-程序员宅基地

文章浏览阅读1.2w次,点赞16次,收藏32次。安装新的版本前,需要先把之前安装的版本卸载掉。vue卸载:npm uninstall vue-cli -g(3.0以下版本卸载)npm uninstall -g @vue/cli(3.0以上版本卸载)vue安装:npm install -g @vue/cli (安装的是最新版)npm install [email protected] (指定版本安装【指定版本为3.0以下版本】,其中2.9.6为版本号)npm install -g @vue/[email protected](指定版本安装【指定版本为3.0以上版本】,_vue-cli-service 选哪个版本

推荐文章

热门文章

相关标签