Netty server core source code
2024-01-21 18:50:27
Outline/Content
Run the tasks that were added to the task queue
Add the handler (the business handler supplied when the ServerBootstrap was created): child.pipeline().addLast(childHandler);
new NioEventLoop()
processSelectedKeysOptimized();
head = HeadContext
Register the NioServerSocketChannel: next().register(channel);
next
NioEventLoopGroup() structure diagram
newTaskQueue(queueFactory)
pre
final SelectionKey k = selectedKeys.keys[i];
super.next()
final Channel child = (Channel) msg;
Create the NioEventLoop chooser (round-robin strategy by default): chooser = chooserFactory.newChooser(children);
execute(Runnable task)
chooser
When the channel being registered is a NioSocketChannel
doRegister();
newSocket
Round-robin strategy, picks one entry from the NioEventLoop array in turn: executors[idx.getAndIncrement() & executors.length - 1];
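The index expression above can be tried out in isolation. The following is a minimal sketch, not Netty's own class: `RoundRobinChooserSketch` and the string "executors" are hypothetical stand-ins, and the power-of-two check is an assumption made explicit because the bit mask only behaves like a modulo when the array length is a power of two (which the name `PowerOfTwoEventExecutorChooser` suggests).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of a round-robin chooser; a sketch, not Netty's actual class. */
final class RoundRobinChooserSketch {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] executors;

    RoundRobinChooserSketch(String[] executors) {
        // Assumption: the mask trick in next() is only valid for power-of-two lengths.
        if (executors.length == 0 || (executors.length & (executors.length - 1)) != 0) {
            throw new IllegalArgumentException("length must be a power of two");
        }
        this.executors = executors;
    }

    String next() {
        // '-' binds tighter than '&', so this reads idx & (length - 1),
        // which equals idx mod length for power-of-two lengths.
        return executors[idx.getAndIncrement() & executors.length - 1];
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch chooser = new RoundRobinChooserSketch(
                new String[] {"loop-0", "loop-1", "loop-2", "loop-3"});
        for (int i = 0; i < 6; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

Because the mask clears the sign bit along with the high bits, the index stays in range even after the counter overflows to negative values.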
this.localAddress = localAddress;
Enter the parent class AbstractNioChannel: super(parent);
Block for a while, waiting for events to occur: int selectedKeys = selector.select(timeoutMillis);
Final pipeline structure
initChannel(ctx)
SingleThreadEventLoop
doReadMessages
Pipeline structure of NioSocketChannel
next() round-robin
Pipeline structure of NioServerSocketChannel
tail
ServerBootstrapAcceptor
head
Create the NioServerSocketChannel via reflection: channel = channelFactory.newChannel();
NioByteUnsafe.read
executor
run() loops, handling events and tasks: for (;;)
FirstChannelInboundHandler
AbstractNioChannel constructor
unsafe = new NioMessageUnsafe();
SingleThreadEventExecutor
register0(promise);
p.addLast(new ChannelInitializer<Channel>())
ChannelPipeline p = channel.pipeline();
Actually create the thread that runs the tasks: executor.execute(new Runnable())
tail = TailContext
ChannelInitializer
this.ch = ch;
tail = TailContext
selector
Accept-connection event: SelectionKey.OP_ACCEPT
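Several nodes in this outline (`provider.openServerSocketChannel()`, `ch.configureBlocking(false)`, `doRegister()`, `SelectionKey.OP_ACCEPT`, `serverSocketChannel.accept()`) correspond to plain JDK NIO calls. The sketch below strings them together using only `java.nio`. It is an illustration under assumptions, not Netty's code: the class name `AcceptSketch`, the ephemeral port 0, the 5 second timeout, and the exact ordering (register with interest 0, bind, then enable `OP_ACCEPT`) are choices made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;

/** JDK-only sketch of the accept path; names and ordering are assumptions. */
final class AcceptSketch {
    /** Returns true if one connection was signalled as OP_ACCEPT and accepted. */
    static boolean demo() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel server = provider.openServerSocketChannel()) {
            server.configureBlocking(false);
            // Register with no interest first, then switch on OP_ACCEPT after binding.
            SelectionKey key = server.register(selector, 0);
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            key.interestOps(SelectionKey.OP_ACCEPT);

            // A blocking client connects to whatever port the server was given.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (selector.select(5000) == 0) {
                    return false; // timed out without any ready key
                }
                boolean acceptable = (key.readyOps() & SelectionKey.OP_ACCEPT) != 0;
                try (SocketChannel child = server.accept()) {
                    return acceptable && child != null;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + demo());
    }
}
```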
Process any events that have occurred: processSelectedKeys();
doBind(localAddress)
MultithreadEventLoopGroup.register(channel);
AbstractNioChannel.doRegister();
Enter the parent class
super.group(parentGroup); this.childGroup = childGroup;
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
serverSocketChannel.accept();
Get the event type: int readyOps = k.readyOps();
selector = selectorTuple.selector;
super: enter the parent class
ch.configureBlocking(false);
int localRead = doReadMessages(readBuf);
Neither tasks nor events are pending: case SelectStrategy.SELECT:
MultithreadEventExecutorGroup
provider = selectorProvider;
final Channel channel = regFuture.channel();
unwrappedSelector = selectorTuple.unwrappedSelector;
eventLoop.execute(Runnable command) = SingleThreadEventExecutor.execute()
SingleThreadEventLoop.register(channel);
unsafe = newUnsafe();
Register the child: childGroup.register(child)
chooser.next()
PowerOfTwoEventExecutorChooser.next()
Create the task queue: return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
unsafe.read();
tail
Used to run the pipeline and process data: unsafe = newUnsafe(); new NioByteUnsafe();
SingleThreadEventExecutor.this.run(); = NioEventLoop.run()
handler().handlerAdded(this);
new NioEventLoop() populates its fields
Start the thread: doStartThread()
init(channel);
Run the tasks that were added to the task queue: runAllTasks();
taskQueue
......
NioServerSocketChannel() constructor
AbstractChannel.register()
MultithreadEventLoopGroup
NioEventLoop
pipeline.invokeHandlerAddedIfNeeded();
task.execute();
Same registration logic as for NioServerSocketChannel: pick an EventLoop from childGroup by round-robin and register with it: next().register(channel);
ChannelFuture regFuture = config().group().register(channel);
new NioEventLoopGroup()
Loop over the events: for (int i = 0; i < selectedKeys.size; ++i)
ChannelFuture regFuture = initAndRegister();
Create the NioSocketChannel
NioMessageUnsafe.read
Create the pipeline that belongs to the NioSocketChannel: pipeline = newChannelPipeline();
Handle accept and read events: if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
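The `readyOps` condition above is a plain bit-mask test on the JDK `SelectionKey` constants, so it can be checked without Netty. A minimal sketch, where the class and method names (`ReadyOpsSketch`, `shouldRead`) are hypothetical:

```java
import java.nio.channels.SelectionKey;

/** Sketch of the readyOps test; class and method names are illustrative only. */
final class ReadyOpsSketch {
    /** True when OP_READ or OP_ACCEPT is set, or when no bits are set at all. */
    static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT));
        System.out.println(shouldRead(SelectionKey.OP_WRITE));
    }
}
```

One condition covers both channel types: an `OP_ACCEPT` key on the server channel and an `OP_READ` key on a child channel both lead to `unsafe.read()`, which the outline shows resolving to `NioMessageUnsafe.read` and `NioByteUnsafe.read` respectively.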
Create the byteBuf: byteBuf = allocHandle.allocate(allocator);
selectStrategy = strategy;
Create the SocketChannel: SocketChannel ch = SocketUtils.accept(javaChannel());
Run the pipeline: pipeline.fireChannelRead(readBuf.get(i));
Get the pipeline: final ChannelPipeline pipeline = pipeline();
ChannelInitializer.handlerAdded(this);
bootstrap.localAddress(new InetSocketAddress(\"127.0.0.1\
Put the main registration task into the task queue: addTask(task);
this.readInterestOp = readInterestOp;
bootstrap.childHandler(ChannelHandler childHandler)
doReadBytes(byteBuf)
pipeline = DefaultChannelPipeline
new SingleThreadEventExecutor() populates its fields
NioServerSocketChannel()
AbstractNioChannel() constructor
channelRead()
Tasks are first added to the task queue; the thread then takes them from the queue and runs them asynchronously
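The queue-then-run pattern named in this outline (`execute(Runnable task)`, `addTask(task)`, `doStartThread()`, then a `for (;;)` loop on a single thread) can be modelled in a few lines. This is a simplified sketch under assumptions, not Netty's implementation: `SingleThreadLoopSketch`, the thread name `loop-sketch`, and the `demo()` helper are hypothetical, and the loop only drains tasks (no selector, no shutdown handling).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified single-thread task loop; a model of the idea, not Netty's code. */
final class SingleThreadLoopSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();

    void execute(Runnable task) {
        taskQueue.add(task);                       // enqueue first
        if (started.compareAndSet(false, true)) {  // start the worker only once
            Thread worker = new Thread(this::run, "loop-sketch");
            worker.setDaemon(true);
            worker.start();
        }
    }

    private void run() {
        for (;;) {
            try {
                taskQueue.take().run();            // block until a task arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Submits two tasks and returns the names of the threads that ran them. */
    static List<String> demo() {
        SingleThreadLoopSketch loop = new SingleThreadLoopSketch();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            loop.execute(() -> {
                seen.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller never runs the task itself. It only enqueues it, so every task executes on the one worker thread and needs no extra locking against other tasks on that loop.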
pipeline = newChannelPipeline();
bootstrap.bind()
Create the ServerSocketChannel: provider.openServerSocketChannel();
Call the ChannelInitializer's initChannel method to add handlers to the pipeline: initChannel((C) ctx.channel()); then remove the ChannelInitializer: pipeline.remove(this);
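The add-then-remove behaviour of the initializer (`initChannel((C) ctx.channel())` followed by `pipeline.remove(this)`) can be modelled with an ordered list of handler names. The sketch below is a deliberately simplified stand-in, not Netty's pipeline: `InitializerSketch` and its methods are hypothetical, handlers are plain strings, and the callback runs immediately rather than being deferred until registration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy pipeline showing a one-shot initializer; not Netty's DefaultChannelPipeline. */
final class InitializerSketch {
    // Every pipeline in this model starts as head -> tail.
    private final List<String> handlers = new ArrayList<>(Arrays.asList("head", "tail"));

    /** Inserts a handler just before the tail, like addLast in the outline. */
    void addLast(String name) {
        handlers.add(handlers.size() - 1, name);
    }

    void remove(String name) {
        handlers.remove(name);
    }

    /** Adds a placeholder initializer, lets it add the real handlers, then removes it. */
    void addInitializer(Consumer<InitializerSketch> initChannel) {
        addLast("ChannelInitializer");
        initChannel.accept(this);      // corresponds to initChannel(...)
        remove("ChannelInitializer");  // corresponds to pipeline.remove(this)
    }

    List<String> names() {
        return new ArrayList<>(handlers);
    }

    static List<String> demo() {
        InitializerSketch pipeline = new InitializerSketch();
        pipeline.addInitializer(p -> p.addLast("ServerBootstrapAcceptor"));
        return pipeline.names();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the initializer leaves no trace once it has run, which matches the server channel pipeline listed in the outline: head, ServerBootstrapAcceptor, tail.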
bootstrap.channel(NioServerSocketChannel.class)
Run the handlers in the pipeline: pipeline.fireChannelRead(byteBuf);
Read-data event: SelectionKey.OP_READ
PendingHandlerCallback task = pendingHandlerCallbackHead;
Thread-per-task executor, used to create the thread that runs tasks: executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
Argument: DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); this(newSocket(DEFAULT_SELECTOR_PROVIDER));
this()
head = HeadContext
ServerBootstrap
this.addTaskWakesUp = addTaskWakesUp; (false)