现在的位置: 首页 > 综合 > 正文

Tomcat之Connector处理过程分析

2018年02月11日 ⁄ 综合 ⁄ 共 7299字 ⁄ 字号 评论关闭

TomcatServlet容器著称(处理JspServlet等动态资源的应用服务器)。由Tomcat的总体架构可知,Servlet容器由2个主要组件构成:Connector(连接器)和Container(容器)。Connector负责接收客户端的请求,而Container处理并响应该请求。

JavaEE规范可知,Servlet容器内部只处理HTTP协议的请求,但是对于连接器的设计来说,它可以接收任何协议(如HTTPAJP等)的请求,因此,在连接器接收到客户端的请求(Socket)后,需要将该请求包装成容器可以处理的对象,然后再传递给容器处理,同时,连接器也要创建一个响应对象一并传给容器,好让容器响应请求。如此一来,容器就与具体传输协议解耦了,而这正是Connector架构所要达到的目的。

以上是闲话,但是我觉得说的挺好的,摘自网上。

我们将Servlet容器分成两个部分:Connector和Container。

我们知道客户端和服务器之间要进行通信也是要建立连接的,也需要初始化套接字,通过套接字通信。在Servlet容器中就是通过Connector这个部分来实现。

在Connector进入start状态后,调用protocolHandler.start(),protocolHandler是根据你的协议类型来设置的,默认为org.apache.coyote.http11.Http11Protocol。在Http11Protocol中是通过JIoEndpoint来进行监听套接字,用Http11ConnectionHandler去处理这个套接字的通信。

protocolHandler的start()中通过endPoint来建立连接,endpoint.start()
//
JIoEndpoint endpoint

for (int i = 0; i < acceptorThreadCount; i++) {
                Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
                acceptorThread.setPriority(threadPriority);
                acceptorThread.setDaemon(daemon);
                acceptorThread.start();
            }

创建的Acceptor线程用来接受HTTP连接请求。

 protected class Acceptor implements Runnable {


        /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        public void run() {

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                // Accept the next incoming connection from the server socket
                try {
                    Socket socket = serverSocketFactory.acceptSocket(serverSocket);
                    serverSocketFactory.initSocket(socket);
                    // Hand this socket off to an appropriate processor
                    if (!processSocket(socket)) {
                        // Close socket right away
                        try {
                            socket.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }
                }catch ( IOException x ) {
                    if ( running ) log.error(sm.getString("endpoint.accept.fail"), x);
                } catch (Throwable t) {
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }

                // The processor will recycle itself when it finishes

            }

        }

    }

现在socket就可以用来和客户端进行通信。Acceptor只是用来接受连接,但是具体的处理并不是由它来解决的。而是交给Worker线程。

protected boolean processSocket(Socket socket) {
        try {
            if (executor == null) {
                getWorkerThread().assign(socket);
            } else {
                executor.execute(new SocketProcessor(socket));
            }
        } catch (Throwable t) {
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

当Workder线程启动时是处于阻塞状态的,它一直阻塞在await(),一直在等待有个socket来处理,当我们将socket通过assign(socket)交给Worker线程,那么就不再处于阻塞状态了。

 public void run() {

            // Process requests until we receive a shutdown signal
            while (running) {

                // Wait for the next socket to be assigned
                Socket socket = await();
                if (socket == null)
                    continue;

                // Process the request from this socket
                if (!setSocketOptions(socket) || !handler.process(socket)) {
                    // Close socket
                    try {
                        socket.close();
                    } catch (IOException e) {
                    }
                }

                // Finish up this request
                socket = null;
                recycleWorkerThread(this);

            }

        }

synchronized void assign(Socket socket) {

            // Wait for the Processor to get the previous Socket
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }

            // Store the newly available Socket and notify our thread
            this.socket = socket;
            available = true;
            notifyAll();

        }

        
        /**
         * Await a newly assigned Socket from our Connector, or <code>null</code>
         * if we are supposed to shut down.
         */
        private synchronized Socket await() {

            // Wait for the Connector to provide a new Socket
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }

            // Notify the Connector that we have received this Socket
            Socket socket = this.socket;
            available = false;
            notifyAll();

            return (socket);

        }

我们在之前就提过,Http11Protocol中是通过JIoEndpoint来进行监听套接字,用Http11ConnectionHandler(Http11Protocol的内部类)去处理这个套接字的通信,所以Worker其实还是将socket交给Http11ConnectionHandler处理,handler.process(socket)。这个handler还是Http11Protocol在init()时交给JIoEndpoint。

public void init() throws Exception {
        endpoint.setName(getName());
        endpoint.setHandler(cHandler);

public boolean process(Socket socket) {
            Http11Processor processor = recycledProcessors.poll();
            try {


                if (processor == null) {
                    processor = createProcessor();
                }


                if (processor instanceof ActionHook) {
                    ((ActionHook) processor).action(ActionCode.ACTION_START, null);
                }


                if (proto.isSSLEnabled() && (proto.sslImplementation != null)) {
                    processor.setSSLSupport
                        (proto.sslImplementation.getSSLSupport(socket));
                } else {
                    processor.setSSLSupport(null);
                }
                
                processor.process(socket);
                return false;


            } catch(java.net.SocketException e) {
                // SocketExceptions are normal
                Http11Protocol.log.debug
                    (sm.getString
                     ("http11protocol.proto.socketexception.debug"), e);
            } catch (java.io.IOException e) {
                // IOExceptions are normal
                Http11Protocol.log.debug
                    (sm.getString
                     ("http11protocol.proto.ioexception.debug"), e);
            }
            // Future developers: if you discover any other
            // rare-but-nonfatal exceptions, catch them here, and log as
            // above.
            catch (Throwable e) {
                // any other exception or error is odd. Here we log it
                // with "ERROR" level, so it will show up even on
                // less-than-verbose logs.
                Http11Protocol.log.error
                    (sm.getString("http11protocol.proto.error"), e);
            } finally {
                //       if(proto.adapter != null) proto.adapter.recycle();
                //                processor.recycle();


                if (processor instanceof ActionHook) {
                    ((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
                }
                recycledProcessors.offer(processor);
            }
            return false;
        }

从这里我们看出其实真正处理socket的是Http11Processor。

1:初始化流。

// Setting up the I/O
        this.socket = theSocket;
        inputBuffer.setInputStream(socket.getInputStream());
        outputBuffer.setOutputStream(socket.getOutputStream());

2:解析头部。

// Parsing the request header
            try {
                if (keptAlive) {
                    if (keepAliveTimeout > 0) {
                        socket.setSoTimeout(keepAliveTimeout);
                    }
                    else if (soTimeout > 0) {
                        socket.setSoTimeout(soTimeout);
                    }
                }
                inputBuffer.parseRequestLine();
                request.setStartTime(System.currentTimeMillis());
                keptAlive = true;
                if (disableUploadTimeout) {
                    socket.setSoTimeout(soTimeout);
                } else {
                    socket.setSoTimeout(timeout);
                }
                // Set this every time in case limit has been changed via JMX
                request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                inputBuffer.parseHeaders();
            } catch (IOException e) {
                error = true;
                break;
            } catch (Throwable t) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), t);
                }
                // 400 - Bad Request
                response.setStatus(400);
                adapter.log(request, response, 0);
                error = true;
            }

3:对request的其他处理。

  // Process the request in the adapter
            if (!error) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    adapter.service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !error) { // Avoid checking twice.
                        error = response.getErrorException() != null ||
                                statusDropsConnection(response.getStatus());
                    }

                } 

这里是调用adapter的service方法。

public interface Adapter {
    public void service(Request req, Response res)
            throws Exception;
    public boolean event(Request req, Response res, SocketStatus status)
            throws Exception;
    public void log(Request req, Response res, long time);
}

adapter是一个适配器。他适配了org.apache.coyote.Request和org.apache.catalina.connector.Request,后者正是继承了HttpServletRequest。不知道你有没有跟我相同的疑问,为什么需要这样一个适配器,这不是多此一举吗?事实上,这里适配器起到了隔离内外的作用,对于coyote内部,用coyote.Request可以做一些request本身职责以外的事情(但是又不能不在request中实现,因为生命周期就是request,如设置开始结束时间,日志记录等等),但是这些事情不需要给开发者看到,有可能会被误用导致coyote出错,于是暴露给开发者的是connector.Request,中间使用adapter来适配。adapter的初始化还要追溯到Connector。

// Initializa adapter
        adapter = new CoyoteAdapter(this);
        protocolHandler.setAdapter(adapter);

prorocolHandler是Http11ConnectionHandler。而Http11ConnectionHandler又把adapter交给了Http11Processor

 processor.setAdapter(proto.adapter);

而Adapter.service(...)就是在Http11Processor中被调用的。

其实这么多只是为了一个目的:接受客户端的连接请求,对请求进行处理,再返回给客户端一个响应。只是过程有点复杂。

总结用到的类:Connector;Http11Protocol;Http11Processor;JIoEndpoint;Adapter(CoyoteAdapter)。还有很多的内部类,比如Worker,Acceptor,Http11ConnectionHandler等

现在大概清楚了请求的处理过程了,但是细节还是不怎么清楚。看了一晚上,继续努力。

抱歉!评论已关闭.