本文最后更新于:April 11, 2022 pm
之前写了两篇关于 NIO 的文章,第一篇介绍了 NIO 的 Channel、Buffer、Selector 使用,第二篇介绍了非阻塞 IO 和异步 IO,并展示了简单的用例。
本文将介绍 Tomcat 中的 NIO 使用,使大家对 Java NIO 的生产使用有更加直观的认识。
虽然本文的源码篇幅也不短,但是 Tomcat 的源码毕竟不像 Doug Lea 的并发源码那么“变态”,对于大部分读者来说,阅读难度比之前介绍的其他并发源码要简单一些,所以读者不要觉得有什么压力。
本文基于 Tomcat 版本 9.0.6。
先简单画一张图示意一下本文的主要内容:
目录 源码环境准备 Tomcat 9.0.6 下载地址:https://tomcat.apache.org/download-90.cgi
由于上面下载的 tomcat 的源码并没有使用 maven 进行组织,不方便我们看源码,也不方便我们进行调试。这里我们将使用 maven 仓库中的 tomcat-embed-core,自己编写代码进行启动的方式来进行调试。
首先,创建一个空的 maven 工程,然后添加以下依赖。
<dependency > <groupId > org.apache.tomcat.embed</groupId > <artifactId > tomcat-embed-core</artifactId > <version > 9.0.6</version > </dependency >
上面的依赖,只会将 tomcat-embed-core-9.0.6.jar 和 tomcat-annotations-api-9.0.6.jar 两个包引进来,对于本文来说,已经足够了,如果你需要其他功能,需要额外引用其他的依赖,如 Jasper。
然后,使用以下启动方法:
public static void main (String[] args) throws LifecycleException { Tomcat tomcat = new Tomcat(); Connector connector = new Connector("HTTP/1.1" ); connector.setPort(8080 ); tomcat.setConnector(connector); tomcat.start(); tomcat.getServer().await(); }
经过以上的代码,我们的 Tomcat 就启动起来了。
Tomcat 中的其他接口感兴趣的读者请自行探索,如设置 webapp 目录,设置 resources 等
这里,介绍第一个重要的概念:Connector 。在 Tomcat 中,使用 Connector 来处理连接,一个 Tomcat 可以配置多个 Connector,分别用于监听不同端口,或处理不同协议。
在 Connector 的构造方法中,我们可以传 HTTP/1.1
或 AJP/1.3
用于指定协议,也可以传入相应的协议处理类,毕竟协议不是重点,将不同端口进来的连接对应不同处理类才是正道。典型地,我们可以指定以下几个协议处理类:
org.apache.coyote.http11.Http11NioProtocol:对应非阻塞 IO
org.apache.coyote.http11.Http11Nio2Protocol:对应异步 IO
org.apache.coyote.http2.Http2Protocol:对应 http2 协议,对 http2 感兴趣的读者,赶紧看起来吧。
本文的重点当然是非阻塞 IO 了,之前已经介绍过异步 IO
的基础知识了,读者看完本文后,如果对异步 IO 的处理流程感兴趣,可以自行去分析一遍。
如果你使用 9.0 以前的版本,Tomcat 在启动的时候是会自动配置一个 connector 的,我们可以不用显示配置。
9.0 版本的 Tomcat#start() 方法:
public void start () throws LifecycleException { getServer(); server.start(); }
8.5 及之前版本的 Tomcat#start() 方法:
public void start () throws LifecycleException { getServer(); getConnector(); server.start(); }
endpoint 前面我们说过一个 Connector 对应一个协议,当然这描述也不太对,NIO 和 NIO2 就都是处理 HTTP/1.1 的,只不过一个使用非阻塞,一个使用异步。进到指定 protocol 代码,我们就会发现,它们的代码及其简单,只不过是指定了特定的 endpoint 。
打开 Http11NioProtocol
和 Http11Nio2Protocol
源码,我们可以看到,在构造方法中,它们分别指定了 NioEndpoint 和 Nio2Endpoint。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Http11NioProtocol extends AbstractHttp11JsseProtocol <NioChannel > { public Http11NioProtocol () { super (new NioEndpoint()); } ... }public class Http11Nio2Protocol extends AbstractHttp11JsseProtocol <Nio2Channel > { public Http11Nio2Protocol () { super (new Nio2Endpoint()); } ... }
这里介绍第二个重要的概念:endpoint 。Tomcat 使用不同的 endpoint 来处理不同的协议请求,今天我们的重点是 NioEndpoint ,其使用非阻塞 IO 来进行处理 HTTP/1.1 协议的请求。
NioEndpoint 继承 => AbstractJsseEndpoint 继承 => AbstractEndpoint 。中间的 AbstractJsseEndpoint 主要是提供了一些关于 HTTPS
的方法,这块我们暂时忽略它,后面所有关于 HTTPS 的我们都直接忽略,感兴趣的读者请自行分析。
init 过程分析 下面,我们看看从 tomcat.start() 一直到 NioEndpoint 的过程。
1. AbstractProtocol # init
@Override public void init () throws Exception { ... String endpointName = getName(); endpoint.setName(endpointName.substring(1 , endpointName.length()-1 )); endpoint.setDomain(domain); endpoint.init(); }
2. AbstractEndpoint # init
public final void init () throws Exception { if (bindOnInit) { bind(); bindState = BindState.BOUND_ON_INIT; } ... }
3. NioEndpoint # bind
这里就到我们的 NioEndpoint 了,要使用到我们之前学习的 NIO 的知识了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Override public void bind () throws Exception { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = (getAddress()!=null ?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); serverSock.socket().bind(addr,getAcceptCount()); serverSock.configureBlocking(true ); if (acceptorThreadCount == 0 ) { acceptorThreadCount = 1 ; } if (pollerThreadCount <= 0 ) { pollerThreadCount = 1 ; } setStopLatch(new CountDownLatch(pollerThreadCount)); initialiseSsl(); selectorPool.open(); }
ServerSocketChannel 已经打开,并且绑定要了之前指定的 8080 端口,设置成了阻塞模式 。
设置了 acceptor 的线程数为 1
设置了 poller 的线程数,单核 CPU 为 1,多核为 2
打开了一个 SelectorPool,我们先忽略这个
到这里,我们还不知道 Acceptor 和 Poller 是什么东西,我们只是设置了它们的数量,我们先来看看最后面提到的 SelectorPool。
start 过程分析 刚刚我们分析完了 init() 过程,下面是启动过程 start() 分析。
AbstractProtocol # start
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void start () throws Exception { ... endpoint.start(); asyncTimeout = new AsyncTimeout(); Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout" ); int priority = endpoint.getThreadPriority(); if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { priority = Thread.NORM_PRIORITY; } timeoutThread.setPriority(priority); timeoutThread.setDaemon(true ); timeoutThread.start(); }
AbstractEndpoint # start
public final void start () throws Exception { if (bindState == BindState.UNBOUND) { bind(); bindState = BindState.BOUND_ON_START; } startInternal(); }
下面这个方法还是比较重要的,这里会创建前面说过的 acceptor 和 poller。
NioEndpoint # startInternal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Override public void startInternal () throws Exception { if (!running) { running = true ; paused = false ; processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); if ( getExecutor() == null ) { createExecutor(); } initializeConnectionLatch(); pollers = new Poller[getPollerThreadCount()]; for (int i=0 ; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" +i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true ); pollerThread.start(); } startAcceptorThreads(); } }
到这里,我们启动了工作线程池 、 poller 线程组 、acceptor 线程组 。同时,工作线程池初始就已经启动了 10 个线程。我们用 jconsole 来看看此时的线程,请看下图:
从 jconsole 中,我们可以看到,此时启动了 BlockPoller、worker、poller、acceptor、AsyncTimeout,大家应该都已经清楚了每个线程是哪里启动的吧。
Tomcat 中并没有 Worker 这个类,此名字是我瞎编。
此时,我们还是不知道 acceptor、poller 甚至 worker 到底是干嘛的,下面,我们从 acceptor 线程开始看起。
Acceptor 它的结构非常简单,在构造函数中,已经把 endpoint 传进来了,此外就只有 threadName 和 state 两个简单的属性。
private final AbstractEndpoint<?,U> endpoint;private String threadName;protected volatile AcceptorState state = AcceptorState.NEW;public Acceptor (AbstractEndpoint<?,U> endpoint) { this .endpoint = endpoint; }
threadName 就是一个线程名字而已,Acceptor 的状态 state 主要是随着 endpoint 来的。
public enum AcceptorState { NEW, RUNNING, PAUSED, ENDED }
我们直接来看 acceptor 的 run 方法吧:
Acceptor # run
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 @Override public void run () { int errorDelay = 0 ; while (endpoint.isRunning()) { while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50 ); } catch (InterruptedException e) { } } if (!endpoint.isRunning()) { break ; } state = AcceptorState.RUNNING; try { endpoint.countUpOrAwaitConnection(); if (endpoint.isPaused()) { continue ; } U socket = null ; try { socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { endpoint.countDownConnection(); if (endpoint.isRunning()) { errorDelay = handleExceptionWithDelay(errorDelay); throw ioe; } else { break ; } } errorDelay = 0 ; if (endpoint.isRunning() && !endpoint.isPaused()) { if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); String msg = sm.getString("endpoint.accept.fail" ); if (t instanceof Error) { Error e = (Error) t; if (e.getError() == 233 ) { log.warn(msg, t); } else { log.error(msg, t); } } else { log.error(msg, t); } } } state = AcceptorState.ENDED; }
大家应该发现了,Acceptor 绕来绕去,都是在调用 NioEndpoint 的方法,我们简单分析一下这个。
在 NioEndpoint init 的时候,我们开启了一个 ServerSocketChannel,后来 start 的时候,我们开启多个 acceptor(实际上,默认是 1 个),每个 acceptor 启动以后就开始循环调用 ServerSocketChannel 的 accept() 方法获取新的连接,然后调用 endpoint.setSocketOptions(socket) 处理新的连接,之后再进入循环 accept 下一个连接。
到这里,大家应该也就知道了,为什么这个叫 acceptor 了吧?接下来,我们来看看 setSocketOptions 方法到底做了什么。
NioEndpoint # setSocketOptions
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Override protected boolean setSocketOptions (SocketChannel socket) { try { socket.configureBlocking(false ); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null ) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this ); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("" ,t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } return false ; } return true ; }
我们看到,这里又没有进行实际的处理,而是将这个 SocketChannel 注册 到了其中一个 poller 上。因为我们知道,acceptor 应该尽可能的简单,只做 accept 的工作,简单处理下就往后面扔。acceptor 还得回到之前的循环去 accept 新的连接呢。
我们只需要明白,此时,往 poller 中注册了一个 NioChannel 实例,此实例包含客户端过来的 SocketChannel 和一个 SocketBufferHandler 实例。
Poller 之前我们看到 acceptor 将一个 NioChannel 实例 register 到了一个 poller 中。在看 register 方法之前,我们需要先对 poller 要有个简单的认识。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Poller implements Runnable { public Poller () throws IOException { this .selector = Selector.open(); } private Selector selector; private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>(); private volatile boolean close = false ; private long nextExpiration = 0 ; private AtomicLong wakeupCounter = new AtomicLong(0 ); private volatile int keyCount = 0 ; ... }
敲重点:每个 poller 关联了一个 Selector。
Poller 内部围着一个 events 队列转,来看看其 events() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public boolean events () { boolean result = false ; PollerEvent pe = null ; for (int i = 0 , size = events.size(); i < size && (pe = events.poll()) != null ; i++ ) { result = true ; try { pe.run(); pe.reset(); if (running && !paused) { eventCache.push(pe); } } catch ( Throwable x ) { log.error("" ,x); } } return result; }
events() 方法比较简单,就是取出当前队列中的 PollerEvent 对象,逐个执行 event.run() 方法。
然后,现在来看 Poller 的 run() 方法,该方法会一直循环,直到 poller.destroy() 被调用。
Poller # run
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public void run () { while (true ) { boolean hasEvents = false ; try { if (!close) { hasEvents = events(); if (wakeupCounter.getAndSet(-1 ) > 0 ) { keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0 ); } if (close) { events(); timeout(0 , false ); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail" ), ioe); } break ; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("" ,x); continue ; } if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null ; while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); if (attachment == null ) { iterator.remove(); } else { iterator.remove(); processKey(sk, attachment); } } timeout(keyCount,hasEvents); } getStopLatch().countDown(); }
poller 的 run() 方法主要做了调用 events() 方法和处理注册到 Selector 上的 ready key,这里我们暂时不展开 processKey 方法,因为此方法必定是及其复杂的。
我们回过头来看之前从 acceptor 线程中调用的 register 方法。
Poller # register
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void register (final NioChannel socket) { socket.setPoller(this ); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this ); socket.setSocketWrapper(ka); ka.setPoller(this ); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this .getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ); if ( r==null ) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); }
这里将这个 socket(包含 socket 和 buffer 的 NioChannel 实例) 包装为一个 PollerEvent,然后添加到 events 中,此时调用此方法的 acceptor 结束返回,去处理新的 accepted 连接了。
接下来,我们已经知道了,poller 线程在循环过程中会不断调用 events() 方法,那么 PollerEvent 的 run() 方法很快就会被执行,我们就来看看刚刚这个新的连接被注册 到这个 poller 后,会发生什么。
PollerEvent # run
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Override public void run () { if (interestOps == OP_REGISTER) { try { socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail" ), x); } } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { if (key == null ) { socket.socketWrapper.getEndpoint().countDownConnection(); } else { final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); if (socketWrapper != null ) { int ops = key.interestOps() | interestOps; socketWrapper.interestOps(ops); key.interestOps(ops); } else { socket.getPoller().cancelledKey(key); } } } catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key); } catch (Exception ignore) {} } } }
到这里,我们再回顾一下:刚刚在 PollerEvent 的 run() 方法中,我们看到,新的 SocketChannel 注册到了 Poller 内部的 Selector 中,监听 OP_READ 事件,然后我们再回到 Poller 的 run() 看下,一旦该 SocketChannel 是 readable 的状态,那么就会进入到 poller 的 processKey 方法。
processKey Poller # processKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 protected void processKey (SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false ); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false ; if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true )) { closeSocket = true ; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true )) { closeSocket = true ; } } if (closeSocket) { cancelledKey(sk); } } } } else { cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("" ,t); } }
接下来是 processSocket 方法,注意第三个参数,上面进来的时候是 true。
AbstractEndpoint # processSocket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public boolean processSocket (SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null ) { return false ; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null ) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null ) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail" , socketWrapper) , ree); return false ; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); getLog().error(sm.getString("endpoint.process.fail" ), t); return false ; } return true ; }
NioEndpoint # createSocketProcessor
@Override protected SocketProcessorBase<NioChannel> createSocketProcessor ( SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) { return new SocketProcessor(socketWrapper, event); }
我们看到,提交到 worker 线程池中的是 NioEndpoint.SocketProcessor 的实例,至于它的 run() 方法之后的逻辑,我们就不再继续往里分析了。
总结 最后,再祭出文章开始的那张图来总结一下:
这里简单梳理下前面我们说的流程,帮大家回忆一下:
指定 Protocol,初始化相应的 Endpoint,我们分析的是 NioEndpoint;
init 过程:在 NioEndpoint 中做 bind 操作;
start 过程:启动 worker 线程池,启动 1 个 Acceptor 和 2 个 Poller,当然它们都是默认值,可配;
Acceptor 获取到新的连接后,getPoller0() 获取其中一个 Poller,然后 register 到 Poller 中;
Poller 循环 selector.select(xxx),如果有通道 readable,那么在 processKey 中将其放到 worker 线程池中。
后续的流程,请自行分析。