Java Multithreading Patterns (Part 4) [Final]

October 22, 2014 ⁄ General

Future Pattern

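      In the Thread-Per-Message Pattern we looked at creating a new thread for each incoming request, but those requests did not need a return value. When a return value is needed but the background processing takes so long that it cannot be produced right away, the Future Pattern can be used. The Future Pattern also creates a thread per request, and it immediately returns an object to the caller. That object is not the real return value, which may not be ready yet, but the client can use it later to obtain the real return value.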

public interface Data {
    public String getContent();
}


public class RealData implements Data {

    private String content;

    public RealData(int count, char c) {
        System.out.println("making RealData(" + count + ", " + c + ") Begin.");
        char[] buffer = new char[count];
        for (int i = 0; i < count; i++) {
            buffer[i] = c;
            slowly();
        }
        this.content = String.valueOf(buffer);
        System.out.println("making RealData(" + count + ", " + c + ") End.");
    }

    @Override
    public String getContent() {
        return this.content;
    }

    private void slowly() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
    }
}

public class FutureData implements Data {

    private RealData realData;

    private boolean  ready = false;

    public synchronized void setRealData(RealData realData) {
        if (ready) {
            return;
        }
        this.realData = realData;
        this.ready = true;
        notifyAll();
    }

    @Override
    public synchronized String getContent() {
        while (!ready) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        return this.realData.getContent();
    }
}


public class Host {

    public Data handle(final int count, final char c) {

        System.out.println("handle ( " + count + ", " + c + ") Begin.");
        final FutureData futureData = new FutureData();
        new Thread() {
            @Override
            public void run() {
                RealData realData = new RealData(count, c);
                futureData.setRealData(realData);
            }
        }.start();

        System.out.println("handle ( " + count + ", " + c + ") End.");
        return futureData;
    }
}


public class Main {

    public static void main(String[] args) {
        System.out.println("main Begin.");
        Host host = new Host();
        Data data1 = host.handle(10, 'a');
        Data data2 = host.handle(20, 'b');
        Data data3 = host.handle(30, 'c');

        System.out.println("main other job Begin.");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println("main other job End.");

        System.out.println("data1 = " + data1.getContent());
        System.out.println("data2 = " + data2.getContent());
        System.out.println("data3 = " + data3.getContent());

        System.out.println("main End.");
    }
}


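        In the Worker Thread Pattern we discussed separating "method invocation" from "method execution". The Future Pattern separates "preparing the return value" from "using the return value". The Future Pattern also contains an implementation of the Proxy design pattern: FutureData stands in for RealData behind the shared Data interface.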
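
The same separation of "preparing" and "using" a return value is available in the standard library. The following is only a minimal sketch, assuming Java 8 or later: the class name FutureSketch and its methods are hypothetical, and CompletableFuture.supplyAsync runs the work on a shared pool rather than on one new thread per request as Host does.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical class, for illustration only.
class FutureSketch {

    // Slowly builds a string of `count` copies of `c`, like RealData.
    static String makeContent(int count, char c) {
        char[] buffer = new char[count];
        for (int i = 0; i < count; i++) {
            buffer[i] = c;
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt status and keep filling the buffer.
                Thread.currentThread().interrupt();
            }
        }
        return String.valueOf(buffer);
    }

    // Returns at once; the CompletableFuture plays the role of FutureData.
    static CompletableFuture<String> handle(int count, char c) {
        return CompletableFuture.supplyAsync(() -> makeContent(count, c));
    }

    public static void main(String[] args) {
        CompletableFuture<String> data1 = handle(5, 'a');
        CompletableFuture<String> data2 = handle(3, 'b');
        System.out.println("main other job");
        // join() blocks until the value is ready, like getContent().
        System.out.println("data1 = " + data1.join());
        System.out.println("data2 = " + data2.join());
    }
}
```

Here join() takes the place of getContent(): the caller only blocks if the value is not ready when it asks for it.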

Two-Phase Termination Pattern

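      The Two-Phase Termination Pattern is simple, but it provides a graceful way to end a thread. The java.lang.Thread class has a stop() method that forcibly terminates a thread, but stop() is deprecated because it undermines the safety guarantees of the instance. When stop() is called, the thread throws java.lang.ThreadDeath and ends immediately, even if it is in the middle of a critical section (for example, partway through a synchronized method). In this pattern, a shutdown request is sent first, and the thread then runs its own termination processing before it ends.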
public class CountupThread extends Thread {

    // volatile so that the running thread sees the flag set by another thread
    private volatile boolean isShutdown = false;

    private int     count      = 0;

    @Override
    public void run() {

        try {
            while (!isShutdown) {
                doWork();
            }
        } catch (InterruptedException e) {

        } finally {
            doShutdown();
        }

    }

    public void shutdownReqeust() {
        this.isShutdown = true;
        interrupt();
    }

    private void doShutdown() {
        System.out.println("doShutdown: current count is " + this.count);
    }

    private void doWork() throws InterruptedException {
        System.out.println("current count is " + ++count);
        Thread.sleep(500);
    }

    public static void main(String[] args) {
        System.out.println("main Begin.");

        CountupThread countupThread = new CountupThread();
        countupThread.start();

        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
        }
        System.out.println("main : shutdown request.");
        countupThread.shutdownReqeust();

        System.out.println("main : join");

        // Wait for the thread to terminate
        try {
            countupThread.join();
        } catch (InterruptedException e) {
        }
        System.out.println("main End.");
    }
}
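
The listing above combines its own boolean flag with interrupt(). As a variation, and only as a sketch under that assumption, the thread's interrupt status alone can carry the shutdown request. The class InterruptibleCounter and its demo() method are hypothetical names, not part of the original example.

```java
// Hypothetical class, for illustration only.
class InterruptibleCounter implements Runnable {

    // Written during termination processing; -1 means "not terminated yet".
    private volatile int finalCount = -1;

    @Override
    public void run() {
        int count = 0;
        try {
            // Phase 1 check: keep working until someone interrupts this thread.
            while (!Thread.currentThread().isInterrupted()) {
                count++;
                Thread.sleep(20);
            }
        } catch (InterruptedException e) {
            // Interrupted while sleeping: treat it as the shutdown request.
        } finally {
            // Phase 2: termination processing runs on either exit path.
            finalCount = count;
        }
    }

    // Starts a counter, requests shutdown, waits for it, returns the final count.
    static int demo() {
        InterruptibleCounter counter = new InterruptibleCounter();
        Thread thread = new Thread(counter);
        thread.start();
        try {
            Thread.sleep(100);
            thread.interrupt(); // shutdown request
            thread.join();      // wait for termination processing to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.finalCount;
    }

    public static void main(String[] args) {
        System.out.println("final count = " + demo());
    }
}
```

The loop checks isInterrupted() to catch a request that arrives while the thread is working, and the catch block handles a request that arrives while it is sleeping.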

Thread-Specific Storage Pattern

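Thread-Specific Storage Pattern means "a storage area private to each thread" or "memory space provided for each thread". It may be easier to think of a java.lang.ThreadLocal instance as a kind of collection: there is only one ThreadLocal instance, but it manages multiple objects, one per thread.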
public class Log {

    private static final ThreadLocal<TSLog> tsLogCollection = new ThreadLocal<TSLog>();

    public static void println(String s) {
        getTSLog().printWrite(s);
    }

    public static void close() {
        getTSLog().close();
    }

    private static TSLog getTSLog() {
        TSLog tsLog = tsLogCollection.get();
        // If this is the thread's first call, create a new log file and register it
        if (tsLog == null) {
            tsLog = new TSLog(Thread.currentThread().getName() + "-log.txt");
            tsLogCollection.set(tsLog);
        }
        return tsLog;
    }
}


import java.io.FileNotFoundException;
import java.io.PrintWriter;

public class TSLog {

    private PrintWriter writer;

    public TSLog(String filename) {
        try {
            this.writer = new PrintWriter(filename);
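        } catch (FileNotFoundException e) {
            // Fail fast rather than leaving writer null for later calls
            throw new IllegalStateException("Cannot open log file: " + filename, e);
        }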
    }

    public void printWrite(String s) {
        writer.println(s);
    }

    public void close() {
        writer.println("===========End of log===========");
        writer.close();
    }
}


public class ClientThread extends Thread {

    public ClientThread(String name) {
        super(name);
    }

    @Override
    public void run() {
        System.out.println(getName() + " Begin.");

        for (int i = 0; i < 10; i++) {
            Log.println("i = " + i);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
        Log.close();
        System.out.println(getName() + " End.");
    }

    public static void main(String[] args) {
        new ClientThread("Alice").start();
        new ClientThread("Bobby").start();
        new ClientThread("Chris").start();
    }
}
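
To see the "one ThreadLocal instance, one object per thread" behavior in isolation, without files, here is a small sketch. The class ThreadLocalSketch and its methods are hypothetical, and ThreadLocal.withInitial assumes Java 8 or later.

```java
// Hypothetical class, for illustration only.
class ThreadLocalSketch {

    // A single ThreadLocal instance; each thread gets its own StringBuilder.
    private static final ThreadLocal<StringBuilder> BUFFER =
            ThreadLocal.withInitial(StringBuilder::new);

    // Appends to the calling thread's own buffer and returns its contents.
    static String append(String s) {
        StringBuilder sb = BUFFER.get();
        sb.append(s);
        return sb.toString();
    }

    // Runs two threads against the same ThreadLocal and returns
    // what each thread saw in its own buffer.
    static String[] demo() {
        final String[] seen = new String[2];
        Thread alice = new Thread(() -> {
            append("A1");
            seen[0] = append("A2");
        }, "Alice");
        Thread bobby = new Thread(() -> {
            append("B1");
            seen[1] = append("B2");
        }, "Bobby");
        alice.start();
        bobby.start();
        try {
            alice.join();
            bobby.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println("Alice saw " + seen[0]);
        System.out.println("Bobby saw " + seen[1]);
    }
}
```

Neither thread sees the other's text, even though both go through the same static field and no synchronized block is involved.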

Active Object Pattern

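The Active Object Pattern can be seen as a higher-level pattern composed of several multithreading patterns and several design patterns, in which multiple objects each handle their own responsibility and cooperate. It uses the Producer-Consumer Pattern, the Thread-Per-Message Pattern, and the Future Pattern, along with design patterns such as Proxy and Command.
Server-side code: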
public interface ActiveObject {

    public Result makeString(int count, char fillchar);

    public void displayString(String string);
}


public class Proxy implements ActiveObject {

    private SchedulerThread scheduler;

    private Servant         servant;

    public Proxy(SchedulerThread scheduler, Servant servant) {
        this.scheduler = scheduler;
        this.servant = servant;
    }

    @Override
    public Result makeString(int count, char fillchar) {
        FutureResult future = new FutureResult();
        MakeStringRequest request = new MakeStringRequest(servant, future, count, fillchar);
        this.scheduler.invoke(request);
        return future;
    }

    @Override
    public void displayString(String string) {
        DisplayStringRequest request = new DisplayStringRequest(servant, string);
        this.scheduler.invoke(request);
    }
}

public class Servant implements ActiveObject {

    @Override
    public Result makeString(int count, char fillchar) {
        char[] buffer = new char[count];
        for (int i = 0; i < count; i++) {
            buffer[i] = fillchar;
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
            }
        }

        RealResult result = new RealResult(String.valueOf(buffer));
        return result;
    }

    @Override
    public void displayString(String string) {
        System.out.println("displayString( " + string + " )");
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
        }
    }
}

public interface Result {

    public String getResultValue();
}


public class FutureResult implements Result {

    private Result  result;

    private boolean isReady = false;

    public synchronized void setResult(Result result) {
        if (isReady) {
            return;
        }
        this.result = result;
        this.isReady = true;
        notifyAll();
    }

    @Override
    public synchronized String getResultValue() {
        while (!isReady) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        return result.getResultValue();
    }

}

public class RealResult implements Result {

    private String resultValue;

    public RealResult(String resultValue) {
        this.resultValue = resultValue;
    }

    @Override
    public String getResultValue() {
        return this.resultValue;
    }
}


public abstract class MethodRequest {

    protected final Servant      servant;

    protected final FutureResult future;

    public MethodRequest(Servant servant, FutureResult future) {
        this.servant = servant;
        this.future = future;
    }

    public abstract void execute();

}


public class MakeStringRequest extends MethodRequest {

    private int  count;

    private char fillchar;

    public MakeStringRequest(Servant servant, FutureResult future, int count, char fillchar) {
        super(servant, future);
        this.count = count;
        this.fillchar = fillchar;
    }

    @Override
    public void execute() {
        Result result = this.servant.makeString(count, fillchar);
        future.setResult(result);
    }
}


public class DisplayStringRequest extends MethodRequest {

    private String string;

    public DisplayStringRequest(Servant servant, String string) {
        super(servant, null);
        this.string = string;
    }

    @Override
    public void execute() {
        this.servant.displayString(string);
    }
}

public class SchedulerThread extends Thread {

    private ActivationQueue queue = new ActivationQueue();

    public void invoke(MethodRequest request) {
        this.queue.putRequest(request);
    }

    @Override
    public void run() {
        while (true) {
            this.queue.takeRequest().execute();
        }
    }
}

package activeobject.server;

import java.util.LinkedList;

public class ActivationQueue {

    private final LinkedList<MethodRequest> requestQueue = new LinkedList<MethodRequest>();

    private final int                       queueSize    = 100;

    public synchronized void putRequest(MethodRequest request) {
        while (this.requestQueue.size() >= queueSize) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        this.requestQueue.addLast(request);
        notifyAll();
    }

    public synchronized MethodRequest takeRequest() {
        while (this.requestQueue.size() == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }

        MethodRequest request = this.requestQueue.removeFirst();
        notifyAll();
        return request;
    }
}

public class ActiveObjectFactory {

    public static ActiveObject createActiveObjcet() {
        Servant servant = new Servant();

        SchedulerThread scheduler = new SchedulerThread();

        Proxy proxy = new Proxy(scheduler, servant);

        scheduler.start();

        return proxy;
    }
}

[Figure: UML diagram]

Client-side code:

import activeobject.server.ActiveObject;

public class DisplayClientThread extends Thread {

    private ActiveObject activeObj;

    public DisplayClientThread(String name, ActiveObject activeObj) {
        super(name);
        this.activeObj = activeObj;
    }

    @Override
    public void run() {
        int i = 0;
        while (true) {
            i++;
            String string = getName() + " No." + i;
            activeObj.displayString(string);
        }
    }
}

import activeobject.server.ActiveObject;
import activeobject.server.Result;

public class MakerClientThread extends Thread {

    private final ActiveObject activeObj;

    private final char         fillchar;

    public MakerClientThread(String name, ActiveObject activeObj) {
        super(name);
        this.activeObj = activeObj;
        this.fillchar = name.charAt(0);
    }

    @Override
    public void run() {
        int i = 0;
        while (true) {
            i++;
            Result result = activeObj.makeString(i, fillchar);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }

            String resultValue = result.getResultValue();
            System.out.println(Thread.currentThread().getName() + ":value = " + resultValue);
        }

    }
}

import activeobject.server.ActiveObject;
import activeobject.server.ActiveObjectFactory;

public class Main {

    public static void main(String[] args) {
        ActiveObject activeObj = ActiveObjectFactory.createActiveObjcet();
        new MakerClientThread("Alice", activeObj).start();
        new MakerClientThread("Bobby", activeObj).start();
        new DisplayClientThread("Chris", activeObj).start();
    }
}
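
For comparison, the roles of SchedulerThread, ActivationQueue, and FutureResult can be approximated with java.util.concurrent. This is a rough sketch and not a replacement for the listing above: ExecutorActiveObject is a hypothetical class, it assumes Java 8 or later, and the executor's internal queue is unbounded, unlike the 100-entry ActivationQueue.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class, for illustration only.
class ExecutorActiveObject {

    // The single worker thread acts as the scheduler; the executor's
    // internal queue acts as the activation queue.
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();

    // Asynchronous call with a result: returns at once with a future.
    CompletableFuture<String> makeString(int count, char fillchar) {
        return CompletableFuture.supplyAsync(() -> {
            char[] buffer = new char[count];
            Arrays.fill(buffer, fillchar);
            return String.valueOf(buffer);
        }, scheduler);
    }

    // Asynchronous call without a result.
    void displayString(String string) {
        scheduler.execute(() -> System.out.println("displayString( " + string + " )"));
    }

    // Lets already-queued requests finish, then stops the worker thread.
    void shutdown() {
        scheduler.shutdown();
    }

    // Issues one request of each kind and waits for the string result.
    static String demo() {
        ExecutorActiveObject obj = new ExecutorActiveObject();
        try {
            CompletableFuture<String> result = obj.makeString(3, 'A');
            obj.displayString("hello");
            return result.join();
        } finally {
            obj.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("value = " + demo());
    }
}
```

Because every request runs on the same worker thread, the work itself needs no locking, which is the same property Servant relies on in the hand-written version.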
