本文最后更新于:April 11, 2022 pm
Netty 源码分析系列:
Netty 源码解析(一): 开始
Netty 源码解析(二): Netty 的 Channel
Netty 源码解析(三): Netty 的 Future 和 Promise
Netty 源码解析(四): Netty 的 ChannelPipeline
Netty 源码解析(五): Netty 的线程池分析
Netty 源码解析(六): Channel 的 register 操作
Netty 源码解析(七): NioEventLoop 工作流程
Netty 源码解析(八): 回到 Channel 的 register 操作
Netty 源码解析(九): connect 过程和 bind 过程分析
目录 connect 过程和 bind 过程分析 上面我们介绍的 register 操作非常关键,它建立起来了很多的东西,它是 Netty 中 NioSocketChannel 和 NioServerSocketChannel 开始工作的起点。
这一节,我们来说说 register 之后的 connect 操作和 bind 操作。这节非常简单。
connect 过程分析 对于客户端 NioSocketChannel 来说,前面 register 完成以后,就要开始 connect 了,这一步将连接到服务端。
private ChannelFuture doResolveAndConnect (final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { .... } }
这里大家自己一路点进去,我就不浪费篇幅了。最后,我们会来到 AbstractChannel 的 connect 方法:
@Override public ChannelFuture connect (SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }
我们看到,connect 操作是交给 pipeline 来执行的。进入 pipeline 中,我们会发现,connect 这种 Outbound 类型的操作,是从 pipeline 的 tail 开始的:
前面我们介绍的 register 操作是 Inbound 的,是从 head 开始的
@Override public final ChannelFuture connect (SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); }
接下来就是 pipeline 的操作了,从 tail 开始,执行 pipeline 上的 Outbound 类型的 handlers 的 connect(…) 方法,那么真正的底层的 connect 的操作发生在哪里呢?还记得我们的 pipeline 的图吗?
从 tail 开始往前找 out 类型的 handlers,每经过一个 handler,都执行里面的 connect() 方法,最后会到 head 中,因为 head 也是 Outbound 类型的,我们需要的 connect 操作就在 head 中,它会负责调用 unsafe 中提供的 connect 方法:
public void connect ( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
接下来,我们来看一看 connect 在 unsafe 类中所谓的底层操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Override public final void connect ( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ...... boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0 ) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run () { ChannelPromise connectPromise = AbstractNioChannel.this .connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null ) { connectTimeoutFuture.cancel(false ); } connectPromise = null ; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } }
如果上面的 doConnect 方法返回 false,那么后续是怎么处理的呢?
在上一节介绍的 register 操作中,channel 已经 register 到了 selector 上,只不过将 interestOps 设置为了 0,也就是什么都不监听。
而在上面的 doConnect 方法中,我们看到它在调用底层的 connect 方法后,会设置 interestOps 为 SelectionKey.OP_CONNECT
。
剩下的就是 NioEventLoop 的事情了,还记得 NioEventLoop 的 run() 方法吗?也就是说这里的 connect 成功以后,这个 TCP 连接就建立起来了,后续的操作会在 NioEventLoop.run()
方法中被 processSelectedKeys()
方法处理掉。
bind 过程分析 说完 connect 过程,我们再来简单看下 bind 过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { ...... } }
然后一直往里看,会看到,bind 操作也是要由 pipeline 来完成的:
// AbstractChannel
@Override public ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
bind 操作和 connect 一样,都是 Outbound 类型的,所以都是 tail 开始:
@Override public final ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
最后的 bind 操作又到了 head 中,由 head 来调用 unsafe 提供的 bind 方法:
@Override public void bind ( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
感兴趣的读者自己去看一下 unsafe 中的 bind 方法,非常简单,bind 操作也不是什么异步方法,我们就介绍到这里了。本知识非常简单,就是想和大家介绍下 Netty 中各种操作的套路。