现在的位置: 首页 > 综合 > 正文

The Rox Java NIO Tutorial

2013年08月20日 ⁄ 综合 ⁄ 共 17137字 ⁄ 字号 评论关闭
原文地址: http://rox-xmlrpc.sourceforge.net/niotut/index.html#Credits

The Rox Java NIO Tutorial

Contents

  1. Introduction
  2. Credits
  3. General principles
  4. The server
  5. The client
  6. NIO and SSL on 1.4
  7. The code
  8. About the author

Introduction

This tutorial is intended to collect together my own experiences using the Java NIO libraries and the dozens of hints, tips, suggestions and caveats that litter the Internet. When I wrote Roxall
of the useful information existed as just that: hints, tips, suggestions and caveats on a handful of forums. This tutorial actually only covers using NIO for asynchronous networking (non-blocking sockets), and not the NIO libraries in all their glory. When
I use the term NIO in this tutorial I'm taking liberties and only talking about the non-blocking IO part of the API.

If you've spent any time looking at the JavaDoc documentation for the NIO libraries then you know they're not the most transparent docs floating around. And if you've spent any time trying to write code
based on the NIO libraries and you use more than one platform you've probably run into something that works on one platform but locks up on another. This is particularly true if you started on Windows and moved over to Linux (using Sun's implementation).

This tutorial takes you from nothing to a working client-server implementation, and will hopefully help you avoid all of the pitfalls waiting to trap the unwary. I've turned it around and start with a server
implementation, since that's the most common use-case for the NIO libraries.

Comments, criticisms, suggestions and, most important, corrections are welcome. The examples here have been developed and tested on Sun's NIO implementation using version 1.4.2 and 1.5.0 of Sun's Hotspot
JVM on Windows and Linux. Your mileage may vary but if you do run into funnies please let me know and I'll incorporate them into this page.

Drop me a line at nio@flat502.com.

Credits

Credit where credit is due. This tutorial pulls together a lot of ideas, none of which I can claim original ownership. Sources vary widely and I haven't managed to keep track of all of them, but an incomplete
list of some of the larger contributors would include:

Source source code in this tutorial was generated from Java source using Java2Html.

General principles

A few general principles inform the approach I've taken. There may be other approaches (I've certainly seen a few other suggestions) but I know this one works, and I know it performs. Sometimes it's worth
spending time finding the best possible approach. Sometimes it's enough to find an approach that works.

These general ideas apply on both the client and server side. In fact, given that the only difference between the client and the server is whether or not a connection is initiated or accepted, the bulk
of the logic for using NIO for a comms implementation can be factored out and shared.

So, without further ado:

Use a single selecting thread

Although NIO selectors are threadsafe their key sets are not. The upshot of this is that if you try to build a solution that depends on multiple threads accessing your selector you very quickly end up in
one of two situations:

  1. Plagued by deadlocks and race conditions as you build up an increasingly fragile house of cards (or rather house of locks) to avoid stepping on your own toes while accessing the selector and its key sets.
  2. Effectively single-threading access to the selector and its key sets in an attempt to avoid, well, stepping on your own toes.

The upshot of this is that if you want to build a solution based on NIO then trust me and stick to a single selecting thread. Offload events as you see fit but stick
to one thread on the selector.

I tend to handle all I/O within the selecting thread too. This means queuing writes and having the selecting thread perform the actual I/O, and having the selecting thread read off ready channels and offload
the read data to a worker thread. In general I've found this scales well enough so I've not yet had a need to have other threads perform the I/O on the side.

Modify the selector from the selecting thread only

If you look closely at the NIO documentation you'll come across the occasional mention of naive implementations blocking where more efficient implementations might not, usually in the context of altering
the state of the selector from another thread. If you plan on writing code against the NIO libraries that must run on multiple platforms you have to assume the worst. This isn't just hypothetical either, a little experimentation should be enough to convince
you that Sun's Linux implementation is "naive". If you plan on targeting one platform only feel free to ignore this advice but I'd recommend against it. The thing about code is that it oftens ends up in the oddest of places.

As a result, if you plan to hang onto your sanity don't modify the selector from any thread other than the selecting thread. This includes modifying the interest ops set for a selection key, registering
new channels with the selector, and cancelling existing channels.

A number of these changes are naturally initiated by threads that aren't the selecting thread. Think about sending data. This pretty much always has to be initiated by a calling thread that isn't the selecting
thread. Don't try to modify the selector (or a selection key) from the calling thread. Queue the operation where the selecting thread can get to it and call Selector.wakeup. This will wake the selecting thread up. All it needs to do is check if
there are any pending changes, and if there are apply them and go back to selecting on the selector. There are variations on this but that's the general idea.

Set OP_WRITE only when you have data ready

A common mistake is to enable OP_WRITE on a selection key and leave it set. This results in the selecting thread spinning because 99% of the time a socket channel is ready for writing. In fact the only
times it's not going to be ready for writing is during connection establishment or if the local OS socket buffer is full. The correct way to do this is to enable OP_WRITE only when you have data ready to be written on that socket channel. And don't forget
to do it from within the selecting thread.

Alternate between OP_READ and OP_WRITE

If you try to mix OP_READ and OP_WRITE you'll quickly get yourself into trouble. The Sun Windows implementation has been seen to deadlock if you do this. Other implementations may fare better, but I prefer
to play it safe and alternate between OP_READ and OP_WRITE, rather than trying to use them together.

With those out of the way, let's take a look at some actual code. The code presented here could do with a little cleaning up. I've simplified a few things in an attempt to stick to the core issue, using
the NIO libraries, without getting bogged down in too many abstractions or design discussions.

The server

A starting point

We need a little infrastructure before we can start building a NIO server. First, we'll need a thread. This will be our selecting thread and most of our logic will live in the class that is home to it.
And I'm going to have the selecting thread perform reads itself, so we'll need a ByteBuffer to read into. I'm going to use a non-direct buffer rather than a direct buffer. The performance difference is minor in this case and the code is slightly clearer. We're
also going to need a selector and a server socket channel on which to accept connections. Throwing in a few other minor odds and ends we end up with something like this.

public class NioServer implements Runnable {
  // The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;
  
  // The selector we'll be monitoring
  private Selector selector;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  public NioServer(InetAddress hostAddress, int portthrows IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }

  public static void main(String[] args) {
    try {
      new Thread(new NioServer(null, 9090)).start();
    catch (IOException e) {
      e.printStackTrace();
    }
  }
}

Creating the selector and server channel

The astute will have noticed the call to initSelector() in the constructor. Needless to say this method doesn't exist yet. So let's write it. It's job is to create and initialize a non-blocking
server channel and a selector. It must and then register the server channel with that selector.

  private Selector initSelector() throws IOException {
    // Create a new selector
    Selector socketSelector = SelectorProvider.provider().openSelector();

    // Create a new non-blocking server socket channel
    this.serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
    serverChannel.socket().bind(isa);

    // Register the server socket channel, indicating an interest in 
    // accepting new connections
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

    return socketSelector;
  }

Accepting connections

Right. At this point we have a server socket channel ready and waiting and we've indicated that we'd like to know when a new connection is available to be accepted. Now we need to actually accept it. Which
brings us to our "select loop". This is where most of the action kicks off. In a nutshell our selecting thread sits in a loop waiting until one of the channels registered with the selector is in a state that matches the interest operations we've registered
for it. In this case the operation we're waiting for on the server socket channel is an accept (indicated by OP_ACCEPT). So let's take a look at the first iteration (I couldn't resist) of our run() method.

  public void run() {
    while (true) {
      try {
        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
          SelectionKey key = (SelectionKeyselectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          }
        }
      catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

This only takes us half way though. We still need to accept connections. Which means, you guessed it, an accept() method.

  private void accept(SelectionKey keythrows IOException {
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannelkey.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }

Reading data

Once we've accepted a connection it's only of any use if we can read (or write) data on it. If you look back at the accept() method you'll notice that the newly accepted socket channel was
registered with our selector with OP_READ specified at the operation we're interested in. That means our selecting thread will be released from the call to select() when data becomes available on the socket channel. But what do we do with it?
Well, the first thing we need is a small change to our run() method. We need to check if a socket channel is readable (and deal with it if it is).

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          else if (key.isReadable()) {
            this.read(key);
          }

Which means we need a read() method...

  private void read(SelectionKey keythrows IOException {
    SocketChannel socketChannel = (SocketChannelkey.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();
    
    // Attempt to read off the channel
    int numRead;
    try {
      numRead = socketChannel.read(this.readBuffer);
    catch (IOException e) {
      // The remote forcibly closed the connection, cancel
      // the selection key and close the channel.
      key.cancel();
      socketChannel.close();
      return;
    }

    if (numRead == -1) {
      // Remote entity shut the socket down cleanly. Do the
      // same from our end and cancel the channel.
      key.channel().close();
      key.cancel();
      return;
    }

    // Hand the data off to our worker thread
    this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead); 
  }

Hang on, where did worker come from? This is the worker thread we hand data off to once we've read it. For our purposes all we'll do is echo that data back. We'll take a look at the code for
the worker shortly. However, assuming the existence of an EchoWorker class, our infrastructure needs a little updating. We need an extra instance member, a change to our constructor and one or two extras in our main() method.

  private EchoWorker worker;

  public NioServer(InetAddress hostAddress, int port, EchoWorker workerthrows IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
    this.worker = worker;
  } 

  public static void main(String[] args) {
    try {
      EchoWorker worker = new EchoWorker();
      new Thread(worker).start();
      new Thread(new NioServer(null, 9090, worker)).start();
    catch (IOException e) {
      e.printStackTrace();
    }
  }

Let's take a closer look at EchoWorker. Our echo worker implementation is implemented as a second thread. For the purposes of this example we're only going to have a single worker thread. This
means we can hand events directly to it. We'll do this by calling a method on the instance. This method will stash the event in a local queue and notify the worker thread that there's work available. Needless to say, the worker thread itself will sit in a
loop waiting for events to arrive and when they do it will process them. Events in this case are just data packets. All we're going to do is echo those packets back to the sender.

Normally, instead of calling a method on the worker directly, you'll want to use a form of blocking queue that you can share with as many workers as you like. Java 1.5's concurrency package introduces a BlockingQueue interface
and some implementations, but it's not that hard to roll your own. The logic in the EchoWorker code below is a good start.

public class EchoWorker implements Runnable {
  private List queue = new LinkedList();
  
  public void processData(NioServer server, SocketChannel socket, byte[] data, int count) {
    byte[] dataCopy = new byte[count];
    System.arraycopy(data, 0, dataCopy, 0, count);
    synchronized(queue) {
      queue.add(new ServerDataEvent(server, socket, dataCopy));
      queue.notify();
    }
  }
  
  public void run() {
    ServerDataEvent dataEvent;
    
    while(true) {
      // Wait for data to become available
      synchronized(queue) {
        while(queue.isEmpty()) {
          try {
            queue.wait();
          catch (InterruptedException e) {
          }
        }
        dataEvent = (ServerDataEventqueue.remove(0);
      }
      
      // Return to sender
      dataEvent.server.send(dataEvent.socket, dataEvent.data);
    }
  }
}

If you look at the last line of the run() method you'll see we're calling a new method on the server. Let's take a look at what we need to do to send data back to a client.

Writing data

Writing data to a socket channel is pretty straightforward. However, before we can do that we need to know that the channel is ready for more data. Which means setting the OP_WRITE interest op flag on that
channel's selection key. Since the thread that wants to do this is not the selecting thread we need to queue this request somewhere and wake the selecting thread up. Putting all of this together we need a write() method and a few additional members
in our server class.

  // A list of ChangeRequest instances
  private List changeRequests = new LinkedList();

  // Maps a SocketChannel to a list of ByteBuffer instances
  private Map pendingData = new HashMap();

  public void send(SocketChannel socket, byte[] data) {
    synchronized (this.changeRequests) {
      // Indicate we want the interest ops set changed
      this.changeRequests.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
      
      // And queue the data we want written
      synchronized (this.pendingData) {
        List queue = (Listthis.pendingData.get(socket);
        if (queue == null) {
          queue = new ArrayList();
          this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
      }
    }
    
    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
  }

We've introduced another class: ChangeRequest. There's no magic here, it just gives us an easy way to indicate a change that we want made on a particular socket channel. I've jumped ahead a
little with this class, in anticipation of some of the other changes we'll ultimately want to queue.

public class ChangeRequest {
  public static final int REGISTER = 1;
  public static final int CHANGEOPS = 2;
  
  public SocketChannel socket;
  public int type;
  public int ops;
  
  public ChangeRequest(SocketChannel socket, int type, int ops) {
    this.socket = socket;
    this.type = type;
    this.ops = ops;
  }
}

Waking the selecting thread up is of no use until we modify our selecting thread's logic to do what needs to be done. So let's update our run() method.

  public void run() {
    while (true) {
      try {
        // Process any pending changes
        synchronized(this.changeRequests) {
          Iterator changes = this.changeRequests.iterator();
          while (changes.hasNext()) {
            ChangeRequest change = (ChangeRequestchanges.next();
            switch(change.type) {
            case ChangeRequest.CHANGEOPS:
              SelectionKey key = change.socket.keyFor(this.selector);
              key.interestOps(change.ops);
            }
          }
          this.changeRequests.clear(); 
        }
        
        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
     

抱歉!评论已关闭.