现在的位置: 首页 > 综合 > 正文

线程池-生产消费者

2018年12月10日 ⁄ 综合 ⁄ 共 3665字 ⁄ 字号 评论关闭

求集合内两两字符串的编辑距离,先使用暴力方法,之后再介绍用kd树的方法

主线程进行任务分发,将字符串依次放入线程池队列中,线程池内的线程就计算这个字符串和所有字符串的编辑距离,然后将计算结果写入blockqueue中,再起一个线程将blockqueue中的结果写入磁盘中.这里有一个技巧,当处理完所有字符串,写进程可能在将队列中的结果写入到磁盘也可能阻塞在take函数处。主线程就在队列末尾放入一个结束标记,读线程接收到这个标记之后就抛出异常结束了。

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EditDistance{

	ExecutorService service ;
	List<String> ls;
	LinkedBlockingQueue<String> outque;
	class WordDist  implements Comparable{
		public String word;
		public int dist;
		public WordDist(String wd, int d) {
			word = wd;
			dist = d;
		}
		
		
		public int compareTo(Object o) {
			return dist - ((WordDist)o).dist ;
		}
		
		public String toString () {
			return word + " " + dist;
		}
	}
	
	class Process implements Runnable {
		String name;
		
		public Process(String s) {
			name = s;
		}
		
		public void run() {
			PriorityQueue<WordDist> que = new PriorityQueue<WordDist>();
			for (int j = 0; j < ls.size(); ++j) {
				if (!name.equals(ls.get(j))) {
					que.add(new WordDist(ls.get(j), calcEditDist(name, ls.get(j))));
				}
			}
			
			StringBuilder sb = new StringBuilder();
			sb.append(name+"\t");
			int num = 20;
			int count = 0;
			while (que.peek() != null && count < num) {
				sb.append(que.poll() + " ");
				++count;
			}
			try {
			outque.put(sb.toString());
			} catch (InterruptedException e) {
				System.out.println(Thread.currentThread().getName() + " " + name);
				e.printStackTrace();
			}
		}
	}
	
	class Writer implements Runnable {
		PrintWriter pw;
		public Writer(String fn) {
			try {
				pw = new PrintWriter(fn);
			} catch (IOException e) {
				e.printStackTrace();
				return;
			}
		}
		
		@Override
		public void run() {
			try {
				while (true) {
					String rst = outque.take();
					if (rst.equals("!!!POISON")) {
						throw new InterruptedException("meet a poison. over");
					}
					pw.println(rst);
				}
			} catch (InterruptedException e) {
				pw.close();
				System.out.println(e);
				System.out.println("Writer is over");
			}
		}
		
	}
	
	public int calcEditDist(String s1, String s2) {
		
		int [][]mat = new int[s1.length()+1][s2.length()+1];//
		mat[0][0] = 0;
		for (int i = 1; i <= s1.length(); ++i) mat[i][0] = i;
		for (int i = 1; i <= s2.length(); ++i) mat[0][i] = i;
		
		for (int i = 1; i <= s1.length(); ++i) {
			for (int j = 1; j <= s2.length(); ++j) {
				int repCost = 1;
				if (s1.charAt(i-1) == s2.charAt(j - 1)) {
					repCost = 0;
				}
				mat[i][j] = min(mat[i-1][j-1]+repCost, mat[i-1][j] + 1, mat[i][j-1] + 1);
			}
		}
		return mat[s1.length()][s2.length()];
	}
	
	private int min(int a, int b, int c) {
		int m = a;
		if (m > b) m = b;
		if (m > c) m = c;
		return m;
	}
	
	public void genCluster(String in, String out) {
		try {
			
			BufferedReader br = new BufferedReader (new InputStreamReader(new FileInputStream (in) ,"utf-8"));
			//PrintWriter pw = new PrintWriter(out, "utf-8");
			
			service = Executors.newFixedThreadPool(7);
			
			String line;
			ls = new ArrayList<String>();
			outque = new LinkedBlockingQueue<String>();//
			while ((line = br.readLine()) != null) {
				Scanner scan = new Scanner(line);
				scan.useDelimiter(" \\|\\|\\| ");
				ls.add(scan.next());
			}
			br.close();
			
			Thread writer = new Thread(new Writer(out));
			writer.start();
			
			for (int i = 0; i < ls.size(); ++i) {
				service.submit(new Process(ls.get(i)));
			}
			
			service.shutdown();//提交完任务之后就调用shutdown,不能再提交新任务
			try {
				service.awaitTermination(24, TimeUnit.HOURS);//再接着阻塞等待所有任务的结束
			} catch (InterruptedException e) {
				e.printStackTrace();
				System.out.println("main await error");
			}
			try {
				outque.put("!!!POISON");
			} catch (InterruptedException e) {
				e.printStackTrace();
				System.out.println("put poison error");
			}
	
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	
	
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		if (args.length != 2) {
			System.out.println("in out");
			return;
		}
		
		EditDistance dist = new EditDistance();
		dist.genCluster(args[0], args[1]);
	}

}

service.awaitTermination(24, TimeUnit.HOURS);

这个函数出问题了,java多线程和c有个不同之处在于,只要有一个线程再跑着,只要不是后台线程,即使main线程执行结束了,就是用jstack看不到main线程了,其他线程也会继续执行

恩,上面的程序跑了不止24小时,main线程执行结束了,也用poison把写线程结束了,只剩下生产者线程了.....

抱歉!评论已关闭.