最近在看netty的东西,为了学以致用,顺带看了下netty作为dubbo传输层的一种实现是怎么应用的
Netty体系结构
Netty 是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器
和客户端。Netty屏蔽了一些Java NIO上的细节,业务只要实现Netty暴露出来的回调接口,即可方便实现业务逻辑。
Netty主要有以下几个核心概念
- Channel,原生socket的包装,暴露了一些方便实用的接口
- EventLoop,Netty核心处理逻辑,绑定到一个线程上,处理Channel的整个生命周期
- ChannelFuture,提供对异步结果处理的若干回调接口
- ChannelHandler,Channel生命周期各节点的回调接口,类似filter之于servlet,interceptor之于spring mvc
- ChannelPipeline,ChannelHandler的处理链
Dubbo中的应用
dubbo在服务端传输层附近的主要工作是:
- 数据传输上,protocol调用信息(方法名,参数等)包装成invocation,invocation中会有dubbo协议维护的额外信息(比如attachment),然后交给序列化层序列化,然后交给netty传输
- 调用关系上,通过ProxyFactory讲业务service实现包装成invoker,再由protocol层包装成exporter对外提供网络服务
消费端反过来即可
netty服务由NettyServer对外暴露,DubboProtocol根据invocation找到invoker,由invoker调用具体service实现
代码分析开始
server启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
|
写过netty程序的人应该很熟悉这里了,一个典型的netty应用程序启动流程
- 初始化ServerBootstrap
- 初始化两个NioEventLoopGroup,一个用于接受连接,一个用于处理业务逻辑
- 安插3个ChannelHandler
ChannelHandler
dubbo使用了3个ChannelHandler,其中2个解编码器,一个业务handler,一个一个说
NettyCodecAdapter
适配了解码器和编码器,内部类包装了构造的时候传进去的Codec2对象,底下由这个Codec2实现解码和编码
1
2
3
4
5
6
7
8
9
|
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
...
codec.encode(channel, buffer, msg);
...
}
}
|
1
2
3
4
5
6
7
8
|
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
...
msg = codec.decode(channel, message);
...
}
|
codec可以找到跟dubbo协议相关的DubboCodec类,在重载的decodeBody方法中,包装DecodeableRpcInvocation,塞进Request.data中,返回这个Request给下级handler
NettyServerHandler
NettyServerHandler是个ChannelInboundHandler,既然是个ChannelInboundHandler,那么找数据处理逻辑肯定奔着channelRead去了
1
2
3
4
5
6
7
8
9
|
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
|
这个内部handler由构造的时候传入,看上层代码发现NettyServer本身也是ChannelHandler,继续跟踪会发现实际是DubboProtocol中的一个匿名内部类的对象requestHandler经过几层包装而来
DecodeHandler
transport层(面向message),这个handler解到上级decoder解码后的request(内部data是一个DecodeableRpcInvocation对象),进入received逻辑
1
2
3
4
5
6
7
8
|
public void received(Channel channel, Object message) throws RemotingException {
...
if (message instanceof Request) {
decode(((Request) message).getData());
}
...
handler.received(channel, message);
}
|
很明显会命中这个if,对data进行decode
1
2
3
4
5
6
7
|
private void decode(Object message) {
if (message != null && message instanceof Decodeable) {
...
((Decodeable) message).decode();
...
} // ~ end of if
} // ~ end of method decode
|
进入DecodeableRpcInvocation的decode方法(太长,不贴了),主要步骤是读取方法名/参数类型/参数/attachment,完了之后这个DecodeableRpcInvocation算是正式可用了
结束之后调用内部handler继续
HeaderExchangeHandler
exchanger层(面向request/response),最终调用构造的时候接到的DubboProtocol.reply方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public void received(Channel channel, Object message) throws RemotingException {
...
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
}
...
}
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
...
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
|
DubboProtocol
protocol层(面向invoker, invocation),拿到上层解析完的invocation,找到invoker,调用代理的service实现
1
2
3
4
5
6
7
8
9
10
|
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
...
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
...
}
|
自此,从网络字节流invocation的逐级转化完成,接下去就是invoker调用service的过程,下一篇继续
Author
hbprotoss
LastMod
2018-10-31
License
本作品采用知识共享署名-非商业性使用-禁止演绎 3.0 未本地化版本许可协议进行许可。