Redis 服务端启动与命令执行、 IO 多线程处理、集群建立及选举
2021-09-02 21:56:32 0 举报
redis 6.0 源码流程
作者其他创作
大纲/内容
connSocketSetReadHandler()
7. initThreadedIO()
25. 根据命令名找到对应的redisCommand26. 如果 redis 为集群模式,需要调用 getNodeByQuery() 找到 key 所在的 slot 是否应该交给当前服务端处理,如是之后调用对应的处理函数
fe-rfileProc()
18. aeCreateFileEvent()
1. main()
7. 注册写处理器,等待集群内其他节点可写时发送数据
11. aeMain()
setCommand()
7. clusterLoadConfig()
10. aeMain()
17. 客户端的连接建立后注册监听可读事件,设置读事件处理函数为 readQueryFromClient()
21. processInputBuffer()
7.1 createClusterNode()
acceptTcpHandler()
connSocketRead()
33. 子线程执行完毕,主线程遍历 server.clients_pending_read 列表,flags 为 CLIENT_PENDING_COMMAND 的客户端命令已经解析完成,主线程执行其命令即可
18. 客户端的连接建立后注册监听可读事件,设置读事件处理函数为 readQueryFromClient()
5. initServer()
8. clusterReadHandler()
12. 开始处理事件
12. 客户端连接 redis 正常端口,触发 AE_READABLE 事件
22. 将从 socket 中读取到数据转化为命令名及其参数
3. 解析 redis 命令的数组 redisCommandTable,将其添加到字典中。redisCommand 结构体定义在 server.h 文件中,主要需要关注的是命令名称字符串 name,及其对应的 redisCommandProc 处理函数指针 proc
9. aeCreateFileEvent()
20. 收到足够票数后进行主从切换
6. aeCreateEventLoop()
20.beforesleep()
25. readQueryFromClient()
35. 被邀请节点将 clusterReadHandler 函数作为事件处理器,监听该连接上的 AE_READABLE 事件
24. writeToClient()
36. 连接建立,触发发起邀请的集群节点的 AE_WRITABLE 事件
connection.h
4.1 clusterGetSlaveRank()
7. 根据 server.io_threads_num 配置创建 IO 线程8. pthread_create() 创建 IO 线程,并指定 IOThreadMain() 函数为 IO 线程运行的方法
8. listenToPort()
19. readQueryFromClient()
28.2 createAddNode()
19. 客户端发送命令,触发 AE_READABLE 事件
18. clusterBeforeSleep()
25. 根据命令名找到对应的redisCommand26. 如果 redis 为集群模式,需要调用 getNodeByQuery() 找到 key 所在的 slot 是否应该交给当前服务端处理,如是之后调用对应的处理函数,以 set 命令为例
38.发起邀请的节点将最少 3 个集群内其他节点的信息存放到被邀请节点的发送缓冲区,并注册 clusterWriteHandler 函数为写处理器,等待写事件触发发送数据
14. connCreateAcceptedSocket()
networking.c
发起故障转移的 Slave 节点
30. readQueryFromClient()
9. clusterProcessPacket()
Redis 集群建立流程
15. server.cluster->failover_auth_count 计票器自增,并设置 Slave 节点下一周期处理任务的 flags 为 CLUSTER_TODO_HANDLE_FAILOVER
11. aeProcessEvents()
15. clusterDoBeforeSleep()
connSetWriteHandlerWithBarrier()
server.c
11. clusterSendFailoverAuth()
28. setGenericCommand()
aeCreateFileEvent()
10. aeSetBeforeSleepProc()
25. lookupCommand()
邀请新节点加入集群的节点
12. clusterSendMessage()
clusterReadHandler()
该部分为被邀请节点的处理
28. handleClientsWithPendingReadsUsingThreads()
connSocketAccept()
4.2 clusterRequestFailoverAuth()
33. connAccept()
5. clusterBroadcastMessage()
6. clusterSendMessage()
6.InitServerLast()
28.1 createClusterNode()
22. startThreadedIO()
32. processMultibulkBuffer()
30. lookupKeyWrite()
11. 启动 eventLoop 事件处理循环
17. aeCreateFileEvent()
connection.c
3. clusterHandleSlaveFailover()
28. 根据 cluster meet 命令传过来的参数创建代表远端集群节点的 clusterNode,并将其添加到 server.cluster->nodes 字典中。需注意此时并没有执行连接,而是将集群节点的 flags 设置为 CLUSTER_NODE_HANDSHAKE 和 CLUSTER_NODE_MEET,并将 node->link 设置为 NULL, 定时任务触发时才会发起连接
cluster.c
32. clusterAcceptHandler()
Redis 服务端启动及命令执行
14. acceptCommonHandler()
Redis IO多线程处理流程
ae.c
33. 被邀请节点将 clusterConnAcceptHandler 函数作为连接处理器,并调用该处理器处理连接
26.1 getNodeByQuery()
被请求的 Master 节点
26. 如果开启了多线程模式,并且 IO 线程是启动的,则将新连接过来的 client 放在 server.clients_pending_read 列表头部,并将该客户端 flags 设置为 CLIENT_PENDING_READ,不再继续执行 readQueryFromClient() 函数
13. 客户端连接,触发 AE_READABLE 事件
服务端启动
17. connSetReadHandler()
12. aeProcessEvents()
37.1 将 clusterReadHandler 函数作为事件处理器,监听该连接上的 AE_READABLE 事件37.2 发起邀请的节点向被邀请节点发送 MEET 消息
27. c-cmd-proc(c)
21. handleClientsWithPendingWritesUsingThreads()
2. clusterCron()
29. IOThreadMain()
2. initServerConfig()
6. 创建事件循环实例 eventLoop7. 绑定监听指定端口8. 注册 serverCron() 为事件循环处理周期性事件的函数9. 注册 acceptTcpHandler() 为事件循环中处理 TCP 读事件的处理器10. 注册 beforeSleep() 为事件循环每个事件处理开始前的函数,其中包括过期数据的淘汰
Slave 节点轮询周期触发
8. aeCreateTimeEvent()
serverCron()
24. processCommand()
42.1 clusterProcessGossipSection()
7. connSetWriteHandlerWithBarrier()
34. processCommand()
34. clusterConnAcceptHandler()
16. processTimeEvents()
29. genericSetKey()
1. processTimeEvents()
16. createClient()
25. fe-rfileProc()
3. populateCommandTable()
clusterCommand()
触发发起请求的 Slave 节点的 AE_READABLE 事件
35. connSetReadHandler()
23. processCommandAndResetClient()
30. 查找 redisDb 中是否已经存在目标 key31. 如果目标key 已经存在,则覆盖其 value。具体流程是首先调用 dictFind() 找到目标key 所在的 dictEntry, 然后调用 dictSetVal() 设置新 value,最后释放相关对象内存空间
32. 按照相关配置将命令同步到 AOF 及 Slave
32. 处理 client 传输过来的数据,但是不会执行 flags 为 CLIENT_PENDING_READ 的客户端的命令,而是将客户端 flags 更新为 CLIENT_PENDING_COMMAND 后返回,统一交给主线程处理
37.2 clusterSendPing()
Slave 节点周期任务触发
16. connSetReadHandler()
10. clusterSendFailoverAuthIfNeeded()
callHandler()
26.postponeClientRead()
7. listenToPort()
24. IO 线程运行,根据 io_threads_op 调用函数将数据返回给客户端
clusterWriteHandler()
客户端
37. clusterLinkConnectHandler()
22. processMultibulkBuffer()
21. 如果检测到响应客户端有pending,并且IO线程尚未启动,则调用 startThreadedIO() 启动 IO 线程,并将客户端分配到每个线程的任务队列 io_threads_list 中,然后将线程的操作标志 io_threads_op 设置为 IO_THREADS_OP_WRITE
27.beforesleep()
14. 处理 CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 消息
42.2 clusterSendPing()
26.2 call()
6. clusterInit()
4. loadServerConfig()
32. propagate()
28. 下一个轮询周期,IO线程已启动,将 server.clients_pending_read 中保存的客户端分配到每个线程的任务队列 io_threads_list 中,并将线程的操作标志 io_threads_op 设置为 IO_THREADS_OP_READ ,主线程等待子线程执行完毕
32. 发起连接,触发被邀请的节点的 AE_READABLE 事件
37.1 connSetReadHandler()
Redis 选举流程
4. 根据从节点复制偏移量计算本节点发起故障转移的优先级,优先级数值越小,则其发起请求的选举时间越靠前,选举时间用 server.cluster->failover_auth_time 保存。如果执行选举的时间未到,先 return,等待下一轮周期任务继续5. 广播 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 消息
db.c
40. 被邀请的节点接收发起邀请的节点的 PING 数据,被触发 AE_READABLE 事件
15. acceptCommonHandler()
7. 根据 server.cluster_configfile 配置文件创建代表自身的clusterNode 集群节点,并将其添加到 server.cluster->nodes 字典中8. 指定本服务端作为集群节点的监听端口,一般为正常 redis 端口号 +100009. 为监听端口创建的文件句柄添加 AE_READABLE 事件的处理器 clusterAcceptHandler
20. connRead()
14. clusterProcessPacket()
33. processCommandAndResetClient()
connhelpers.h
28. clusterStartHandshake()
15. createClient()
触发被请求的节点的 AE_READABLE 事件
7.2 clusterAddNode()
周期任务触发
31.1 createClusterLink()
18. 客户端发送 custer meet 命令 ,触发 AE_READABLE 事件
31. 客户端 flags 已经设置为 CLIENT_PENDING_READ,继续执行 readQueryFromClient() 函数处
t_string.c
32.processInputBuffer()
19. clusterHandleSlaveFailover()
30. clusterCron()
23. IOThreadMain()
10. 启动 eventLoop 事件处理循环
connSocketConnect
31.2 connConnect()
29. processTimeEvents()
20. clusterFailoverReplaceYourMaster()
12. 只有 Master 节点会响应 Slave 节点一个 CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 消息
41. 被邀请节点处理发起邀请的节点传输过来的数据42. 对于 MEET 消息中的 Gossip 数据,如果其中的节点信息在本节点已经有记录了,则检查是否需要更新,如果其中的节点在本节点没有,则将添加到本节点的 server.cluster->nodes 字典中,最后调用 clusterSendPing() 函数响应一个 PONG 消息
connWrite()
13. clusterReadHandler()
17. beforesleep()
38. clusterSendMessage()
13. connCreateAcceptedSocket()
41. clusterProcessPacket()
39. 发起邀请的节点的 AE_WRITEABLE 事件被触发
30. IO 线程运行,根据 io_threads_op 调用目标函数
31. dbOverwrite()
被邀请的节点
31. 本节点邀请其他节点加入集群,并将 clusterLinkConnectHandler 函数作为事件处理器,监听处理连接上的 AE_WRITABLE 事件
readQueryFromClient()
8. pthread_create()
31.postponeClientRead()
0 条评论
下一页