Netty 源码流程图
2022-11-17 12:21:24 0 举报
Netty 服务端源码流程图
作者其他创作
大纲/内容
ChannelPipeline p = channel.pipeline()
开始管道注册
监听端口
线程组
head 结点
ChannelHandler
这里反射调用的就是 NioServerSocketChannel.class 这个类 的无餐构造方法
具体调用
读取数据完成后调用
设置子线程组
分配一个 byteBuf 牵扯到零拷贝
获取
原生 NIO方法 等待时间发生 但是传入了等待时间
next.invokeChannelRegistered()
((ChannelInboundHandler) handler()).channelRegistered(this)
匿名方法 run
Runnable task = pollTask()
写(事件)
ch.configureBlocking(false)
调用放入 queue的注册任务
获取 socketChannel
设置属性
HeadContext
加入一个handler piple管道的样子
this.childGroup = childGroup
ch.eventLoop().execute(new Runnable()
processSelectedKeys()
加入 childHandler childHandler 是创建启动类 ServerBootstrap 时候加入的 new ChannelInitializer()
打开 Selector后 赋值给 selector
核心方法 进行注册
这里就可以看出 NioEventLoopGroup 实例 里面有个现成数组 数组放的是 NioEventLoop
selector = selectorTuple.selector
doBind(localAddress)
继承
激活管道注册 传入 handler 头节点
SelectorProvider.provider()
void channelInactive(ChannelHandlerContext ctx)
unsafe.read()
void channelActive(ChannelHandlerContext ctx)
Channel注销时调用
ServerBootstrap bootstrap = new ServerBootstrap()
pipeline.invokeHandlerAddedIfNeeded()
原生 NIO
创建管道头节点
创建 NioEventLoopGroup
获取 pipeline 这里获取的 是serversocketcChannelPipeline 因为 还没有调用 doReadMessages方法 socketChanel 还没创建
设置 NioServerSocketChannel.class 构造方法
ctx.fireChannelRegistered()
for循环
safeExecute(task)
children = new EventExecutor[nThreads]
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
实际调用
return serverSocketChannel.accept()
runAllTasks(ioTime * (100 - ioRatio) / ioRatio)
这里会通过链表遍历 需要调用的 handler
childGroup 线程组 注册
return constructor.newInstance()
child.pipeline().addLast(childHandler)
逐一调用
处理事件
创建任务队列 Queue初始化LinkedBlockingQueue
SelectStrategy.SELECT
持有属性
调用
获取 selector
public void initChannel(final Channel ch)
单线程 事件循环线程
执行run 方法
ChannelInboundHandler
register0(promise)
创建 NioEventLoop
返回
创建 Pipeline管道
run()
select 次数自增
startThread()
for 死循环
ChannelHandlerContext
回调方法
查找 handler
new NioEventLoopGroup
provider.openSelector()
方法调用 注册
打开 Selector 等于调用原生 NIO Selector.open()
TailContext
selectCnt ++
调用父类构造方法
config().group().register(channel)
next().register(channel)
业务Handler
寻找 handler
return ctx
强转为 SocketChannel 然后 socketChannel的 pipeline 加入一个handler
相互指向引用节点
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
获取的是 socketChannel 的 pipeline 因为已经注册了
创建的线程池也传入到了 NioEventLoop中使用
把创建的ServerSocektChannel 赋值给属性
SingleThreadEventExecutor.this.run()
设置为非阻塞
SingleThreadEventExecutor
pipeline.fireChannelRead(byteBuf)
初始化并且注册
执行启动线程
连接(事件)
ServerBootstrapAcceptor
channelFactory(new ReflectiveChannelFactory<C>(channelClass))
ChannelFuture regFuture = initAndRegister()
创建管道尾结点
核心就是NIO 原生写入
void channelReadComplete(ChannelHandlerContext ctx)
通过构造方法 创建 NioServerSocketChannel.class 实例
会根据操作系统 一般linux 调用 new EpollSelectorImpl
加入
获取事件 整型值
handler != null加入到 pipeline 最后
数据读取完毕后调用
void channelUnregistered(ChannelHandlerContext ctx)
打开 Selector
addTask(task)
这里有连接事件 读写事件才开始循环
等待回调方法 进行注册
绑定端口 并且阻塞同步
NioEventLoop
// 多路复用器Selector selector// LinkedBolockingQueue 阻塞队列Queue<Runnable> taskQueu
调用父类构造
判断 executor == null
javaChannel()
加入的就是这个 匿名方法 从 queue 获取的时候会执行这个方法
Netty parent 和 child 线程组
获取 pipeline
设置属性 ACCETP 感兴趣的事件整型值 但是还没有设置到 selector
调用方法 创建 pipeline
调用之后 socketChannel 当前的pipeline
for (int i = 0; i < selectedKeys.size; ++i)
启动线程
进行绑定端口
判断是否写事件
设置 parentGroup
阻塞队列api 加入
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED))
因为创建的是 ServerBootStrap 所以 config 是 ServerBootStrapgroup 就是 设置的线程组
io.netty.channel.AbstractChannel.AbstractUnsafe#register
判断传入 nThreads == 0 条件成立会使用默认 DEFAULT_EVENT_LOOP_THREADSNettyRuntime.availableProcessors() * 2 CPU核心数量 * 2
构造方法 传入null Executor
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
SelectionKey k = selectedKeys.keys[i]
io.netty.util.concurrent.SingleThreadEventExecutor#execute
进行调用
加入任务
taskQueue.offer(task)
Channel
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
for (;;)
运行全部任务 从阻塞队列 queue
private final List<Object> readBuf = new ArrayList<Object>()
激活调用 ChannelRead
从队列获取任务
原生NIO 参考代码 // NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数 SocketChannel socketChannel = serverSocket.accept();
SelectorTuple selectorTuple = openSelector()
true
select(wakenUp.getAndSet(false))
NioServerSocketChannel()
调用handler channelRegistered方法 这里就是开发人员写 handler 重写 channelRegistered方法 会被调用
实际调用方法
int readyOps = k.readyOps()
线程池数组
void channelRegistered(ChannelHandlerContext ctx)
regFuture.isDone()
this.constructor = clazz.getConstructor()
for(;;)
addLast0(newCtx)
selector.select(timeoutMillis)
doReadMessages(readBuf)
Executor
super(parent)
for (int i = 0; i < size; i ++)
客户端连接服务端的时候会回调该方法
AbstractNioChannel
触发重写 handler 注册的接口 方法
读(事件)
这里会触发注册逻辑 真正的客户端发送数据是注册后 才开始有的读写,第一次连接只是先注册pipeline 是serverSocketChannel 的pipeline
循环列表
pipeline
read 方法所属类 NioMessageUnsafe 属性 一个readBuf 集合
执行 run方法
使用NioServerSocketChannel作为服务器的通道实现
进行绑定
客户端断开连接调用
io.netty.channel.nio.AbstractNioChannel#doRegister
AbstractChannelHandlerContext ctx = this
执行放到 run里面的 Runnable
io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered
super.group(parentGroup)
阻塞队列
触发重写 handler 读取数据完毕接口方法
调用这个实例的方法
Channel child = (Channel) msg
原生NIO
ctx = ctx.next
offerTask(task)
AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx;
获取 socketChannel
替换后
head.next = tail;tail.prev = head;
执行 select方法 不是完全阻 有超时时间传入塞
触发重写 handler 连接接口方法
processSelectedKeysOptimized()
线程组执行 Runnable
执行注册
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
传入的 NIO SelectorProvider.provider() 赋值给 provider
打开 serverSocketChannel 原生 NIO
执行绑定操作
参考代码 ServerBootstrap 启动类
调用完注册后 socketChannel会加入 开发人员业务的handler
设置 channel工厂 把 NioServerSocketChannel.class 传入
遍历调用 pipeline 的handler
private final Queue<Runnable> taskQueue
交给线程池一个任务
原生NIO代码 注册 selector
重载构造
for循环
.channel(NioServerSocketChannel.class)
doStartThread()
.childHandler(new ChannelInitializer<SocketChannel>()
包装为 AbstractChannelHandlerContext
io.netty.channel.socket.nio.NioSocketChannel#doReadBytes
ChannelHandler handler = config.handler()
Nio 原生方法 处理事件key
fireChannelRead
ChannelInitializer
next() 从线程获取一个 EventLoop 进行注册初始化里面有很多 只注册一个
核心代码 调用业务 handler处理读取的数据
allocHandle.lastBytesRead(doReadBytes(byteBuf))
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
allocHandle.allocate(allocator)
run方法
创建通道初始化对象,设置初始化参数
初始化线程池
bind(new InetSocketAddress(inetPort))
服务端启动流程 创建对象
Selector selector = this.selector;
protected void initChannel(SocketChannel ch)
next.invokeChannelRead(m)
serversocketchannel 注册回调
激活管道注册
通过注册的 chanel 的线程组执行线程任务ServerSocketChannel
处理事件 key
读取 byteBuf 数据
pipeline.fireChannelRead(readBuf.get(i))
p.addLast(new ChannelInitializer<Channel>()
NioEventLoopGroup整体类内部结构
如果初始化注册成功
io.netty.bootstrap.ServerBootstrap#init
tail = new TailContext(this)
NIO
客户端
因为 pipeline 是 serverSocketChannel 这里会调用regester0 注册时候放入的 ServerBootstrapAcceptor 然后进行注册
加入到最后,但是不是尾部,是 pipeline 的最后,tail的前一个
ChannelPipeline pipeline = pipeline()
newSocket(DEFAULT_SELECTOR_PROVIDER)
执行调用
这里注意 是 socketChannel (客户端)不是 serverScoketChannel (服务端)
实现
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
SocketChannel ch = SocketUtils.accept(javaChannel())
ServerBootstrap
// EventLoopGroup 接口 EventLoopGroup childGroup
this.ch = ch
doReadBytes(byteBuf)
安全执行任务
msg是传过来的 socketChannel
provider.openServerSocketChannel()
this.readInterestOp = readInterestOp
head = new HeadContext(this)
父类构造方法 传入ServerSocektChannel 和 ACCEPT事件
executor.execute(new Runnable()
return (SocketChannel) super.javaChannel()
chanel 就是工厂创建的ServerSocketChannel
执行绑定 doBing0
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
doRegister()
SocketChannel sc = iterator.next(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); // 非阻塞模式read方法不会阻塞,否则会阻塞 int len = sc.read(byteBuffer);
for 循环
@Override protected void initChannel(SocketChannel ch) throws Exception { //对workerGroup的SocketChannel设置处理器 ch.pipeline().addLast(new NettyServerHandler()); }
初始化 channel
SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()
循环对线程池数组进行赋值
if ((readyOps & SelectionKey.OP_WRITE) != 0)
NIO
判断是否是读连接事件
pipeline.fireChannelReadComplete()
ChannelPipeline pipeline = ch.pipeline()
io.netty.channel.nio.NioEventLoop#run
参考原生NIO代码
Channel注册候调用
注册
执行读取
获取设置的 options属性 进行能够设置
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
读取消息
Runnable 肯执行run方法
再次调用父类 并且传入 SelectionKey.OP_READ 事件整型值
进行调用 ChannelRead 方法
加入 pipeline 最后
这个不是 线程池的 execute
循环所有 selectedKyes
属性
进行循环 readBuf
这个方法会调用 init 初始化时候放入的 handler 比较繁琐 直接看 回调方法
NioEventLoopGroup
// 线程数组存放的是 NioEventLoopEventExecutor[] children (NioEventLoop)
provider = selectorProvider
调用 group方法 设置两个线程组 bossGroup(parentGroup)workerGroup(childGroup)
参考NIO代码
处理 连接 | 读| 写等事件 key
childGroup.register(child).addListener(new ChannelFutureListener()
ChannelFuture cf = bootstrap.bind(9000).sync()
连接 会产生 ACCEPT事件
读取数据调用
int size = readBuf.size()
加入 pipleline
创建线程池数组 长度是nThreads
核心代码 把 socketChannel 数据都写到 byteBuf
pipeline = newChannelPipeline();
AbstractChannelHandlerContext.invokeChannelRegistered(head)
客户端连接时调用
父类构造方法
绑定
this(newSocket(DEFAULT_SELECTOR_PROVIDER))
eventLoop.execute(new Runnable()
加入 ChannelInitializer
执行绑定
for (int i = 0; i < nThreads; i ++)
初始化
pipeline.fireChannelRegistered()
init(channel)
创建 socket 传入的是 原生NIOSelectorProvider.provider()
task.run()
寻找 Inbound handler
ChannelPipeline
tail 结点
channel = channelFactory.newChannel()
pipeline.addLast(handler)
final ChannelPipeline pipeline = pipeline()
taskQueue = newTaskQueue(this.maxPendingTasks)
调用创建的工厂
private final EventExecutor[] children
这里是创建的 new ServerBootstrap
调用父类
执行注册 regester0方法 的时候才会 回调这个 加入handler
加入到队列
findContextInbound(MASK_CHANNEL_REGISTERED)
把回调方法 要加入的handler加入 并且替换 channelInitializer handler
channel.eventLoop().execute(new Runnable()
创建 NioSocketChannel
pipeline.fireChannelActive()
处理异常时调用
0 条评论
下一页