Kilim 是一个用于在Java 协程框架,Kilim协程框架的结构图如下:
1. Task
可以认为Task 就是Actor,使用方式和Java Thread 基本相同,只是继承的为Task,覆盖的为execute 方法,启动也是调用task 的start 方法。
public Task start() { if (scheduler == null) { setScheduler(Scheduler.getDefaultScheduler()); } resume(); return this; }
2. Scheduler
Scheduler是Kilim框架中核心的任务调度器,负责管理任务的工作者线程WorkerThread,以及一个基本的FIFO队列,维护着Task任务列表,Scheduler负责分派Task给指定的工作者线程WorkerThread执行,类似于ExecutorService线程池管理Runnable和Callable任务的执行。
public LinkedList<WorkerThread> allThreads = new LinkedList<WorkerThread>(); public RingQueue<WorkerThread> waitingThreads = new RingQueue<WorkerThread>(10); protected volatile boolean shutdown = false; public RingQueue<Task> runnableTasks = new RingQueue<Task>(100);
工作者线程WorkerThread的默认初始化个数为Runtime.getRuntime().availableProcessors(),且所有空闲的线程会添加到RingQueue队列中
3. WorkerThread
WorkerThread是执行任务Task的具体线程,内部维护一个默认大小为10的环形队列RingQueue,与ThreadPoolExecutor线程池中定义的Worker类似,循环阻塞式的从任务队列中获取下一个任务执行。
RingQueue<Task> tasks = new RingQueue<Task>(10);
public void run() { try { while (true) { Task t = getNextTask(this); // blocks until task available runningTask = t; t._runExecute(this); runningTask = null; } } catch (ShutdownException se) { // nothing to do. } catch (OutOfMemoryError ex) { System.err.println("Out of memory"); } catch (Throwable ex) { ex.printStackTrace(); } runningTask = null; }
4. RingQueue
RingQueue本质上即一个环形队列,作为Queue用于不同的线程之间传递message的设计,和ZeroMQ利用ringbuffer来作为pipe在不同线程之间传递message的用法都是很类似的。
public class RingQueue<T> { protected T[] elements; protected int iprod; // producer index protected int icons; // consumer index; protected int maxSize; protected int size; }
5. Mailbox
Kilim 中通过Mailbox 对象来发送消息,Mailbox 的基本原则为可以有多个消息发送者,但只能有一个消息接收者,发送的方式有同步发送、异步发送和阻塞线程方式的同步发送三种,同步发送是指保证一定能将消息放入发送队列中,如当前发送队列已满,则等待到可用为止,阻塞的为当前Task;异步发送则是尝试将消息放入发送队列一次,如果发送失败,则返回false,成功则返回true,不会阻塞Task;阻塞线程方式的同步发送是指阻塞当前线程,并保证将消息发送给接收者。
public class Mailbox<T> implements PauseReason, EventPublisher { T[] msgs; private int iprod = 0; // producer index private int icons = 0; // consumer index; private int numMsgs = 0; private int maxMsgs = 300; EventSubscriber sink; }
Kilim 中通过Mailbox 来接收消息,接收消息的方式有同步接收、异步接收以及阻塞线程方式的同步接收三种,同步接收是指阻塞当前Task,直到接收到消息才返回;异步接收是指立刻返回Mailbox 中的消息,有就返回,没有则返回null;阻塞线程方式的同步接收是指阻塞当前线程,直到接收到消息才返回。
public T get() throws Pausable{ Task t = Task.getCurrentTask(); T msg = get(t); while (msg == null) { Task.pause(this); removeMsgAvailableListener(t); msg = get(t); } return msg; }
6. EventPublisher和EventSubscriber
public interface EventPublisher { }
public interface EventSubscriber { void onEvent(EventPublisher ep, Event e); }