现在的位置: 首页 > 综合 > 正文

jetty的线程池实现QueuedThreadPool

2014年07月04日 ⁄ 综合 ⁄ 共 5295字 ⁄ 字号 评论关闭

线程池这个应该是比较重要的一个组件了吧。。。。首先在SelectChannelConnector中,需要建立SelectSet,从而建立selector,而select的执以及I/O的都需要放到线程池中运行,而且需要独占的线程。。

而当selector中获取远程连接的数据之后,就需要进行http的处理流程。。。这里又需要将他们派发到线程池中运行。。。

从而线程池实现的高效也是jetty是否能够高效的执行的关键。。。。

不过其实总体来说jetty的线程池的实现还是很简单的。。。。常用的就是QueuedThreadPool。。

也没啥继承,本来还以为要用到concurrent里面线程的。。

好了,先来看看他的一些属性申明吧:

    private String _name;
    private Set _threads;   //当前所有的线程的集合
    private List _idle;   //保存空闲的线程
    private Runnable[] _jobs;   //一个数组,用于保存提交的task,将其用成了循环队列
    private int _nextJob;    //下一个要执行的线程的位置
    private int _nextJobSlot;  //可以用来存放提交的任务的位置
    private int _queued;   //已经放了多少任务到jobs数组中,等待被执行
    private int _maxQueued;    //最大
    
    private boolean _daemon;  //是否要将线程设置为后台线程
    private int _id;

    //三个锁,_lock用于保护线程池公用的数据,例如queued,jobs啥的,
    private final Object _lock = new Lock();
    private final Object _threadsLock = new Lock();
    private final Object _joinLock = new Lock();

    private long _lastShrink;  //表示上一次线程空闲的时间
    private int _maxIdleTimeMs=60000;  //最大空闲时间是一分钟啊,还挺长的
    private int _maxThreads=250;   //最大线程数量。。我擦。。居然这么多
    private int _minThreads=2;   //最小线程数量  。。感觉这些默认的属性值都不靠谱啊在
    private boolean _warned=false;   //如果当前服务器太忙了,它会被设置
    private int _lowThreads=0;   //低水平的线程适量
    private int _priority= Thread.NORM_PRIORITY;   //线程优先级
    private int _spawnOrShrinkAt=0;   //最多能够将这么多任务放到任务队列中,如果太多了,那么应该启动更多的线程去执行,这个一般都会在外面被设置
    private int _maxStopTimeMs;

这里比较重要的就是几个数组吧,首先是threads,它是一个集合,用于保存现在拥有的所有线程。。。

其次是idle数组,这个用于保存空闲的线程。。。。

再其次就是job数组了,它用于保存提交的任务。。如果不能立即找到空闲的线程来执行的话,那么就先暂时将其派发到数组里面去

好了,接下来来看看  doStart方法吧,。。。看看当前线程池是怎么启动的。。。

    //启动当前的线程池,
    protected void doStart() throws Exception {
        if (_maxThreads<_minThreads || _minThreads<=0)
            throw new IllegalArgumentException("!0<minThreads<maxThreads");
        
        _threads=new HashSet();   //创建用于保存所有线程的集合 
        _idle=new ArrayList();    //保存空闲的线程的数组
        _jobs=new Runnable[_maxThreads];    //用于暂时保存任务
        
        //创建这么多的线程
        for (int i=0;i<_minThreads;i++) {
            newThread();
        }   
    }

其实这里做的事情没啥意思,无非就是创建一些集合,数组啥的,然后创建执行线程。。。。

那么来看看这个newThread方法做了什么事情吧:

    //创建线程的方法
    protected void newThread() {
        synchronized (_threadsLock) {
            if (_threads.size()<_maxThreads) {  //如果还没有到达最大线程
                PoolThread thread =new PoolThread();  //创建一个执行线程
                _threads.add(thread);  //将刚刚创建的线程保存到集合中去
                thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
                thread.start();    //启动这个线程
            } else if (!_warned) {  //否则就表示创建的线程已经到了最大数量了。。那么当前服务器太忙了
                _warned=true;
                Log.debug("Max threads for {}",this);
            }
        }
    }

好吧,还是没啥意思,这里面又真正的创建执行线程PoolThread,它是当前类型的内部类,然后在将其设置到threads数组里面,最后在启动它。。

那么我们接下来来看看这个PoolThread的定义吧:

    //执行线程的定义
    public class PoolThread extends Thread 
    {
        Runnable _job=null;

        PoolThread() {
            setDaemon(_daemon);  //这里是否要设置为后台线程要看外面的设置情况
            setPriority(_priority);
        }
        
        //执行函数
        public void run() {
            boolean idle=false;   //刚开始肯定默认 没有空闲
            Runnable job=null;  //需要执行的job
            try {
                while (isRunning())  {     //首先要判断当前线程池还在运行
                    // Run any job that we have.
                    if (job!=null)  {
                        final Runnable todo=job;
                        job=null;
                        idle=false;
                        todo.run();
                    }
                    
                    synchronized(_lock) {
                        // is there a queued job?
                        if (_queued>0) {   //表示数组里面还有线程需要执行
                            _queued--;   //减一,因为当前线程会执行一个task
                            job=_jobs[_nextJob++];   //获取 这个任务
                            if (_nextJob==_jobs.length)
                                _nextJob=0;   //循环数组的操作
                            continue;  //直接continue,那么就会执行刚刚取出来的任务了
                        }

                        // Should we shrink?
                        final int threads=_threads.size();  //当前线程的总数量
                        if (threads>_minThreads && 
                            (threads>_maxThreads || 
                             _idle.size()>_spawnOrShrinkAt))     {
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())  {  //空闲时间太长了,而且线程太多了,那么需要退出一些线程
                                _lastShrink=now;
                                _idle.remove(this);
                                return;
                            }
                        }

                        if (!idle) {   //表示当前线程空闲了
                            _idle.add(this);   //将他们放到空闲数组里面去
                            idle=true;  //表示已经空闲了
                        }
                    }
                    synchronized (this) {
                        if (_job==null) {
                            this.wait(getMaxIdleTimeMs());  //阻塞这么长的时间,有可能会超时唤醒,也有可能会被外面唤醒 
                        }
                        job=_job;
                        _job=null;
                    }
                }
            }
            catch (InterruptedException e)
            {
                Log.ignore(e);
            }
            finally
            {
                synchronized (_lock)
                {
                    _idle.remove(this);
                }
                synchronized (_threadsLock)
                {
                    _threads.remove(this);
                }
                synchronized (this)
                {
                    job=_job;
                }
                
                // we died with a job! reschedule it
                if (job!=null)
                {
                    QueuedThreadPool.this.dispatch(job);
                }
            }
        }
        
        /* ------------------------------------------------------------ */
        void dispatch(Runnable job) {  //外面直接提交task给这个线程,那么会唤醒它
            synchronized (this) {
                _job=job;  //
                this.notify();  //唤醒当前的阻塞
            }
        }
    }

    private class Lock{}
}

其实这个很简单吧,run方法也很简单就能够理解,首先判断当前的jobs数组里面是否有需要执行的task,如果有的话,那么取出来一个任务执行,如果没有的话,那么就表示没有可以执行的任务了,那么就需要将其放到数组空闲数组里面去了(当然这里还会判断线程是否过多,以及空闲时间太长,有可能执行线程直接就退出了),,然后调用wait方法阻塞当前线程。。。它有可能因为超时而唤醒,也有可能因为外面又task需要执行了而被唤醒。。。


好了。。那么执行线程还是比较简单的吧。。。接着看看外面线程池的dispatch方法是怎么定义的吧:

    //将提交的任务调度执行
    public boolean dispatch(Runnable job) 
    {  
        if (!isRunning() || job==null)
            return false;

        PoolThread thread=null;
        boolean spawn=false;
            
        synchronized(_lock) {
            int idle=_idle.size();  //当前空闲线程数组的大小
            if (idle>0) {  //如果有空闲的,那么优先用空闲的线程来执行这个task
                thread=(PoolThread)_idle.remove(idle-1);
            } else {  //如果没有空闲的线程,那么需要将这个任务暂时保存下来,等到有线程空闲下来之后会拿去执行
                _queued++;  //这里增加已经保存的任务的数量
                if (_queued>_maxQueued) {
                    _maxQueued=_queued;
                }
                _jobs[_nextJobSlot++]=job;
                if (_nextJobSlot==_jobs.length) {
                    _nextJobSlot=0;   //这还是一个循环数组
                }
                if (_nextJobSlot==_nextJob) {  //这里表示数组里面已经存满了要执行的任务
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];   //那么需要扩充数组了
                    int split=_jobs.length-_nextJob;
                  //复制,这里居然还考虑了保持原来任务的执行顺序。。呵呵。。是我的话就不管了,直接复制好了
                    if (split>0)
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);  
                    if (_nextJob!=0)
                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                    
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }
                  
                spawn=_queued>_spawnOrShrinkAt;   //存放的线程时候已经太多了
            }
        }
        
        if (thread!=null) {
            thread.dispatch(job);  //如果有空闲的线程,那么由这个线程来哦执行
        }  else if (spawn) {  //如果延迟太厉害,那么需要再创建新的线程
            newThread();
        }
        return true;
    }

这个其实也很简单吧,无非是先看空闲的线程数组里是否有空闲的线程,如果有的话,那么直接唤醒它,由它来执行,如果没有的话就将其放到jobs队列当中去。。。这里还会判断当前job是否太多了。。有可能会创建新的执行线程。。。


到这里总体上jetty的线程池的实现就算差不多了。。很简单。。不过也还算不错吧。。。

与netty的线程池实现略有不同,这里所有的执行线程共享一个任务队列,而在netty中每一个执行线程自己都有自己的job队列。。从而在jetty中线程间的负载均衡看起来就简单的多了。。。。而netty中就比较二,只能轮询的将task交给执行线程。。。不过netty之所以这么实现也是有原因的,因为要实现线程封闭性。。。

另外jetty的线程池中线程的数量会根据服务器的繁忙程度更改,但是netty不会。。。。。

抱歉!评论已关闭.