优势:并发高,传输快,使用简单
为什么传输快?
零拷贝技术: 传统IO要经历4次复制,4次切换上下文,把文件复制到内核里面的缓存区,再复制到用户态里的缓冲区,再复制到socket里的缓冲区(内核态),最后再复制到网卡.零拷贝有两种方式,mmap可以直接从内核缓存复制到socket,sendfile可以直接从内核缓存复制到网卡. 不光netty,kafka和nginx等中间件都使用了零拷贝.
IO模型: 基于Reactor的IO模型,有单Reactor单线程和单Reactor多线程和主从Reactor多线程模型.
单线程模型有四个部分,selector负责监听client的请求并转发给dispatcher,dispatcher收到请求后会将其分发给对应的handler,如果是建立连接的请求就会转发到acceptor,acceptor建立连接后会创建对应的handler,其后该连接的请求,例如read和send等都会由对应的handler处理,handler处理完再把数据返回给client.单线程的局限性在于处理任务的handler是单线程的,对于同一个连接的多个请求只能串行处理,有性能瓶颈.
多线程模型主要区别在于handler,handler只负责响应请求,不再处理具体任务,它会把执行过程交由worker线程处理,如果有多个请求就有多个worker,可以并发处理.
主从模型是为了解决acceptor的性能瓶颈问题,acceptor不再只是一个线程,而是拓展成了一个线程池.在这个模型下,所有请求都会经过主reactor(有个疑惑,请求直接交由从reactor不好嘛,为啥要多转发一次),主reactor只负责建立连接的请求,使用acceptor线程池处理,然后把读写等请求交由从reactor处理.
Bootstrap/ServerBootstrap: 启动基类
channel: 对应网络中的socket,作用包括维护当前连接的状态,连接的端口号,数据缓冲区的大小等,每个request对应一个channel,数据经由channel传入.
EventLoop: 多路复用程序,轮询各个channel,获取其中的事件并予以响应,是Reactor模型的主体.有多种实现,例如基于select的NioEventLoop,基于Epoll的EpollEventLoop
NioEventLoop: 维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务.
I/O任务,即selectionKey中就绪事件,例如read,write,accept,connect等,由processSelectedKeys方法触发。
非I/O任务,添加到taskQueue中的任务,如register(),bind()等任务
在 Netty 的每一个 NioEventLoop 中都有一个 TaskQueue,设计它的目的是在任务提交的速度大于线程的处理速度的时候起到缓冲作用。或者用于异步地处理 Selector 监听到的 IO 事件。
Selector: NioEventLoop中负责监听事件的线程,一个NioEventLoop对应一个selector,一个selector可监听多个channel.
NioEventLoopGroup: 主要负责管理eventloop的生命周期,内部维护了一组eventloop,和eventloop的关系可以理解为线程池和线程的关系.netty中分为bossGroup和workerGroup,bossGroup是负责建立连接的线程池(相当于主reactor),workerGroup是负责处理IO等请求的线程池(相当于从reactor).线程池的线程数默认为cpu核心数的两倍
BossGroup工作过程:
- 轮询注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- 处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到 WorkerGroup 中某个线程上的 Selector 上
- 再去以此循环处理任务队列中的下一个事件
WorkerGroup工作过程
- 轮询注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
- 在对应的 NioSocketChannel 上处理 read/write 事件
- 再去以此循环处理任务队列中的下一个事件
ChannelFuture: 异步返回事件处理的结果
ChannelHandler: 就是Reactor中的handler,具体处理channel传过来的事件的组件.
ChannelHandlerContext: 保存Channel相关的上下文信息,同时关联一个ChannelHandler对象
ChannelPipeline: 维护了一个channelHandler的双向链表,channelHandler是以链式调用的形式工作的,pipeline和channel是一对一的关系.
Netty服务端创建过程:
/**
* 需要的依赖:
* <dependency>
* <groupId>io.netty</groupId>
* <artifactId>netty-all</artifactId>
* <version>4.1.52.Final</version>
* </dependency>
*/
public static void main(String[] args) throws InterruptedException {
// 创建 BossGroup 和 WorkerGroup
// 1. bossGroup 只处理连接请求
// 2. 业务处理由 workerGroup 来完成
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置参数
bootstrap
// 设置线程组
.group(bossGroup, workerGroup)
// 说明服务器端通道的实现类(便于 Netty 做反射处理)
.channel(NioServerSocketChannel.class)
// 设置等待连接的队列的容量(当客户端连接请求速率大
// 于 NioServerSocketChannel 接收速率的时候,会使用
// 该队列做缓冲)
// option()方法用于给服务端的 ServerSocketChannel
// 添加配置
.option(ChannelOption.SO_BACKLOG, 128)
// 设置连接保活
// childOption()方法用于给服务端 ServerSocketChannel
// 接收到的 SocketChannel 添加配置
.childOption(ChannelOption.SO_KEEPALIVE, true)
// handler()方法用于给 BossGroup 设置业务处理器
// childHandler()方法用于给 WorkerGroup 设置业务处理器
.childHandler(
// 创建一个通道初始化对象
new ChannelInitializer<SocketChannel>() {
// 向 Pipeline 添加业务处理器
@Override
protected void initChannel(
SocketChannel socketChannel
) throws Exception {
socketChannel.pipeline().addLast(
new NettyServerHandler()
);
// 可以继续调用 socketChannel.pipeline().addLast()
// 添加更多 Handler
}
}
);
System.out.println("server is ready...");
// 绑定端口,启动服务器,生成一个 channelFuture 对象,
// ChannelFuture 涉及到 Netty 的异步模型,后面展开讲
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
// 对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAdapter(规范)
* InboundHandler 用于处理数据流入本端(服务端)的 IO 事件
* InboundHandler 用于处理数据流出本端(服务端)的 IO 事件
*/
static class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道有数据可读时执行
*
* @param ctx 上下文对象,可以从中取得相关联的 Pipeline、Channel、客户端地址等
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// 接收客户端发来的数据
System.out.println("client address: "
+ ctx.channel().remoteAddress());
// ByteBuf 是 Netty 提供的类,比 NIO 的 ByteBuffer 性能更高
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("data from client: "
+ byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕后执行
*
* @param ctx 上下文对象
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// 发送响应给客户端
ctx.writeAndFlush(
// Unpooled 类是 Netty 提供的专门操作缓冲区的工具
// 类,copiedBuffer 方法返回的 ByteBuf 对象类似于
// NIO 中的 ByteBuffer,但性能更高
Unpooled.copiedBuffer(
"hello client! i have got your data.",
CharsetUtil.UTF_8
)
);
}
/**
* 发生异常时执行
*
* @param ctx 上下文对象
* @param cause 异常对象
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// 关闭与客户端的 Socket 连接
ctx.channel().close();
}
}
客户端创建过程
/**
* 需要的依赖:
* <dependency>
* <groupId>io.netty</groupId>
* <artifactId>netty-all</artifactId>
* <version>4.1.52.Final</version>
* </dependency>
*/
public static void main(String[] args) throws InterruptedException {
// 客户端只需要一个事件循环组,可以看做 BossGroup
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 创建客户端的启动对象
Bootstrap bootstrap = new Bootstrap();
// 配置参数
bootstrap
// 设置线程组
.group(eventLoopGroup)
// 说明客户端通道的实现类(便于 Netty 做反射处理)
.channel(NioSocketChannel.class)
// handler()方法用于给 BossGroup 设置业务处理器
.handler(
// 创建一个通道初始化对象
new ChannelInitializer<SocketChannel>() {
// 向 Pipeline 添加业务处理器
@Override
protected void initChannel(
SocketChannel socketChannel
) throws Exception {
socketChannel.pipeline().addLast(
new NettyClientHandler()
);
// 可以继续调用 socketChannel.pipeline().addLast()
// 添加更多 Handler
}
}
);
System.out.println("client is ready...");
// 启动客户端去连接服务器端,ChannelFuture 涉及到 Netty 的异步模型,后面展开讲
ChannelFuture channelFuture = bootstrap.connect(
"127.0.0.1",
8080).sync();
// 对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
/**
* 自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAdapter(规范)
* InboundHandler 用于处理数据流入本端(客户端)的 IO 事件
* InboundHandler 用于处理数据流出本端(客户端)的 IO 事件
*/
static class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道就绪时执行
*
* @param ctx 上下文对象
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 向服务器发送数据
ctx.writeAndFlush(
// Unpooled 类是 Netty 提供的专门操作缓冲区的工具
// 类,copiedBuffer 方法返回的 ByteBuf 对象类似于
// NIO 中的 ByteBuffer,但性能更高
Unpooled.copiedBuffer(
"hello server!",
CharsetUtil.UTF_8
)
);
}
/**
* 当通道有数据可读时执行
*
* @param ctx 上下文对象
* @param msg 服务器端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// 接收服务器端发来的数据
System.out.println("server address: "
+ ctx.channel().remoteAddress());
// ByteBuf 是 Netty 提供的类,比 NIO 的 ByteBuffer 性能更高
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("data from server: "
+ byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 发生异常时执行
*
* @param ctx 上下文对象
* @param cause 异常对象
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// 关闭与服务器端的 Socket 连接
ctx.channel().close();
}
}
select: 基于轮询,时间复杂度为O(n),当IO事件触发后,遍历fdset去查找事件对应的fd.在连接数较大的情况下开销也是线性增长.使用数组存储fdset,连接数上限较小.实现简单,兼容性好.
poll: 和select基本相似,区别在于存储方式是链表,理论上能监听的fd没有数量限制.
epoll: select和poll的改进版,基于事件驱动,时间复杂度为O(1),实现原理是为每个fd注册一个回调函数,当fd对应的设备发生IO事件时,就会调用这个回调函数,将该fd放入一个链表中,然后服务器从该链表中逐个取出fd来处理.性能随着连接数上涨不会有明显衰退,适合高并发情景.
水平触发(level-trggered)
只要文件描述符关联的读内核缓冲区非空,有数据可以读取,就一直发出可读信号进行通知, 当文件描述符关联的内核写缓冲区不满,有空间可以写入,就一直发出可写信号进行通知 LT模式支持阻塞和非阻塞两种方式。epoll默认的模式是LT。select和poll都只支持水平.
边缘触发(edge-triggered)
当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知, 当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知 两者的区别在哪里呢?水平触发是只要读缓冲区有数据,就会一直触发可读信号,而边缘触发仅仅在空变为非空的时候通知一次.
LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.
八股文时间~
Netty 和 Tomcat 的区别?
Netty 和 Tomcat 最大的区别就在于通信协议,Tomcat 是基于 http 协议的,他的实质是一个基于 http 协议的web容器,但是 Netty 不一样,他能通过编程自定义各种协议,因为 Netty 能够自己编码/解码字节流,完成类似Redis 访问的功能,这就是 Netty 和 Tomcat 最大的区别。
Netty 发送消息有几种方式?
Netty 有两种发送消息的方式:
- 直接写入 Channel 中,消息从 ChannelPipeline 当中尾部开始移动;
- 写入和 ChannelHandler 绑定的 ChannelHandlerContext 中,消息从 ChannelPipeline 中的下一个 ChannelHandler 中移动。
Netty 高性能表现在哪些方面?
- IO 线程模型:同步非阻塞,用最少的资源做更多的事。
- 内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
- 内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
- 串形化处理读写:避免使用锁带来的性能开销。
- 高性能序列化协议:支持 protobuf 等高性能序列化协议。
什么是 Netty 的零拷贝?
Netty 的零拷贝主要包含三个方面:
- Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
- Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的 Buffer。
- Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。
Netty 的应用场景有哪些?
典型的应用有:阿里分布式服务框架 Dubbo,默认使用 Netty 作为基础通信组件,还有 RocketMQ 也是使用 Netty 作为通讯的基础。