现在的位置: 首页 > 综合 > 正文

Java语言多线程程序模型研究

2014年02月01日 ⁄ 综合 ⁄ 共 12337字 ⁄ 字号 评论关闭

多线程是较复杂程序设计过程中不可缺少的一部分。为了提高应用程序运行的性能,采用多线程的设计是一种比较可行的方案。本文通过介绍使用Java编写的扫描计算机端口的实例,来说明多线程设计中应注意的问题,以及得出经常使用的多线程模型。

    本文要求读者具备一定的Java语言基础,对Socket有一定的了解。本文的所有程序在Java SDK 1.4.2编译通过并能正常运行。

    现在,我们需要对一台主机扫描其端口,找出哪些端口是open的状态。我们先采用单线程进行处理,程序代码如下:

  1. import java.io.IOException;
  2. import java.net.Socket;
  3. import java.net.UnknownHostException;
  4. public class PortScannerSingleThread {
  5.     public static void main(String args) {
  6.         String host = null;        //第一个参数,目标主机。
  7.         int beginport = 1;         //第二个参数,开始端口。
  8.         int endport = 65535;       //第三个参数,结束端口。
  9.         try{
  10.             host = args;
  11.             beginport = Integer.parseInt(args);
  12.             endport = Integer.parseInt(args);
  13.             if(beginport = 0 || endport= 65536 || beginport> endport){
  14.                 throw new Exception("Port is illegal");
  15.             }
  16.         }catch(Exception e){
  17.             System.out.println("Usage: java PortScannerSingleThread host beginport endport");
  18.             System.exit(0);
  19.         }
  20.         
  21.         for (int i = beginport; i <= endport; i++) {
  22.             try {
  23.                 Socket s = new Socket(host, i);
  24.                 System.out.println("The port " + i + " is opened at " + host);
  25.             }catch (UnknownHostException ex) {
  26.                 System.err.println(ex);
  27.                 break;
  28.             }catch (IOException ex) {
  29.             }
  30.         }
  31.     }
  32. }

 在以上程序中,通过java.net.Socket类来识别端口是否是open状态。程序接受3个参数,第一个参数是主机IP,第二和第三个参数是需要扫描的起始和中止的端口号(1~65535)。本程序(java PortScannerSingleThread 10.1.1.1 1 1000)运行结果如下:

The port 25 is opened at 10.1.1.182

The port 110 is opened at 10.1.1.182

The port 135 is opened at 10.1.1.182

...

    但是,以上程序运行效率实在不敢恭维,把目标主机端口扫描一遍需要十几分钟甚至更长,估计没有哪个用户可以忍受这样的效率。

    所以,提高程序处理效率是必须的,下面的程序通过多线程的方法来进行处理。程序代码如下:

  1. import java.io.IOException;
  2. import java.net.Socket;
  3. import java.net.UnknownHostException;
  4. public class PortScannerMultiThread {
  5.     public static void main(String args) {
  6.         String host = null;
  7.         int beginport = 1;
  8.         int endport = 65535;
  9.         try{
  10.             host = args;
  11.             beginport = Integer.parseInt(args);
  12.             endport = Integer.parseInt(args);
  13.             if(beginport <= 0 || endport >= 65536 || beginport > endport){
  14.                 throw new Exception("Port is illegal");
  15.             }
  16.         }catch(Exception e){
  17.             System.out.println("Usage: java PortScannerSingleThread host beginport endport");
  18.             System.exit(0);
  19.         }
  20.         
  21.         for (int i = beginport; i <= endport; i++) {
  22.             PortProcessor pp = new PortProcessor(host,i);      //一个端口创建一个线程
  23.             pp.start();
  24.         }
  25.     }
  26. }
  27. class PortProcessor extends Thread{
  28.     String host;
  29.     int port;
  30.     
  31.     PortProcessor(String host, int port){
  32.         this.host = host;
  33.         this.port = port;
  34.     }
  35.     
  36.     public void run(){
  37.         try{
  38.             Socket s = new Socket(host,port);
  39.             System.out.println("The port " + port + " is opened at " + host);
  40.         }catch(UnknownHostException ex){
  41.             System.err.println(ex);
  42.         }catch(IOException ioe){
  43.         }
  44.     }
  45. }

 以上程序在for循环结构中创建PortProcessor对象,PortProcessor类是线程类,其关键的Socket在public void run()方法中实现。此程序比第一个单线程的程序运行效率提高很多倍,几乎在几秒钟内得出结果。所以可见多线程处理是何等的重要。

程序(java PortScannerMultiThread 10.1.1.100 1 1000)运行结果如下:

The port 25 is opened at 10.1.1.100

The port 42 is opened at 10.1.1.100

The port 88 is opened at 10.1.1.100

...

    仔细对第2个程序分析,不难发现其中的问题:创建的线程个数是不固定的,取决于输入的第二和第三个参数。如果扫描1~100端口,那么主线程就产生100个线程来分别处理;如果扫描1~10000端口,主线程就会产生10000个线程来进行处理。在JVM中创建如此多的线程同样会带来性能上的问题,因为线程的创建和消失都是需要花费系统资源的。所以以上的第二个程序也存在明显的不足。

    所以,我们需要一个确定数量的线程在JVM中运行,这样就需要了解“线程池”(ThreadPool)的概念。线程池在多线程程序设计中是比不可少的,而且初学者不太容易掌握,下面通过对线程池的介绍,结合第3和第4个程序,引出两种常用的线程池模型。

    第一种实现线程池的方法是:创建一个”池“,在”池“中增加要处理的数据对象,然后创建一定数量的线程,这些线程对”池“中的对象进行处理。当”池“是空的时候,每个线程处于等待状态;当往”池“里添加一个对象,通知所有等待的线程来处理(当然一个对象只能有一个线程来处理)。

    第二种方法是:同样创建一个”池“,但是在”池“中放的不是数据对象,而是线程,可以把”池“中的一个个线程比喻成一个个”工人“,当没有任务的时候,”工人“们严阵以待;当给”池“添加一个任务后,”工人“就开始处理并直到处理完成。

    在第3个程序中,定义了List类型的entries作为“池”,这个“池”用来保存需要扫描的端口,List中的元素必须是Object类型,不能用基本数据类型int往池里添加,而需要用使用Integer。在processMethod()方法中,首先就启动一定数量的PortThread线程,同时在while循环中通过entries.add(0, new Integer(port))往“池”里添加对象。在PortThread类的run()方法中通过entry = (Integer)entries.remove(entries.size()-1);取得“池”中的对象,转换成int后传递给Socket构造方法。

    第3个程序如下:

  1. import java.io.IOException;
  2. import java.net.InetAddress;
  3. import java.net.Socket;
  4. import java.net.UnknownHostException;
  5. import java.util.Collections;
  6. import java.util.LinkedList;
  7. import java.util.List;
  8. public class PortScanner {
  9.     private List entries = Collections.synchronizedList(new LinkedList());  //这个”池“比较特别
  10.     int numofthreads;
  11.     static int port;
  12.     int beginport;
  13.     int endport;
  14.     InetAddress remote = null;
  15.     
  16.     public boolean isFinished(){
  17.         if(port >= endport){
  18.             return true;
  19.         }else{
  20.             return false;
  21.         }
  22.     }
  23.     
  24.     PortScanner(InetAddress addr, int beginport, int endport, int numofthreads){
  25.         this.remote = addr;
  26.         this.beginport = beginport;
  27.         this.endport = endport;
  28.         this.numofthreads = numofthreads;    
  29.     }
  30.     
  31.     public void processMethod(){
  32.         for(int i = 0; i < numofthreads; i++){          //创建一定数量的线程并运行
  33.             Thread t = new PortThread(remote, entries, this);
  34.             t.start();
  35.         }
  36.         
  37.         port = beginport;
  38.         
  39.         while(true){
  40.             if(entries.size() > numofthreads){
  41.                 try{
  42.                     Thread.sleep(1000);      //”池“中的内容太多的话就sleep
  43.                 }catch(InterruptedException ex){
  44.                     
  45.                 }
  46.                 continue;
  47.             }
  48.             
  49.             synchronized(entries){
  50.                 if(port > endport) break;
  51.                 entries.add(0, new Integer(port));  //往”池“里添加对象,需要使用int对应的Integer类
  52.                 entries.notifyAll();
  53.                 port++;
  54.             }
  55.         }
  56.     }
  57.     
  58.     public static void main(String args) {
  59.         String host = null;
  60.         int beginport = 1;
  61.         int endport = 65535;
  62.         int nThreads = 100;
  63.         try{
  64.             host = args;
  65.             beginport = Integer.parseInt(args);
  66.             endport = Integer.parseInt(args);
  67.             nThreads = Integer.parseInt(args);
  68.             if(beginport <= 0 || endport >= 65536 || beginport > endport){
  69.                 throw new Exception("Port is illegal");
  70.             }
  71.         }catch(Exception e){
  72.             System.out.println("Usage: java PortScannerSingleThread host beginport endport nThreads");
  73.             System.exit(0);
  74.         }
  75.         
  76.         try{
  77.             PortScanner scanner = new PortScanner(InetAddress.getByName(host), beginport, endport, nThreads);
  78.             scanner.processMethod();
  79.         }catch(UnknownHostException ex){
  80.         }    
  81.     }
  82. }
  83. class PortThread extends Thread{
  84.     private InetAddress remote;
  85.     private List entries;
  86.     PortScanner scanner;
  87.     
  88.     PortThread(InetAddress add, List entries, PortScanner scanner){
  89.         this.remote = add;
  90.         this.entries = entries;
  91.         this.scanner = scanner;
  92.     }
  93.             
  94.     public void run(){
  95.         Integer entry;
  96.         while(true){
  97.             synchronized(entries){
  98.                 while(entries.size() == 0){
  99.                     if(scanner.isFinished()) return;
  100.                     try{
  101.                         entries.wait();           //”池“里没内容就只能等了
  102.                     }catch(InterruptedException ex){
  103.                     }
  104.                 }
  105.                 entry = (Integer)entries.remove(entries.size()-1);  //把”池“里的东西拿出来进行处理
  106.             }
  107.             
  108.             Socket s = null;
  109.             
  110.             try{
  111.                 s = new Socket(remote, entry.intValue());
  112.                 System.out.println("The port of " + entry.toString() + " of the remote " + remote +" is opened.");
  113.             
  114.             }catch(IOException e){
  115.             }finally{
  116.                 try{
  117.                     if(s != null) s.close();
  118.                 }catch(IOException e){
  119.                     
  120.                 }
  121.             }
  122.         }
  123.     }
  124. }

  以上程序需要4个参数,输入java PortScanner 10.1.1.182 1 10000 100运行(第4个参数是线程数),结果前两个程序一样,但是速度比第一个要快,可能比第二个要慢一些。

    第3个程序是把端口作为“池”中的对象,下面我们看第4个实现方式,把“池”里面的对象定义成是线程类,把具体的任务定义成”池“中线程类的参数。第4个程序有2个文件组成,分别是ThreadPool.java和PortScannerByThreadPool.java.

    ThreadPool.java文件内容如下:

  1. import java.util.LinkedList;
  2. public class ThreadPool{
  3.     private final int nThreads;
  4.     private final PoolWorker threads;
  5.     private final LinkedList queue;
  6.     public ThreadPool(int nThreads){
  7.         this.nThreads = nThreads;
  8.         queue = new LinkedList();
  9.         threads = new PoolWorker[nThreads];
  10.         for (int i=0; i<nThreads; i++) {
  11.             threads[i] = new PoolWorker();
  12.             threads[i].start();
  13.         }
  14.     }
  15.     public void execute(Runnable r) {
  16.         synchronized(queue) {
  17.             queue.addLast(r);
  18.             queue.notifyAll();
  19.         }
  20.     }
  21.     private class PoolWorker extends Thread {
  22.         public void run() {
  23.             Runnable r;
  24.             while (true) {
  25.                 synchronized(queue) {
  26.                     while (queue.isEmpty()) {
  27.                         try{
  28.                             queue.wait();
  29.                         }catch (InterruptedException ignored){
  30.                         }
  31.                     }
  32.                     r = (Runnable) queue.removeFirst();
  33.                 }
  34.                 try {
  35.                     r.run();
  36.                 }
  37.                 catch (RuntimeException e) {
  38.                 }
  39.             }
  40.         }
  41.     }
  42. }

 在ThreadPool.java文件中定义了2个类:ThreadPool和PoolWorker。ThreadPool类中的nThreads变量表示线程数,PoolWorker数组类型的threads变量表示线程池中的“工人”,这些“工人”的工作就是一直循环处理通过queue.addLast(r)加入到“池”中的任务。

    PortScannerByThreadPool.java文件内容如下:

  1. import java.io.IOException;
  2. import java.net.InetAddress;
  3. import java.net.Socket;
  4. public class PortScannerByThreadPool {
  5.     public static void main(String args) {
  6.         String host = null;
  7.         int beginport = 1;
  8.         int endport = 65535;
  9.         int nThreads = 100;
  10.         try{
  11.             host = args;
  12.             beginport = Integer.parseInt(args);
  13.             endport = Integer.parseInt(args);
  14.             nThreads = Integer.parseInt(args);
  15.             if(beginport <= 0 || endport >= 65536 || beginport > endport){
  16.                 throw new Exception("Port is illegal");
  17.             }
  18.         }catch(Exception e){
  19.             System.out.println("Usage: java PortScannerSingleThread host beginport endport nThreads");
  20.             System.exit(0);
  21.         }
  22.         
  23.         ThreadPool tp = new ThreadPool(nThreads);
  24.         
  25.         for(int i = beginport; i <= endport; i++){
  26.             Scanner ps = new Scanner(host,i);
  27.             tp.execute(ps);
  28.         }
  29.     }
  30. }      
  31.     
  32. class Scanner implements Runnable{
  33.     String host;    
  34.     int port;
  35.         
  36.     Scanner(String host, int port){
  37.         this.host = host;
  38.         this.port = port;
  39.     }
  40.         
  41.     public void run(){
  42.         Socket s = null;
  43.         try{
  44.             s = new Socket(InetAddress.getByName(host),port);
  45.             System.out.println("The port of " + port + " is opened.");
  46.         }catch(IOException ex){
  47.         }finally{
  48.             try{
  49.                 if(s != null) s.close();
  50.             }catch(IOException e){
  51.             }
  52.         }
  53.     }
  54. }

 PortScannerByThreadPool是主程序类,处理输入的4个参数(和第3个程序是一样的):主机名、开始端口、结束端口和线程数。Scanner类定义了真正的”任务“。在PortScannerByThreadPool中通过new ThreadPool(nThreads)创建ThreadPool对象,然后在for循环中通过new Scanner(host,i)创建”任务“对象,再通过tp.execute(ps)把”任务“对象添加到”池“中。

    读者可以编译运行第4个程序,得出的结果和前面的是一样的。但是第4和第3个程序之间最大的差别就是:第4个程序会一直运行下去,不会自动结束。在第3个程序中存在一个isFinished()方法,可以用来判断任务是否处理完毕,而第4个程序中没有这样做。请读者自己思考这个问题。

    在第3和第4个程序中,我们可以概括出多线程的模型。第3个程序的线程”池“里装的要处理的对象,第4个程序的线程”池“里装的是”工人“,还需要通过定义”任务“并给把它”派工“给”工人“。我个人比较偏好后者的线程池模型,虽然类的个数多了几个,但逻辑很清晰。不管怎样,第3和第4个程序中关键的部分都大同小异,就是2个synchronized程序块中的内容,如下(第4个程序中的):

  1. synchronized(queue) {
  2.     queue.addLast(r);
  3.     queue.notifyAll();
  4. }
  5. synchronized(queue) {
  6.     while (queue.isEmpty()) {
  7.         try{
  8.             queue.wait();
  9.         }catch (InterruptedException ignored){
  10.         }
  11.     }
  12.     r = (Runnable) queue.removeFirst();
  13. }

一般拿synchronized用来定义方法或程序块,这样可以在多线程同时访问的情况下,保证在一个时刻只能有一个线程对这部分内容进行访问,避免了数据出错。在第3个程序中通过List entries = Collections.synchronizedList(new LinkedList())来定义”池“,在第4个程序中直接用LinkedList queue,都差不多,只是Collections.synchronizedList()可以保证”池“的同步,其实”池“里的内容访问都是在synchronized定义的程序块中,所以不用Collections.synchronizedList()也是可以的。

    wait()和notifyAll()是很重要的,而且这2个方法是Object基类的方法,所以任何一个类都是可以使用的。这里说明一个可能产生混淆的问题:queue.wait()并不是说queue对象需要进行等待,而是说queue.wait()所在的线程需要进行等待,并且释放对queue的锁,把对queue的访问权交给别的线程。如果读者对这2个方法难以理解,建议参考JDK的文档说明。

    好了,通过以上4个例子的理解,读者应该能对多线程的程序设计有了一定的理解。第3和第4个程序对应线程模型是非常重要的,可以说是多线程程序设计过程中不可或缺的内容。

 

 

 

 

抱歉!评论已关闭.