现在的位置: 首页 > 综合 > 正文

线程交互

2012年09月22日 ⁄ 综合 ⁄ 共 6784字 ⁄ 字号 评论关闭

-- Start

我们在
并发
一节中讲了一个并发存钱和取钱的例子, 事实上这个例子有一个问题, 那就是当余额小于等于0时我们仍然可以取钱(也许是信用卡), 下面我们把这个例子修改一下, 增加卡类型, 如果是信用卡, 则当余额小于等于0时我们仍然可以取钱; 如果是储蓄卡, 当余额小于等于我们要取的钱时, 取钱的线程必须等待, 直到存钱的线程唤醒它或超时为止.

public class Test {

	public static void main(String[] args) throws Exception {
		Bank icbc = new Bank(); // 工商银行
		Account account = new Account(icbc, "储蓄卡", 1); // 在工商银行开户, 并存入 1 块钱

		// 路人甲在 ATM 1 给我转帐, 每次转帐 1 块钱, 连续转帐 10 次
		new Thread(new ATM(account, 1, 10), "ATM 1").start();

		// 路人乙在 ATM 2 给我转帐, 每次转帐 2 块钱, 连续转帐 5 次
		new Thread(new ATM(account, 2, 5), "ATM 2").start();

		// 我从 ATM 3 开始取钱, 每次取 3块, 连续取5次
		new Thread(new ATM(account, -3, 5), "ATM 3").start();

		// 路人丙在 ATM 4 给我转帐, 每次转帐 4 块钱, 连续转帐 5 次
		new Thread(new ATM(account, 4, 5), "ATM 4").start();

		// 我老婆从 ATM 5 开始取钱, 每次取5块, 连续取7次
		new Thread(new ATM(account, -5, 5), "ATM 5").start();
	}
}

class ATM implements Runnable {

	private Account account; // 账户
	private int tradeAmount; // 交易额
	private int tradeTimes; // 交易次数

	// 构造方法
	public ATM(Account account, int tradeAmount, int tradeTimes) {
		this.account = account;
		this.tradeAmount = tradeAmount;
		this.tradeTimes = tradeTimes;
	}

	public void run() {
		for (int i = 0; i < tradeTimes; i++) {

			try {
				account.getBank().tradeAmount(account, tradeAmount);
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			} catch (Exception e) {
				e.printStackTrace();
				return;
			}
		}
	}

}

class Account {
	private Bank bank;
	private String cardType;
	private int amount;

	public Account(Bank bank, String cardType, int amount) {
		this.bank = bank;
		this.amount = amount;
		this.cardType = cardType;
	}

	// Getter and Setter
	public Bank getBank() {
		return bank;
	}

	public String getCardType() {
		return cardType;
	}

	public int getAmount() {
		return amount;
	}

	public void setAmount(int amount) {
		this.amount = amount;
	}
}

class Bank {
	// 同步方法
	public synchronized void tradeAmount(Account account, int amount) throws Exception {
		while ("储蓄卡".equals(account.getCardType()) // 储蓄卡
				&& amount < 0 // 取钱
				&& account.getAmount() < Math.abs(amount)) { // 余额小于要取的钱
			try {
				// 当取钱线程获得锁进入同步方法后, 却发现它必须等到余额大于或等于要取的钱后才能执行 
				// 如果不能满足条件, 它必须等待并放弃持有的锁, 直到某个存钱线程存入一笔钱后通知它
				// 它得到通知后将再次尝试获得锁并检查是否满足条件, 如果此时满足条件, 它将执行取钱操作
				// 如果仍然不能满足条件, 它将继续等待直到满足条件或超时 
				// 由于取钱线程可能需要多次检查条件, 所以此处用的是 while, 而不是 if
				wait(60000); // 等待一分钟
			} catch (InterruptedException e) {
				throw new Exception("您的余额不足", e);
			}
		}

		account.setAmount(amount + account.getAmount());
		System.out.println("您在 " + Thread.currentThread().getName() + " 交易了 " + amount + ", 您账户的账户余额是 " + account.getAmount());

		// 通知所有等待的线程, 得到通知的线程将再次检查条件是否满足
		notifyAll();
	}

}

从 JDK 1.5 引入锁之后, 我们又多个一种实现线程交互的方式, 下面是一个简单的例子.

class Bank {
	final Lock lock;
	final Condition enoughCash;

	public Bank() {
		lock = new ReentrantLock();
		enoughCash = lock.newCondition();
	}

	public void tradeAmount(Account account, int amount) throws Exception {
		lock.lock();

		try {
			while ("储蓄卡".equals(account.getCardType()) // 储蓄卡
					&& amount < 0 // 取钱
					&& account.getAmount() < Math.abs(amount)) { // 余额小于要取的钱
				try {
					// 当取钱线程获得锁进入同步方法后, 却发现它必须等到余额大于或等于要取的钱后才能执行
					// 如果不能满足条件, 它必须等待并放弃持有的锁, 直到某个存钱线程存入一笔钱后通知它
					// 它得到通知后将再次尝试获得锁并检查是否满足条件, 如果此时满足条件, 它将执行取钱操作
					// 如果仍然不能满足条件, 它将继续等待直到满足条件或超时
					// 由于取钱线程可能需要多次检查条件, 所以此处用的是 while, 而不是 if
					enoughCash.await(60, TimeUnit.SECONDS); // 等待一分钟
				} catch (InterruptedException e) {
					throw new Exception("您的余额不足", e);
				}
			}

			account.setAmount(amount + account.getAmount());
			System.out.println("您在 " + Thread.currentThread().getName() + " 交易了 " + amount + ", 您账户的账户余额是 " + account.getAmount());

			// 通知所有等待的线程, 得到通知的线程将再次检查条件是否满足
			enoughCash.signalAll();
		} finally {
			lock.unlock();
		}

	}

}

屏障(CyclicBarrier)

JDK 1.5 新加入了一个称为 CyclicBarrier 的类, 它能帮助我们实现线程之间的交互. 如: 下面的例子演示了4个工人盖一栋二层楼的房子, 每人负责东西南北四面墙中的一面, 由于每个工人的进度不同, 当一个工人盖好第一层的一面墙后(此时他到达障碍点), 他必须等待其他墙盖好后才能盖屋顶.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Test {

	public static void main(String[] args) throws Exception {
		// 四个工人合作盖一栋房子
		int workerNum = 4;

		// 定义屏障点, 由于每个工人盖房的速度不同, 只有当四个工人分别盖好四面墙时, 才能盖屋顶
		CyclicBarrier cyclic = new CyclicBarrier(workerNum, new Runnable() {
			public void run() {
				System.out.println("盖屋顶");
			}
		});

		// 四个工人开始同时干活
		for (int i = 0; i < workerNum; i++) {
			new Thread(new Worker(cyclic)).start();
		}

	}

}

class Worker implements Runnable {
	private final CyclicBarrier cyclic;

	public Worker(CyclicBarrier cyclic) {
		this.cyclic = cyclic;
	}

	public void run() {

		try {
			building(); // 盖第一层四面墙中的一面
			cyclic.await(); // 盖好一面墙后暂停, 等待其他三面墙盖好后盖屋顶
			building(); // 继续盖第二层
			cyclic.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}

	private void building() {
		System.out.println("盖墙");
	}

}

门栓(CountDownLatch)

JDK 1.5 新加入了一个称为 CountDownLatch 的类, 它能帮助我们实现线程之间的交互. 如: 下面的例子演示了一个程序运行之前需要先初始化数据库, 缓存, 文件等.

import java.util.concurrent.CountDownLatch;

public class Test {

	public static void main(String[] args) throws Exception {

		// 定义一个有三个门栓的门
		final CountDownLatch doneSignal = new CountDownLatch(3);

		// 初始化数据库
		new Thread(new Runnable() {
			public void run() {
				System.out.println("初始化数据库");
				doneSignal.countDown(); // 打开一个门栓
			}
		}).start();

		// 初始化缓存
		new Thread(new Runnable() {
			public void run() {
				System.out.println("初始化缓存");
				doneSignal.countDown(); // 打开一个门栓
			}
		}).start();

		// 初始化文件
		new Thread(new Runnable() {
			public void run() {
				System.out.println("初始化文件");
				doneSignal.countDown(); // 打开一个门栓
			}
		}).start();

		doneSignal.await(); // 等待三个门栓全部打开
		doWork(); // 三个门栓全部打开后就可以运行我们的任务了
	}

	private static void doWork() {
		System.out.println("生成报告.");
	}
}

阶段器(Phaser)

Java 7 新加入了一个称为 Phaser 的类, 其功能跟CyclicBarrier和CountDownLatch有些类似,但提供了更灵活的用法. 例如, 假设我们想从数据库和缓存中取得数据, 然后将数据写入一个文件中, 我们不必等到数据库, 缓存和文件都初始化完成之后再开始我们的程序, 一旦数据库初始化完成, 我们就可以查询数据库, 一旦缓存初始化完成, 我们就可以查询缓存, 一旦文件和数据初始化完成, 我们就可以向文件中写入数据.

-- 待续

交换器(Exchanger)

JDK 1.5 新加入了一个称为 Exchanger 的类, 它能帮助我们实现两个线程之间的数据交换. 如: 下面的例子演示了一个线程从一个文件中读取数据到一个List中, 另一个线程从另个List中读取数据, 两个线程使用了不同的List, 运行一段时间后我们交换这两个List.

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Exchanger;

public class Test {

	public static void main(String[] args) throws Exception {
		Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

		new Thread(new WriteTask(exchanger)).start();
		new Thread(new ReadTask(exchanger)).start();
	}

}

class WriteTask implements Runnable {
	private int bufferSize = 3;
	private List<String> writeDataBuffer = new ArrayList<String>(bufferSize);
	private Exchanger<List<String>> exchanger;

	public WriteTask(Exchanger<List<String>> exchanger) {
		this.exchanger = exchanger;
	}

	public void run() {
		try {
			Scanner s = new Scanner(new File("C:\\workspace\\test.txt"));
			while (s.hasNextLine()) {
				if (writeDataBuffer.size() >= bufferSize) {
					writeDataBuffer = exchanger.exchange(writeDataBuffer); // 交换读缓冲区和写缓冲区
				}

				writeDataBuffer.add(s.nextLine()); // 从文件中读取一行数据到写缓冲区
				Thread.sleep(1000);
			}

			writeDataBuffer.add("EOF");
			exchanger.exchange(writeDataBuffer);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		}
	}
}

class ReadTask implements Runnable {
	private Exchanger<List<String>> exchanger;
	private List<String> readDataBuffer = new ArrayList<String>(3);

	public ReadTask(Exchanger<List<String>> exchanger) {
		this.exchanger = exchanger;
	}

	public void run() {
		try {
			String line = "";
			while (!"EOF".equals(line)) {
				if (readDataBuffer.size() == 0) {
					readDataBuffer = exchanger.exchange(readDataBuffer); // 交换读缓冲区和入缓冲区
				}

				line = readDataBuffer.remove(0);
				System.out.println(line); // 从读缓冲区得到数据
				Thread.sleep(1000);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

---更多参见:Java 精萃
-- 声 明:转载请注明出处
-- Last Updated on 2012-07-06
-- Written by ShangBo on 2012-06-22
-- End

抱歉!评论已关闭.