现在的位置: 首页 > 综合 > 正文

Executor框架

2018年09月29日 ⁄ 综合 ⁄ 共 5427字 ⁄ 字号 评论关闭

Executor接口

public interface Executor {
    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <tt>Executor</tt> implementation.
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution.
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executor接口提供一种解耦任务提交和任务如何运行的机制包括线程的使用、调度等。该接口没有要求任务必须是异步方式,可以在调用者的当前线程中执行。

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
}

更典型用法是,task在另外的线程中运行,而非当前线程启动。

class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
 }

其他很多Executor的实现都是实现某种复杂的排序和限制来决定任务如何以及什么时候运行,如Executors中的线程池。

ExecutorService接口

ExecutorService扩展了Executor接口,添加一些任务生命周期管理的方法。实际上Eecutors创建的线程池返回值就是该接口类型,因为其提供了任务的生命周期管理方法,因此可以当做线程池。

public interface ExecutorService extends Executor {
	/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     */
    void shutdown();

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks that were
     * awaiting execution.
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     */
    List<Runnable> shutdownNow();

    boolean isShutdown();

	/**
     * Returns <tt>true</tt> if all tasks have completed following shut down.
     * Note that <tt>isTerminated</tt> is never <tt>true</tt> unless
     * either <tt>shutdown</tt> or <tt>shutdownNow</tt> was called first.
     * @return <tt>true</tt> if all tasks have completed following shut down
     */
    boolean isTerminated();

     /**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's <tt>get</tt> method will return the task's result upon
     * successful completion.
     * <p>
     * If you would like to immediately block waiting
     * for a task, you can use constructions of the form
     * <tt>result = exec.submit(aCallable).get();</tt>
     */
    <T> Future<T> submit(Callable<T> task);    

     ...
}

ExecutorService有三种生命状态:运行、关闭、已终止。在初始创建的时候处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已提交任务执行完成包括还未开始的任务。shutdownNow方法执行更粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未执行的任务。等所有任务都执行完成,ExecutorService进入已终止状态,可以调用awaitTerminate方法等待到达终止状态,或者轮询调用isTerminate方法来判断。

增加submit方法,可以接收返回值的任务Callable的参数,并返回一个Future对象(后续会提到)。

ScheduledExecutorService扩展了ExecutorService,可以用于调度任务如推迟一段时间以后执行或间隔一段时间执行。

schedule方法创建各种延迟的任务并返回一个任务对象可以用于取消或检测执行。

scheduleAtFixedRate和scheduleWithFixedDelay方法间断性执行任务直到被取消。

所有调度相关方法接收的延迟或间断参数都是相对时间而非绝对时间。

Executors类

Executors主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单,其实线程池内部都是统一的方法来实现,通过构造方法重载,使得实现不同的功能,但是往往这种方式很多时候不知道具体入口参数的改变有什么意思。其提供一系列静态方法,用于快捷创建线程池。

newFixedThreadPool(int)  返回特定大小的线程池,返回类型为ExecutorService

newSingleThreadExecutor() 创建一个指定大小的线程池,如果超过大小,放入blocken队列中,默认是LinkedBlockingQueue,自己指定ThreadFactory,自己写的ThreadFactory

newCachedThreadPool() 创建可以进行缓存的线程池,默认缓存60s,数据会放在一个SynchronousQueue上,而不会进入blocken队列中,也就是只要有线程进来就直接进入调度
newSingleThreadScheduledExecutor() 添加一个Schedule的调度器的线程池,默认只有一个调度

Callable接口

Callable接口和Runnalbe类似都是设计用于其类的实例可以被另外一个线程执行。Callable拥有返回值并抛出异常,而Runnalbe没有返回值并且不能抛出声明式异常。

package java.util.concurrent;
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

package java.lang;
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used 
     * to create a thread, starting the thread causes the object's 
     * <code>run</code> method to be called in that separately executing 
     * thread. 
     * <p>
     * The general contract of the method <code>run</code> is that it may 
     * take any action whatsoever.
     */
    public abstract void run();
}

Future和FutureTask

Future代表一个异步计算的结果。提供以下方法:检查计算是否完成、等待计算完成、获取计算结果。计算结果只能通过get方法获得当运算完成以后,如果还没有完成将会阻塞。可以通过cancel方法来取消计算。一旦计算完成就不能再取消。

public interface Future<V> {
	/**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when <tt>cancel</tt> is called,
     * this task should never run.  If the task has already started,
     * then the <tt>mayInterruptIfRunning</tt> parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return <tt>true</tt>.  Subsequent calls to {@link #isCancelled}
     * will always return <tt>true</tt> if this method returned <tt>true</tt>.
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * @return <tt>true</tt> if this task was cancelled before it completed
     */
    boolean isCancelled();

    /**
     * @return <tt>true</tt> if this task completed
     */
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

实现接口:Runnable, Future<V>, RunnableFuture<V>

FutureTask是Future的一个基本实现,可以开启、取消计算、查询计算是否完成和获取计算结果。一旦计算完成调用get方法可以立即取得结果,否则会阻塞。一旦计算完成,计算不能重启或取消(除非调用runAndRest方法)。

FutureTask可以用于包裹Callable或Runnable。由于其实现了Runnable接口,可以提交给Executor用于执行。

Executor相关类结构

抱歉!评论已关闭.