线程池简陋小记

Updated on with 0 views and 0 comments

总所周知Java的异步是通过多线程实现的。

我们可以这样开启一个线程实现一个异步任务

        new Thread(new Runnable() {
            @Override
            public void run() {
                // TODO Auto-generated method stub 
            }
        }).start();

ok~能满足需求但弊端较多如下:

  1. 每次new Thread新建对象性能差。
  2. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
  3. 缺乏更多功能,如定时执行、定期执行、线程中断。

线程的创建与销毁对于程序来说是很耗能的操作,线程也是程序中十分珍贵的资源,一般珍贵的资源 我们会第一时间考虑复用。

池化技术——线程池、连接池、内存池

池化技术:池化技术简单点来说,就是提前保存大量的资源,以备不时之需

这里主要对JDK的线程池的使用做下记录,加深印象

使用线程池的好处
  • 减少在创建和销毁线程上所花的时间以及系统资源的开销
  • 线程是稀缺资源,如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存,使用线程池可以进行统一的分配,调优和监控
  • 重复利用。 线程用完,再放回,可以达到重复利用的效果,节省资源。

线程池的创建(自定义线程池)

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

核心参数的作用:

  • corePoolSize: 线程池核心线程数最大值
  • maximumPoolSize: 线程池最大线程数大小
  • keepAliveTime: 线程池中非核心线程空闲的存活时间大小
  • unit: 线程空闲存活时间单位
  • workQueue: 存放任务的阻塞队列
  • threadFactory: 用于设置创建线程的工厂,可以定义线程名,优先级等,可方便排查问题。
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
              .setNameFormat("demo-pool-%d").build();
  • handler: 线城池的饱和策略事件,主要有四种类型。

任务执行流程如下(对应execute方法)
image.png

四种拒绝策略

  • AbortPolicy -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。(默认)
  • DiscardPolicy(直接丢弃任务)
  • DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
  • CallerRunsPolicy(交给线程池调用所在的线程进行处理)

创建任务

任务分为两种:一种是有返回值的( callable ),一种是没有返回值的( runnable ). Callable与 Future 两功能是Java在后续版本中为了适应多并法才加入的,Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其他线程执行的任务。

  • 无返回值的任务就是一个实现了runnable接口的类.使用run方法.
  • 有返回值的任务是一个实现了callable接口的类.使用call方法.

Callable和Runnable的区别如下:

  • Callable定义的方法是call,而Runnable定义的方法是run。
  • Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
  • Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常。

执行任务

通过java.util.concurrent.ExecutorService接口对象来执行任务,该对象有两个方法可以执行任务execute和submit。execute这种方式提交没有返回值,也就不能判断是否执行成功。submit这种方式它会返回一个Future对象,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成。

execute与submit区别:

  • 接收的参数不一样
  • submit有返回值,而execute没有
  • submit方便Exception处理
  • execute是Executor接口中唯一定义的方法;submit是ExecutorService(该接口继承Executor)中定义的方法

关闭线程池

线程池使用完毕,需要对其进行关闭,有两种方法

shutdown()

说明:shutdown并不是直接关闭线程池,而是不再接受新的任务…如果线程池内有任务,那么把这些任务执行完毕后,关闭线程池

shutdownNow()

说明:这个方法表示不再接受新的任务,并把任务队列中的任务直接移出掉,如果有正在执行的,尝试进行停止

合理的配置线程池

CPU密集型
一些进程绝大多数时间在计算上,称为计算密集型(CPU密集型)computer-bound。一些大量循环的代码(例如:图片处理、视频编码、人工智能等)就是CPU密集型。

I/O密集型
有一些进程则在input 和output上花费了大多时间,称为I/O密集型,I/O-bound。比如搜索引擎蜘蛛大多时间是在等待相应这种就属于I/O密集型。

一般公式:
CPU密集型:
核心线程数 = CPU核数+1;
最大线程数 = 核心线程数 * 2;
IO密集型:
核心线程数 = CPU核数 * 2;
最大线程数 = 核心线程数 * 2;

//范例
public class MyThreadPoolExecutor {
    //最大可用的CPU核数
    public static final int PROCESSORS=Runtime.getRuntime().availableProcessors();
    //线程最大的空闲存活时间,单位为秒
    public static final int KEEPALIVETIME=60;
    //任务缓存队列长度
    public static final int BLOCKINGQUEUE_LENGTH=500;

    public ThreadPoolExecutor createThreadPool(){
        return new ThreadPoolExecutor(PROCESSORS * 2,PROCESSORS * 4,KEEPALIVETIME,TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(BLOCKINGQUEUE_LENGTH));
    }
} 

案例

需求:从数据库中获取url,并利用httpclient循环访问url地址,并对返回结果进行操作

分析:由于是循环的对多个url进行访问并获取数据,为了执行的效率,考虑使用多线程,url数量未知如果每个任务都创建一个线程将消耗大量的系统资源,最后决定使用线程池。

public class GetMonitorDataService {
 
    private Logger logger = LoggerFactory.getLogger(GetMonitorDataService.class);
    @Resource
    private MonitorProjectUrlMapper groupUrlMapper;
    @Resource
    private MonitorDetailBatchInsertMapper monitorDetailBatchInsertMapper;
    public void sendData(){
        //调用dao查询所有url
        MonitorProjectUrlExample example=new MonitorProjectUrlExample();
        List<MonitorProjectUrl> list=groupUrlMapper.selectByExample(example);
        logger.info("此次查询数据库中监控url个数为"+list.size());
 
        //获取系统处理器个数,作为线程池数量
        int nThreads=Runtime.getRuntime().availableProcessors();
 
        //定义一个装载多线程返回值的集合
        List<MonitorDetail> result= Collections.synchronizedList(new ArrayList<MonitorDetail>());
        //创建线程池,这里定义了一个创建线程池的工具类,避免了创建多个线程池,ThreadPoolFactoryUtil可以使用单例模式设计
        ExecutorService executorService = ThreadPoolFactoryUtil.getExecutorService(nThreads);
        //遍历数据库取出的url
        if(list!=null&&list.size()>0) {
            for (MonitorProjectUrl monitorProjectUrl : list) {
                String url = monitorProjectUrl.getMonitorUrl();
                //创建任务
                ThreadTask threadTask = new ThreadTask(url, result);
                //执行任务
                executorService.execute(threadTask);
               
                try {//等待直到所有任务完成
                          executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
 //注意区分shutdownNow
            executorService.shutdown();
            //对数据进行操作
            saveData(result);
        }
    }


//-----任务类
public class ThreadTask implements Runnable{
    //这里实现runnable接口
    private String url;
    private List<MonitorDetail> list;
    public ThreadTask(String url,List<MonitorDetail> list){
        this.url=url;
        this.list=list;
    }
    //把获取的数据进行处理
    @Override
    public void run() {
        MonitorDetail detail = HttpClientUtil.send(url, MonitorDetail.class);
        list.add(detail);
    }

}

springboot的线程池配置

创建一个配置类ExecutorConfig,用来定义ThreadPoolTaskExecutor,使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类,
如下所示:

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Bean("asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(5);
        //配置最大线程数
        executor.setMaxPoolSize(5);
        //配置队列大小
        executor.setQueueCapacity(99999);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");
        // 拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

在需要异常的方法上增加注解@Async(“asyncServiceExecutor”)即可

一般时候我们都不会选择直接使用上面的方式,而是继承ThreadPoolTaskExecutor重写里面的方法 拓展功能如下:

@Slf4j
public class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final long serialVersionUID = 685680463597708311L;

    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if(null==threadPoolExecutor){
            return;
        }

//        map.put("提交任务数-->",threadPoolExecutor.getTaskCount());
//        map.put("完成任务数-->",threadPoolExecutor.getCompletedTaskCount());
//        map.put("当前有多少线程正在处理任务-->",threadPoolExecutor.getActiveCount());
//        map.put("还剩多少个任务未执行-->",threadPoolExecutor.getQueue().size());
//        map.put("当前可用队列长度-->",threadPoolExecutor.getQueue().remainingCapacity());

        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("");
        return super.submitListenable(task);
    }
}

~~


标题:线程池简陋小记
作者:liqitian3344
地址:https://liqitian.com/articles/2019/12/23/1577081255517.html