重写ThreadFactory方法和拒绝策略
2020-01-28 16:01:51来源:博客园 阅读 ()
重写ThreadFactory方法和拒绝策略
最近项目中要用到多线程处理任务,自然就用到了ThreadPoolTaskExecutor这个对象,这个是spring对于Java的concurrent包下的ThreadPoolExecutor类的封装,对于超出等待队列大小的任务默认是使用RejectedExecutionHandler去处理拒绝的任务,而这个Handler的默认策略是AbortPolicy,直接抛出RejectedExecutionException异常,这个不符合我们的业务场景,
业务需求:我希望是对于超出的任务,主线程进行阻塞,直到有可用线程,简单的代码如下
package com.quant.dev.modules.dev.enetity;
import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @program: dev
* @description:
* @author: Mr.EternityZhang
* @create: 2019-07-08 17:41
*/
@Slf4j
public class TestThread {
static class ThreadFactoryCustom implements ThreadFactory{
private final AtomicInteger threadNum=new AtomicInteger(1);
private final String namePrefix;
private ThreadFactoryCustom(String namePrefix){
this.namePrefix=namePrefix+"-";
}
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread(r,namePrefix+threadNum.getAndIncrement());
if(t.isDaemon()){
t.setDaemon(true);
}
if(t.getPriority()!=Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
static class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
private final String threadName;
private final URL url;
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Provider端线程池满!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)" ,
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
log.warn(msg);
if (!e.isShutdown()) {
try {
log.info("start get queue");
e.getQueue().put(r);
log.info("end get queue");
} catch (InterruptedException ee) {
log.error(ee.toString(), ee);
Thread.currentThread().interrupt();
}
}
}
}
public static ThreadFactory getThreadFactoryCustom(String name){
return new ThreadFactoryCustom(name);
}
public static void main(String[] args) {
String poolName="eternity";
ThreadFactory factory=getThreadFactoryCustom(poolName);
log.info("核数={}",Runtime.getRuntime().availableProcessors());
ThreadPoolExecutor executor=
new ThreadPoolExecutor(100,400,5,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(400),factory,new AbortPolicyWithReport(poolName,null));
Long begin=System.currentTimeMillis();
CountDownLatch count=new CountDownLatch(2000);
AtomicInteger integer=new AtomicInteger(1);
for(int i=0;i<2000;i++){
executor.execute(()->{
try {
log.info("当前线程为={},数值={}",Thread.currentThread().getName(),integer);
integer.getAndIncrement();
Thread.sleep(500);
count.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
}
try {
count.await();
log.info("阻塞数值={}",count.getCount());
log.info("活跃数量={}",executor.getActiveCount());
if(executor.getActiveCount()==0){
executor.shutdown();
}
} catch (Exception e) {
e.printStackTrace();
}
log.info("耗时={}------结果={}",System.currentTimeMillis()-begin,integer);
}
}
阻塞原理:
之所以能实现阻塞,是基于BlockingQueue的put方法来实现的,当阻塞队列满时,put方法会一直等待
参考
https://www.jianshu.com/p/3cfd943996a1
原文链接:https://www.cnblogs.com/eternityz/p/12238755.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
下一篇:事务总结
- 类的继承,方法重新中修饰符如何定义 2020-06-10
- java里面main方法中的String[]args 2020-06-07
- 错误: 在类中找不到 main 方法, 请将 main 方法定义为: & 2020-06-06
- 学习笔记之方法引用 2020-06-06
- Java连载120-反射机制获取构造方法和父类、父接口 2020-06-05
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash