Hand-Written Thread Pool

2022-07-28

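Contents

      • Blocking queue
      • Rejection policy: an implementation of this interface is called only when all coreSize workers are in use and the queue is full
      • Thread pool
      • Test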


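The code in this post may throw exceptions because some spots are not properly locked. Use it with care. I wrote it only to get a better understanding of how a thread pool is implemented.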

Blocking queue

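import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Bounded blocking queue guarded by one lock and two conditions.
public class BlockQueue<T> {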
    private Deque<T> queue=new ArrayDeque<>();
    private int capcity;
    private ReentrantLock lock=new ReentrantLock();
    private Condition emptyWait=lock.newCondition();
    private Condition fullWait=lock.newCondition();

    public BlockQueue(int capcity) {
        this.capcity = capcity;
    }
    public int getSize(){
        return queue.size();
    }
    public T take(){
        lock.lock();
        try{
            while(queue.isEmpty()){
                try {
                    emptyWait.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t=queue.removeFirst();
            fullWait.signal();
            return t;
        }finally{
            lock.unlock();
        }
    }
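    // Non-blocking put: if the queue is full, hand the task to the rejection policy.
    public void put(T task,RejectPolicy<T> rejectPolicy){
        lock.lock();
        try{
            if(queue.size()==capcity){
                rejectPolicy.reject(this,task);
            }else{
                queue.addLast(task);
                // Wake a consumer blocked in take()/timeTake(); without this signal
                // a waiting worker would not learn that a task has arrived.
                emptyWait.signal();
            }
        }finally {
            lock.unlock();
        }
    }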
    public T timeTake(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try{
            long nanos=timeUnit.toNanos(timeout);
            while(queue.isEmpty()){
                try {
                    if(nanos<=0) return null;
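                    // awaitNanos takes nanoseconds and returns the time still remaining,
                    // so pass the remaining nanos back in, not the raw timeout.
                    nanos=emptyWait.awaitNanos(nanos);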
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t=queue.removeFirst();
            fullWait.signal();
            return t;
        }finally{
            lock.unlock();
        }
    }
    public boolean timePut(T task,long timeout,TimeUnit timeUnit){
        lock.lock();
        try{
            long nanos=timeUnit.toNanos(timeout);
            while(queue.size()==capcity){
                try {
                    if(nanos<=0) return false;
                    // Same as in timeTake: wait for the remaining nanos, not the raw timeout.
                    nanos=fullWait.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWait.signal();
            return true;
        }finally{
            lock.unlock();
        }
    }
}
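The timed methods above depend on one detail of Condition.awaitNanos: it takes a duration in nanoseconds and returns an estimate of the time still left, so the loop can keep waiting for only the remainder after a spurious or early wakeup. The following is a minimal sketch of that pattern on its own. The class TimedWaitDemo and its poll/offer methods are hypothetical names used only for illustration, and the queue here is unbounded to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original post.
class TimedWaitDemo {
    private final Deque<String> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // Waits at most `timeout` for an element; returns null on timeout or interrupt.
    String poll(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    return null; // time budget used up
                }
                // Returns the remaining time, which becomes the next wait budget.
                nanos = notEmpty.awaitNanos(nanos);
            }
            return queue.removeFirst();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return null;
        } finally {
            lock.unlock();
        }
    }

    // Adds an element and wakes one waiting consumer.
    void offer(String item) {
        lock.lock();
        try {
            queue.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TimedWaitDemo demo = new TimedWaitDemo();
        System.out.println("empty poll -> " + demo.poll(200, TimeUnit.MILLISECONDS));
        demo.offer("task");
        System.out.println("after offer -> " + demo.poll(200, TimeUnit.MILLISECONDS));
    }
}
```

Running main should print "empty poll -> null" after roughly 200 ms, then "after offer -> task" immediately.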

Rejection policy: an implementation of this interface is called only when all coreSize workers are in use and the queue is full

public interface RejectPolicy<T> {
    void reject(BlockQueue<T> taskQueue,T task);
}
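To make the interface more concrete, here is a sketch of a few policies one might plug in. It is only an illustration: so that it compiles on its own, a plain Deque stands in for BlockQueue, and the names RejectPolicyDemo, Policy, DISCARD, CALLER_RUNS, ABORT and DISCARD_OLDEST are all hypothetical. The demo is single-threaded, so no locking is shown.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, self-contained demo; a plain Deque stands in for BlockQueue.
class RejectPolicyDemo {
    // Same shape as RejectPolicy, with Deque replacing BlockQueue.
    interface Policy<T> {
        void reject(Deque<T> queue, T task);
    }

    // Drop the new task silently.
    static final Policy<Runnable> DISCARD = (queue, task) -> { };

    // Run the task on the thread that tried to submit it.
    static final Policy<Runnable> CALLER_RUNS = (queue, task) -> task.run();

    // Fail loudly so the submitter knows the task was not accepted.
    static final Policy<Runnable> ABORT = (queue, task) -> {
        throw new IllegalStateException("queue full, task rejected: " + task);
    };

    // Evict the oldest waiting task to make room for the new one.
    static final Policy<Runnable> DISCARD_OLDEST = (queue, task) -> {
        queue.pollFirst();
        queue.addLast(task);
    };

    // Mirrors put(): enqueue if there is room, otherwise delegate to the policy.
    static void put(Deque<Runnable> queue, int capacity, Runnable task, Policy<Runnable> policy) {
        if (queue.size() == capacity) {
            policy.reject(queue, task);
        } else {
            queue.addLast(task);
        }
    }

    public static void main(String[] args) {
        Deque<Runnable> queue = new ArrayDeque<>();
        put(queue, 1, () -> System.out.println("first"), DISCARD);      // fits, is queued
        put(queue, 1, () -> System.out.println("second"), CALLER_RUNS); // queue full, runs here
        System.out.println("queued tasks: " + queue.size());
    }
}
```

With a capacity of 1, the first task is queued and the second is rejected, so main prints "second" followed by "queued tasks: 1". Which policy fits depends on whether losing a task, slowing the submitter, or raising an error is the least harmful outcome for the application.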

Thread pool

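import java.util.HashSet;
import java.util.concurrent.TimeUnit;

// Fixed-size pool: up to coreSize workers, extra tasks wait in the bounded queue.
public class ThreadPool {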
    private BlockQueue<Runnable> taskQueue;
    private int coreSize;
    private HashSet<Worker> workers=new HashSet<>();
    private RejectPolicy<Runnable> rejectPolicy;
    public ThreadPool(int capcity,int coreSize,RejectPolicy<Runnable> rejectPolicy){
        this.taskQueue=new BlockQueue<>(capcity);
        this.coreSize=coreSize;
        this.rejectPolicy=rejectPolicy;
    }
    public void excute(Runnable task){
        synchronized (workers){
            if(workers.size()<coreSize){
                Worker worker=new Worker(task);
                worker.start();
                workers.add(worker);
            }else{
                taskQueue.put(task,rejectPolicy);
            }
        }
    }
    class Worker extends Thread{
        private Runnable task;
        public Worker(Runnable task){
            this.task=task;
        }
        @Override
        public void run(){
            while(task!=null||(task=taskQueue.timeTake(1000,TimeUnit.MILLISECONDS))!=null){
                try{
                    task.run();
                }catch(RuntimeException e){
                    e.printStackTrace();
                }
                task=null;
            }
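            // workers is a plain HashSet, so guard the removal with the same
            // monitor that excute() uses when adding workers.
            synchronized (workers){
                workers.remove(this);
            }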
        }
    }
}
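For comparison, the JDK's own java.util.concurrent.ThreadPoolExecutor can be configured to behave roughly like the pool above: a fixed number of threads, a bounded queue, a one-second idle timeout, and a handler for rejected tasks. The sketch below is an approximation, not an exact equivalent. The class name JdkPoolComparison and its helper methods are hypothetical, and CallerRunsPolicy is just one assumed choice of rejection handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical comparison class, not part of the original post.
class JdkPoolComparison {
    // Builds a JDK pool configured roughly like new ThreadPool(capacity, coreSize, policy).
    static ThreadPoolExecutor build(int capacity, int coreSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize,                         // no threads beyond the core size
                1, TimeUnit.SECONDS,                        // idle timeout, like timeTake(1000 ms)
                new ArrayBlockingQueue<>(capacity),         // bounded task queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // assumed policy: submitter runs the task
        pool.allowCoreThreadTimeOut(true);                  // idle core threads may exit as well
        return pool;
    }

    // Submits n trivial tasks and returns how many of them completed.
    static int runTasks(int n) {
        ThreadPoolExecutor pool = build(2, 2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown(); // stop accepting tasks; queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runTasks(10));
    }
}
```

Because rejected tasks run on the submitting thread under CallerRunsPolicy, none of the 10 tasks are lost and main prints "completed: 10".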

Test

import java.util.concurrent.TimeUnit; // only needed if the timePut policy below is enabled

public class Test {
    public static void main(String[] args) {
        ThreadPool threadPool=new ThreadPool(2,2,(queue,task)->{
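            // Option 1: do nothing, so the rejected task is silently dropped
            // Option 2: let the submitting (main) thread run the task itself
            //task.run();
            // Option 3: throw an exception
            //throw new RuntimeException("Blocking queue is full, task rejected: "+task);
            // Option 4: retry with a timed put, and throw if that also times out
            //boolean flag=queue.timePut(task,1000, TimeUnit.MILLISECONDS);
            //if(flag==false) throw new RuntimeException("Blocking queue is full and the timed put also timed out: "+task);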
        });
        threadPool.excute(()->{
            System.out.println("fnq is a puppy");
        });
        threadPool.excute(()->{
            System.out.println("swt is a little piggy");
        });
    }
}

Original article: https://blog.csdn.net/qq_42576687/article/details/109268564
