
Merlin brings nonblocking I/O to the Java platform

Posted June 14, 2013
Original article: https://www.ibm.com/developerworks/java/library/j-javaio/
Summary:
  The Java technology platform is long overdue for a nonblocking I/O mechanism. Fortunately, Merlin (JDK 1.4) has a magic wand for almost every occasion, and unblocking blocked I/O is this magician's specialty. Software engineers Aruna Kalagnanam
and Balu G introduce the nonblocking features of Merlin's new I/O package, java.nio (NIO), and employ a socket-programming example to show you just what NIO can do.

A server's ability to handle numerous client requests within a reasonable time depends on how effectively it uses I/O streams. A server that caters to hundreds of clients simultaneously must be able to use I/O services concurrently. Until JDK 1.4 (aka Merlin), the Java platform did not support nonblocking I/O calls. With an almost one-to-one ratio of threads to clients, servers written in the Java language were susceptible to enormous thread overhead, which resulted in both performance problems and lack of scalability.

To address this issue, a new set of classes has been introduced to the Java platform with the latest release. Merlin's java.nio package is chock full of tricks for resolving thread overhead, the most important being the new SelectableChannel and Selector classes. A channel represents a means of communication between a client and a server. A selector is analogous to a Windows message loop, in which the selector captures the various events from different clients and dispatches them to their respective event handlers. In this article, we'll show you how these two classes function together to create a nonblocking I/O mechanism for the Java platform.

I/O programming before Merlin

We'll start with a look at a basic, pre-Merlin server-socket program. In the lifetime of a ServerSocket class, the important functions are as follows:

  • Accept incoming connections
  • Read requests from clients
  • Service those requests

Let's take a look at each of these steps using code snippets to illustrate. First, we create a new ServerSocket:

ServerSocket s = new ServerSocket();

Next, we want to accept an incoming call. A call to accept() should do the trick here, but there's a little trap you have to watch for:

Socket conn = s.accept( );

The call to accept() blocks until the server socket accepts a client request for connection. Once a connection is established, the server reads the client requests, using LineNumberReader. Because LineNumberReader reads data in chunks until the buffer is full, the call blocks on a read. The following snippet shows LineNumberReader in action (blocks and all).

InputStream in = conn.getInputStream();
InputStreamReader rdr = new InputStreamReader(in);
LineNumberReader lnr = new LineNumberReader(rdr);
Request req = new Request();
while (!req.isComplete() )
{
   String s = lnr.readLine();
   req.addLine(s);
}

InputStream.read() is another way to read data. Unfortunately, the read method also blocks until data is available, as does the write method.

Figure 1 depicts the typical workings of a server. The bold lines represent blocking operations.

Figure 1. A typical server in action
A blocking I/O diagram

Prior to JDK 1.4, liberal use of threads was the most typical way of getting around blocking. But this solution created its own problem -- namely thread overhead, which impacts both performance and scalability. With the arrival of Merlin and the java.nio package, however, everything has changed.
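To make the thread overhead concrete, here is a minimal sketch of the thread-per-client approach described above. The class name ThreadPerClientServer, the echo behavior, and port 4900 are assumptions made for illustration, and the code uses modern Java syntax (lambdas, try-with-resources) that was not available before JDK 1.4.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class ThreadPerClientServer {

    // Handles one client: echoes each line back until the client disconnects.
    public static void serve(Socket conn) {
        try (Socket c = conn;
             BufferedReader in = new BufferedReader(new InputStreamReader(c.getInputStream()));
             PrintWriter out = new PrintWriter(c.getOutputStream(), true)) {
            String line;
            while ((line = in.readLine()) != null) {   // blocks until a full line arrives
                out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Accepts connections until the server socket is closed,
    // dedicating a new thread to every client.
    public static void acceptLoop(ServerSocket server) throws IOException {
        while (true) {
            Socket conn = server.accept();             // blocks until a client connects
            new Thread(() -> serve(conn)).start();     // one thread per client
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(4900)) {   // port chosen for illustration
            acceptLoop(server);
        }
    }
}
```

Every connected client occupies a thread that spends most of its life parked inside readLine(), which is the cost the rest of the article sets out to remove.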

In the sections that follow, we'll look at the foundations of java.nio, and then apply some of what we've learned to revising the server-socket example described above.


The Reactor pattern

The principal force behind the design of NIO is the Reactor design pattern. Server applications in a distributed system must handle multiple clients that send them service requests. Before invoking a specific service, however, the server application must demultiplex and dispatch each incoming request to its corresponding service provider. The Reactor pattern serves precisely this function. It allows event-driven applications to demultiplex and dispatch service requests, which are then delivered concurrently to an application from one or more clients.

Core functions of the Reactor pattern

  • Demultiplexing events
  • Dispatching events to their corresponding event handlers

The Reactor pattern is closely related to the Observer pattern in this aspect: all dependents are informed when a single subject changes. The Observer pattern is associated with a single source of events, however, whereas the Reactor pattern is associated with multiple sources of events.
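The two core functions can be sketched without any networking at all. In the hypothetical MiniReactor below (every name in it is invented for illustration and none comes from java.nio), several sources post events into one queue, and a single dispatch step demultiplexes them to the handler registered for each source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class MiniReactor {

    // Callback invoked when an event for a registered source is dispatched.
    public interface EventHandler {
        void handle(String payload);
    }

    private final Map<String, EventHandler> handlers = new HashMap<>();
    private final Queue<String[]> pending = new ArrayDeque<>();

    // Associates an event source with the handler that services it.
    public void register(String source, EventHandler handler) {
        handlers.put(source, handler);
    }

    // Queues an event from a source; this stands in for network activity.
    public void post(String source, String payload) {
        pending.add(new String[] {source, payload});
    }

    // Demultiplexes the queued events and dispatches each one to its handler.
    // Events from unregistered sources are dropped. Returns the number dispatched.
    public int dispatchAll() {
        int dispatched = 0;
        String[] event;
        while ((event = pending.poll()) != null) {
            EventHandler handler = handlers.get(event[0]);
            if (handler != null) {
                handler.handle(event[1]);
                dispatched++;
            }
        }
        return dispatched;
    }

    public static void main(String[] args) {
        MiniReactor reactor = new MiniReactor();
        List<String> log = new ArrayList<>();
        reactor.register("clientA", p -> log.add("A handled " + p));
        reactor.register("clientB", p -> log.add("B handled " + p));
        reactor.post("clientB", "read");
        reactor.post("clientA", "write");
        reactor.dispatchAll();
        System.out.println(log);   // [B handled read, A handled write]
    }
}
```

In NIO terms, the queue corresponds roughly to the set of ready keys and the handler lookup to the per-key dispatch logic.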

See Resources to learn more about the Reactor pattern.


Channels and Selectors

NIO's nonblocking I/O mechanism is built around selectors and channels. A Channel class represents a communication mechanism between a server and a client. In keeping with the Reactor pattern, a Selector class is a multiplexor of Channels. It demultiplexes incoming client requests and dispatches them to their respective request handlers.

We'll look closely at the respective functions of the Channel class and the Selector class, and at how the two work together to create a nonblocking I/O implementation.

What the Channel does

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, such as reading or writing. NIO channels can be asynchronously closed and interrupted. So, if a thread is blocked in an I/O operation on a channel, another thread can close that channel. Similarly, if a thread is blocked in an I/O operation on a channel, another thread can interrupt that blocked thread.
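The asynchronous close can be observed with a pipe, as in the sketch below. AsyncCloseDemo and its method name are hypothetical, and the 200 ms sleep is only a heuristic to let the reader thread reach its blocking read. If the close wins the race, the reader sees a plain ClosedChannelException rather than its subclass AsynchronousCloseException, so the code catches the parent type.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;

public class AsyncCloseDemo {

    // Blocks a reader thread on an empty pipe, closes the channel from the
    // calling thread, and reports whether the close released the reader.
    public static boolean readerReleasedByClose() throws Exception {
        Pipe pipe = Pipe.open();
        Pipe.SourceChannel source = pipe.source();     // blocking mode by default
        boolean[] released = {false};

        Thread reader = new Thread(() -> {
            try {
                // Nothing is ever written to the pipe, so this read blocks.
                source.read(ByteBuffer.allocate(16));
            } catch (ClosedChannelException e) {
                // Covers AsynchronousCloseException, which is a subclass.
                released[0] = true;
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        reader.start();

        Thread.sleep(200);     // give the reader time to block (heuristic only)
        source.close();        // closing from another thread ends the blocked read
        reader.join(5000);
        pipe.sink().close();
        return released[0] && !reader.isAlive();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("reader released: " + readerReleasedByClose());
    }
}
```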

Figure 2. Class hierarchy for java.nio.channels
A class hierarchy diagram for the java.nio package

As Figure 2 shows, there are quite a few channel interfaces in the java.nio.channels package. We're mainly concerned with the java.nio.channels.SocketChannel and java.nio.channels.ServerSocketChannel classes. These channels can be treated as replacements for java.net.Socket and java.net.ServerSocket, respectively. Channels can be used in a blocking or a nonblocking mode, though of course we will focus on using channels in nonblocking mode.

Creating a nonblocking channel

We have two new classes to deal with in order to implement basic nonblocking socket read and write operations. These are the InetSocketAddress class from the java.net package, which specifies where to connect to, and the SocketChannel class from the java.nio.channels package, which does the actual reading and writing operations.

The code snippets in this section show a revised, nonblocking approach to creating a basic server-socket program. Note the changes between these code samples and those used in the first example, starting with the addition of our two new classes:

String host = ......;
InetSocketAddress socketAddress = new InetSocketAddress(host, 80);

SocketChannel channel = SocketChannel.open();
channel.connect(socketAddress);

The role of the buffer

A Buffer is an abstract class that contains data of a specific primitive data type. It is basically a wrapper around a fixed-size array with getter/setter methods that make its contents accessible. The Buffer class has a number of subclasses, as follows:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

ByteBuffer is the only class that supports reading and writing from and to the other types, since the other classes are type specific. Once connected, data can be read from or written to the channel with a ByteBuffer object. See Resources to learn more about the ByteBuffer.
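As a small illustration of the points above, the sketch below stores two different primitive types in one ByteBuffer and then decodes raw bytes into characters, much as the sample server later does with incoming requests. BufferDemo and its method names are hypothetical, and StandardCharsets requires Java 7 or later.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;

public class BufferDemo {

    // Writes an int and a double into a ByteBuffer, then reads them back.
    public static String roundTrip() {
        ByteBuffer buf = ByteBuffer.allocate(64);   // fixed capacity of 64 bytes
        buf.putInt(42);                             // occupies 4 bytes
        buf.putDouble(2.5);                         // occupies 8 bytes
        buf.flip();                                 // limit = 12, position = 0: ready to read
        int i = buf.getInt();
        double d = buf.getDouble();
        return i + " " + d + " remaining=" + buf.remaining();
    }

    // Decodes ASCII bytes into a String by way of a CharBuffer.
    public static String decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        CharBuffer chars = StandardCharsets.US_ASCII.decode(buf);
        return chars.toString();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());                    // 42 2.5 remaining=0
        System.out.println(decode(new byte[] {72, 105}));   // Hi
    }
}
```

The call to flip() is the step that is easiest to forget: it switches the buffer from being filled to being drained by setting the limit to the current position and the position back to zero.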

To make the channel nonblocking, we call configureBlocking(false) on the channel, as shown here:

channel.configureBlocking(false);

In blocking mode, a thread will block on a read or a write until the operation is completely finished. If, during a read, data has not completely arrived at the socket, the thread will block on the read operation until all the data is available.

In nonblocking mode, the thread will read whatever amount of data is available and return to perform other tasks. If configureBlocking() is passed true, the channel's behavior will be exactly the same as that of a blocking read or write on a Socket. The one major difference, mentioned above, is that these blocking reads and writes can be interrupted by other threads.
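The difference is easy to observe on a loopback connection. In the sketch below (NonBlockingReadDemo is a hypothetical name, and port 0 simply asks the operating system for any free port), the accepted channel is switched to nonblocking mode and read() is called before the peer has sent anything.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingReadDemo {

    // Returns what read() reports on a connected, nonblocking channel
    // when the peer has not sent any data yet.
    public static int readWithNoData() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.socket().bind(new InetSocketAddress(loopback, 0));
            int port = server.socket().getLocalPort();

            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port));
                 SocketChannel accepted = server.accept()) {   // the server channel is still blocking
                accepted.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(1024);
                return accepted.read(buf);   // returns immediately instead of waiting for data
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bytes read: " + readWithNoData());   // bytes read: 0
    }
}
```

With the configureBlocking(false) line removed, the same read() call would wait until the client wrote something or closed the connection.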

The Channel alone isn't enough to create a nonblocking I/O implementation. A Channel class must work in conjunction with the Selector class to achieve nonblocking I/O.

What the Selector does

The Selector class plays the role of a Reactor in the Reactor pattern scenario. The Selector multiplexes events on several SelectableChannels. Each Channel registers events with the Selector. When events arrive from clients, the Selector demultiplexes them and dispatches the events to the corresponding Channels.

The simplest way to create a Selector is to use the open() method, as shown below:

Selector selector = Selector.open();

Channel meets Selector

Each Channel that has to service client requests must first create a connection. The code below creates a ServerSocketChannel, held in serverChannel and referred to below as the Server, and binds it to a local port:

ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetAddress ia = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(ia, port );
serverChannel.socket().bind(isa);

Each Channel that has to service client requests must next register itself with the Selector. A Channel should be registered according to the events it will handle. For instance, a Channel that accepts incoming connections should be registered as shown here:

SelectionKey acceptKey =
    channel.register(selector, SelectionKey.OP_ACCEPT);

A Channel's registration with the Selector is represented by a SelectionKey object. A Key is valid until one of these three conditions is met:

  • The Channel is closed.
  • The Selector is closed.
  • The Key itself is cancelled by invoking its cancel() method.
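The third condition in the list above is the simplest to demonstrate. The sketch below registers a channel, checks the key, cancels it, and checks again. KeyValidityDemo is a hypothetical name, and binding to port 0 just picks any free port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class KeyValidityDemo {

    // Returns {valid after register(), valid after cancel()}.
    public static boolean[] validityBeforeAndAfterCancel() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);   // a channel must be nonblocking to be registered
            channel.socket().bind(new InetSocketAddress(0));
            SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);
            boolean before = key.isValid();
            key.cancel();
            boolean after = key.isValid();
            return new boolean[] {before, after};
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] v = validityBeforeAndAfterCancel();
        System.out.println(v[0] + " " + v[1]);   // true false
    }
}
```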

The Selector blocks on the select() call. It then waits until a new connection is made, another thread wakes it up, or another thread interrupts the original blocked thread.
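The wake-up case can be shown in isolation. In the sketch below (SelectWakeupDemo is a hypothetical name), wakeup() is called before select(), so the next select() returns at once with no ready keys. The second method shows the two non-waiting variants, selectNow() and select(timeout).

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class SelectWakeupDemo {

    // Calls wakeup() before select(): the pending wake-up makes the next
    // select() return immediately even though no channel is ready.
    public static int selectAfterWakeup() throws IOException {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            return selector.select();   // would block indefinitely without the wake-up
        }
    }

    // selectNow() never blocks; select(timeout) blocks for at most the
    // given number of milliseconds.
    public static int pollWithoutBlocking() throws IOException {
        try (Selector selector = Selector.open()) {
            return selector.selectNow() + selector.select(50);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(selectAfterWakeup());     // 0
        System.out.println(pollWithoutBlocking());   // 0
    }
}
```

A return value of 0 is therefore a normal outcome of select(), which is worth keeping in mind when a loop is written as while (select() > 0).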

Registering the Server

Server is the ServerSocketChannel that registers itself with the Selector to accept all incoming connections, as shown here:

SelectionKey acceptKey = serverChannel.register(sel, SelectionKey.OP_ACCEPT);

while (acceptKey.selector().select() > 0) {

    ......

After the Server is registered, we iterate through the set of keys and handle each one based on its type. After a key is processed, it is removed from the list of ready keys, as shown here:

Set readyKeys = sel.selectedKeys();
Iterator it = readyKeys.iterator();
while (it.hasNext())
{
    SelectionKey key = (SelectionKey) it.next();
    it.remove();
    ....
    ....
    ....
}

If the key is acceptable, the connection is accepted and the channel is registered for further events such as read or write operations. If the key is readable or writable, the server indicates it is ready to read or write data on its end:

SocketChannel socket;
if (key.isAcceptable()) {
    System.out.println("Acceptable Key");
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    socket = ssc.accept();
    socket.configureBlocking(false);
    // Register the new connection for both read and write events.
    SelectionKey another =
        socket.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
if (key.isReadable()) {
    System.out.println("Readable Key");
    // Look up the channel that this key belongs to before using it.
    socket = (SocketChannel) key.channel();
    String ret = readMessage(key);
    if (ret.length() > 0) {
        writeMessage(socket, ret);
    }
}
if (key.isWritable()) {
    System.out.println("Writable Key");
    socket = (SocketChannel) key.channel();
    String ret = readMessage(key);
    if (ret.length() > 0) {
        writeMessage(socket, ret);
    }
}
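For comparison, here is a self-contained sketch of the same accept/read/write dispatch written as an echo server. It is not the article's sample server: SelectorEchoServer and all of its method names are hypothetical, and it swaps in two techniques the article does not use. Each connection's ByteBuffer is stored as the key's attachment, and a key is interested in OP_WRITE only while there is data waiting to be sent, so select() does not return continuously for idle clients. It uses modern Java syntax and binds to the loopback interface on an OS-chosen port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;

public class SelectorEchoServer {
    private final Selector selector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();
    private volatile boolean running = true;

    public SelectorEchoServer() throws IOException {
        server.configureBlocking(false);
        server.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    public int port() {
        return server.socket().getLocalPort();
    }

    // Asks the event loop to finish; wakeup() releases a blocked select().
    public void stop() {
        running = false;
        selector.wakeup();
    }

    // Single-threaded event loop: select, then dispatch each ready key.
    public void run() throws IOException {
        try {
            while (running) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) accept();
                    else if (key.isReadable()) read(key);
                    else if (key.isWritable()) write(key);
                }
            }
        } finally {
            // Close every registered channel (including the server), then the selector.
            for (SelectionKey key : new ArrayList<>(selector.keys())) key.channel().close();
            selector.close();
        }
    }

    private void accept() throws IOException {
        SocketChannel client = server.accept();
        if (client == null) return;              // no connection was actually pending
        client.configureBlocking(false);
        // The attached buffer holds this client's data between events.
        client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        int n = client.read(buf);
        if (n < 0) {                             // the peer closed the connection
            key.cancel();
            client.close();
        } else if (n > 0) {
            buf.flip();                          // switch the buffer from filling to draining
            key.interestOps(SelectionKey.OP_WRITE);
        }
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        client.write(buf);                       // may send only part of the buffer
        if (!buf.hasRemaining()) {
            buf.clear();
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    public static void main(String[] args) throws IOException {
        SelectorEchoServer echo = new SelectorEchoServer();
        System.out.println("Echo server listening on port " + echo.port());
        echo.run();
    }
}
```

An I/O error on any one connection ends the whole loop in this sketch; a production server would more likely catch the exception per key and close only the affected channel.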


Abracadabra -- nonblocking server socket appears!

The final part of this introduction to nonblocking I/O in JDK 1.4 is left to you: running the example.

In this simple nonblocking server-socket example, the server reads a file name sent from the client, displays the file contents, and writes the contents back to the client.

Here's what you need to do to run the example:

  1. Install JDK 1.4 (see Resources).
  2. Copy both source files into a directory.
  3. Compile and run the server as java NonBlockingServer.
  4. Compile and run the client as java Client.
  5. Input the name of a text or java file in the directory where the class files are present.
  6. The server will read the file and send its contents to the client.
  7. The client will print out the data received from the server. (Only 1024 bytes will be read since that is the limit of the ByteBuffer used.)
  8. Close the client by entering quit or shutdown.

Conclusion

The new I/O packages from Merlin cover a broad scope. The major advantage of Merlin's new nonblocking I/O implementation is twofold: threads no longer block on reads or writes, and the Selector is able to handle multiple connections, greatly reducing server application overhead.

We have highlighted these two primary advantages of the new java.nio package. We hope that you will apply what you've learned here to your real-time application development efforts.


Download

Name Size Download method
j-javaio.zip 3KB HTTP

Resources

Program source code

 Client.java
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.io.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.lang.*;
 
 
public class Client
{
    public SocketChannel client = null;
    public InetSocketAddress isa = null;
    public RecvThread rt = null;
 
    public Client()
    {
    }
     
    public void makeConnection()
    {
        int result = 0;
        try
        {
            
            client = SocketChannel.open();
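            // "nicholson" is the host name from the original sample; replace it
            // with the name of the machine on which the server is running.
            isa = new InetSocketAddress("nicholson",4900);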
            client.connect(isa);
            client.configureBlocking(false);
            receiveMessage();     
        }
        catch(UnknownHostException e)
        {
            e.printStackTrace();
        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
        while ((result = sendMessage()) != -1)
        {
        }
 
        try
        {
            client.close();
            System.exit(0);
        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
    }
     
    public int sendMessage()
    {
        System.out.println("Inside SendMessage");
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String msg = null;
        ByteBuffer bytebuf = ByteBuffer.allocate(1024);
        int nBytes = 0;
        try
        {
            msg = in.readLine();
            System.out.println("msg is "+msg);
            bytebuf = ByteBuffer.wrap(msg.getBytes());
            nBytes = client.write(bytebuf);
            System.out.println("nBytes is "+nBytes);
            if (msg.equals("quit") || msg.equals("shutdown")) {
                System.out.println("time to stop the client");
                interruptThread();
                try
                {
                    Thread.sleep(5000);
                }
                catch(Exception e)
                {
                    e.printStackTrace();
                }
                client.close();
                return -1;
            }
         
        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
        System.out.println("Wrote "+nBytes +" bytes to the server");
        return nBytes;
    }
 
    public void receiveMessage()
    {
        rt = new RecvThread("Receive THread",client);
        rt.start();
 
    }
 
    public void interruptThread()
    {
        rt.val = false;
    }
 
    public static void main(String args[])
    {
        Client cl = new Client();
        cl.makeConnection();
    }
 
    public class RecvThread extends Thread
    {
        public SocketChannel sc = null;
        public boolean val = true;
    
        public RecvThread(String str,SocketChannel client)
        {
            super(str);
            sc = client;
        }
    
        public void run() {
 
            System.out.println("Inside receivemsg");
            int nBytes = 0;
            ByteBuffer buf = ByteBuffer.allocate(2048);
            try
            {
                while (val)
                {
                    while ( (nBytes = client.read(buf)) > 0){
                        buf.flip();
                        Charset charset = Charset.forName("us-ascii");
                        CharsetDecoder decoder = charset.newDecoder();
                        CharBuffer charBuffer = decoder.decode(buf);
                        String result = charBuffer.toString();
                        System.out.println(result);
                        // Reset position and limit so the next read can use the whole buffer.
                        buf.clear();
                        
                    }
                }
            
            }
            catch(IOException e)
            {
                e.printStackTrace();
            
            }
             
 
        }
    }
}

NonBlockingServer.java

 
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.nio.charset.*;
import java.lang.*;
 
 
public class NonBlockingServer
{
    public Selector sel = null;
    public ServerSocketChannel server = null;
    public SocketChannel socket = null;
    public int port = 4900;
    String result = null;
 
 
    public NonBlockingServer()
    {
        System.out.println("Inside default ctor");
    }
     
    public NonBlockingServer(int port)
    {
        System.out.println("Inside the other ctor");
        // Assign to the field; a bare "port = port" only reassigns the parameter.
        this.port = port;
    }
 
    public void initializeOperations() throws IOException,UnknownHostException
    {
        System.out.println("Inside initialization");
        sel = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        InetAddress ia = InetAddress.getLocalHost();
        InetSocketAddress isa = new InetSocketAddress(ia,port);
        server.socket().bind(isa);
    }
     
    public void startServer() throws IOException
    {
        System.out.println("Inside startserver");
        initializeOperations();
        System.out.println("Abt to block on select()");
        SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT );    

    
        while (acceptKey.selector().select() > 0 )
        {    
         
            Set readyKeys = sel.selectedKeys();
            Iterator it = readyKeys.iterator();
 
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey)it.next();
                it.remove();
                 
                if (key.isAcceptable()) {
                    System.out.println("Key is Acceptable");
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                    socket = (SocketChannel) ssc.accept();
                    socket.configureBlocking(false);
                    SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);

                }
                if (key.isReadable()) {
                    System.out.println("Key is readable");
                    String ret = readMessage(key);
                    if (ret.length() > 0) {
                        writeMessage(socket,ret);
                    }
                }
                if (key.isWritable()) {
                    System.out.println("THe key is writable");
                    String ret = readMessage(key);
                    socket = (SocketChannel)key.channel();
                    if (ret.length() > 0 ) {
                        writeMessage(socket,ret);
                    }
                }
            }
        }
    }
 
    public void writeMessage(SocketChannel socket,String ret)
    {
        System.out.println("Inside the loop");
 
        if (ret.equals("quit") || ret.equals("shutdown")) {
            return;
        }
        File file = new File(ret);
        try
        {
        
            RandomAccessFile rdm = new RandomAccessFile(file,"r");
            FileChannel fc = rdm.getChannel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            fc.read(buffer);
            // The data is now in the buffer; closing the file also closes its channel.
            rdm.close();
            buffer.flip();
     
            Charset set = Charset.forName("us-ascii");
            CharsetDecoder dec = set.newDecoder();
            CharBuffer charBuf = dec.decode(buffer);
            System.out.println(charBuf.toString());
            buffer = ByteBuffer.wrap((charBuf.toString()).getBytes());
            int nBytes = socket.write(buffer);
            System.out.println("nBytes = "+nBytes);
            result = null;
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
 
    }
   
    public String readMessage(SelectionKey key)
    {
        int nBytes = 0;
        socket = (SocketChannel)key.channel();
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try
        {
            nBytes = socket.read(buf);
            buf.flip();
            Charset charset = Charset.forName("us-ascii");
            CharsetDecoder decoder = charset.newDecoder();
            CharBuffer charBuffer = decoder.decode(buf);
            result = charBuffer.toString();
         
        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
        return result;
    }
 
    public static void main(String args[])
    {
        NonBlockingServer nb = new NonBlockingServer();
        try
        {
            nb.startServer();
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(-1);
        }
        
    }
}
 
 
