01_HDFS 源码
2024-05-23 21:22:14 0 举报
Hadoop HDFS源码+原理剖析
作者其他创作
大纲/内容
checksum4 byte
2、检查需要移除的nameservices
临时block文件*.brw
Y
4、如果有一些nameservice要删除掉,那么就要停止他对应的BPServiceActor线程
c) mkdirs
offerService()
不断的给namenode汇报自己的block(5分钟)
PendingReplicationBlocks
是否2个packet?
BlockManager
根据文件地址获取INodeFSDirectory.getPathComponentsForReservedPath()
添加至ackQueue
clientRpcServer供dfsclient请求
DFSClient
journa RPC服务端JournalNodeRpcServer.journal()
通过代理发送RPC请求IPCLoggerChannel.sendEdits()QJournalProtocol.journal()
writeChecksumChunks
启动
构建BlockReader Impl: ( RemoteBlockReader2 )
FSNamesystem.loadFromDisk(conf);初始化FSNamesystem,加载fsimage和edits log进行内存合并
队列有数据one = dataQueue.getFirst()
开始续约beginFileLease()
LogSource[ URLLog ]
当管道建立成功后,发送packet到第一个DataNode时异常:1、关闭 ResponseProcessor 线程2、关闭针对这个异常DataNode的各种IO 流和 Socket3、将ackQueue中的packet重新放回dataQueue中,之后再次打通管道的时候需要重新发送4、从管道中摘除primary dataNode5、满足一定特殊条件(当前DataNode数量 < replication/2 [ 1 < 3/2 ])就会请求nameNode一个新的DataNode,并且在请求的过程中,告诉nameNode那个异常的DataNode6、针对新的DataNode列表,建立socket和io资源,发送空packet再次尝试打通管道7、请求nameNode更新block对应的DataNode pipeline若第5步没有请求到新的DataNode,此时这个block就只有2个副本,之后nameNode的自动检测机制会发现这个副本不够的问题,然后从DataNode中复制一个。
写磁盘
发送空packet结束完整的block传输
校验NamespaceInfoverifyAndSetNamespaceInfo()
添加url和servlet的映射哪些请求url,是转发给哪个servlet来进行处理的,在这里要做好配置和映射addInternalServlet()
定时同步edits log
JournalSet.JournalSetOutputStream.flush()
释放契约
将需要复制 ( 副本数量不足 ) 的block放入UnderReplicatedBlocks neededReplications将某个block复制到哪些DataNode上
和第一个DataNode建立socket管道createSocketForPipeline
INodeDirectory
chooseTarget4NewBlock()
添加文件元数据FSDirectory.addFile()
a) 接收响应,拿到namespaceInfo
启动startDataNode()
当前nameNode的状态active/standby
设置ack监听添加packet
获取NamespaceInforetrieveNamespaceInfo()
如果没有packet就创建一个
INodeMap inodeMap
生成复制任务computeDatanodeWork()
管理文件系统namespace1) 文件->block映射(namespace)一个文件被切割成多少个block,和每个block的映射关系2) block->DataNode映射(inodes) block都放在那些DataNode上(包括这些DataNode的副本)
启动startCommonServices(conf)
pendingIncrementalBRperStorage最近变化的block
进入safeMode
初始化DataStorage,开辟对应的存储空间initBlockPool()
注册了一个新的BlockPool将BPOffserService对象传递了进去,在BlockPoolManager中注册blockPoolManager.addBlockPool(bpos)
SafeModeInfo安全模式管理器
发送blockblockSender.sendBlock()
FSDirectory.delete()
写入双缓冲EditLogFileOutputStream.writeRaw()
FSDirectory
NameNode
异步
下载文件open()
chunk
e) fetchLocatedBlocksAndGetLastBlockLength()getBlockLocations()获取文件都在哪些block上
移除各种内存数据结构
FSOutputSummer.write()
blockSafe < blockThreshold或磁盘空间不足
续约线程LeaseRenewer
DataXceiver
要根据namenode返回的状态(active or standby)更新当前BPServiceActor的状态,一组namenode是两个namenode,一个active,另一个standby每个namenode都对应一个BPServiceActor,保存自己对应的namenode的状态发送心跳的时候,如果说感知到namenode的状态发生了切换,active -> standby, BPServiceActor也是要更新自己的状态bpos.updateActorStatesFromHeartbeat();state = resp.getNameNodeHaState().getState();
放入复制work集合ReplicationWork
记录破损的block,以及对应的DataNode
内存目录树INodeDirectory rootDir
LeaseManager
监控心跳线程Monitor
namesystem.loadFSImage(startOpt);fsImage.saveNamespace(this);
mkdirs()
设置数据管道setPipeline
INodeFile
建立连接发送空 block
创建BlockPoolManager负责管理block
没有就创建
serviceRpcServer供dataNode请求
JournalNodeHttpServer
N
Standby nameNode
DistributedFileSystem
初始化在pipeline刚创建的时候(PIPELINE_SETUP_CREATE)此时告诉nameNode我准备接受block
在nameNode创建新blocknextBlockOutputStream()
写chunkwriteChunk
datanode是否宕机DatanodeManager.isDatanodeDead()
初始化initDataXceiver()为client提供数据流方式的block读写
FSEditLog.logMkDir()
Y-执行checkpoint
FSDataOutputStream
加锁
本地输出流EditLogFileOutputStream
chunk512 byte
EditLogFileInputStream
flushBuffer();
packet接收器PacketReceiverNIO处理packet数据
3、增量上报汇报一次最近增量变化的block(接收到或者删除掉的)(5分钟)
FSNamesystem.startCommonServices()
AsyncLogger.sendEdits()
找到指定block 对应的本地磁盘文件输入流datanode.data.getBlockInputStream()
nameservice代表一组namenode,它由两个namenode组成的,就是一个active + 一个standby
是否比buf缓冲区大?
Journal.journal()
写入双缓冲EditsDoubleBuffer
b) rpc响应
创建目录mkdirs()
IOUtilscopyBytes()
startFile()startFileInternal()
DataXceiver单独处理某个block
journal node 集群
packet
如果说建立管道失败的话(1)抛弃掉这个block,做一个申请block的反向操作,从nameNode的FSDirctory数据结构中删除。(2)将那个建立连接失败的datanode加入一个hdfs客户端的excludedNodes列表中(3)如果建立管道失败,最多可以重试3次(4)后面在重试申请block以及建立管道的时候,就会传递过去excludedNodes列表(5)namenode再分配新block的时候,就不会绑定有故障的datanode了
每个asyncLogger负责和一个Journal server传输edits log
开启后台监控线程blockManager.activate()
RECEIVING_BLOCK
FSNamesystem
构建DataXceiverServer以数据流的方式,为client读写block提供数据流的上传,数据流的下载
startFile()
开启transaction分配transactionId
打开一个新的edits log供下一次写入NNStorage.writeTransactionIdFileToStorage(imageTxId + 1);
EditLogOutputStream.flush()
saveAllocatedBlock
BPOfferService
往FsDataset里面加入了一个BlockPooldata.addBlockPool()
socket操作READ_BLOCK
ImageServlet持久化fsimage,清理editslog文件
b1) namesystem.handleHeartbeatb2) datanodeManager.handleHeartbeatb3) heartbeatManager.updateHeartbeatb4) DatanodeDescriptor.updateHeartbeat : 更新最后一次心跳 lastUpdate。
对应
FSEditLogOp op
获取block块大小
完整block文件*.blk
1 通过HTTP获取edits 文件流HttpURLConnection/getJournal
创建DataStorage负责管理datanode上的block数据
实例化ClientDatanodeProtocolPB处理各种rpc请求,包括:client和datanode之间进行管理(关闭datanode,汇报block)的接口datanode和datanode之间的rpc接口等
文件权限校验FSPermissionChecker
检查是否有副本不足的block
NNHAStatusHeartbeat
LocatedBlockDataNode[]数组blockID
新开启线程
超过1小时没有renew
建立管道 从hdfs client 到 dn1 dn2 dn3createBlockOutputStream
QuorumOutputStream.flushAndSync()
写入文件logSync()基于transactionId分段存储
FileUtilcopy()
PendingReplicationMonitor监控复制block过程中pending住的操作
HeartbeatResponse
生成接收中的文件
nameNode返回
openInfo
2、发送心跳上报block report(3秒)
N 发送到DataNode
接收packet
获取未准备好的blockLeaseManagergetNumUnderConstructionBlocks()
packet内容
创建不存在的目录FSDirectory.unprotectedMkdir()
LeaseManager.Monitor契约监控器移除掉过期lease
选择targetNode遍历复制任务,给block找到目标DataNode,也就是说将这个block复制到哪些DataNode上
安全模式阈值blockThreshold
移动
响应处理器ResponseProcessor当收到dataNode响应时,将packet从ackQueue中删除
添加日志FSEditLog.logAddBlock()
ack响应
通过hadoop-common的HttpServer2组件,构建HTTP serverHttpServer2会监听那个端口号上发过来的http请求
heartbeatManager
构建BlockSender
QuorumOutputStream会保证大多数请求成功(N/2)+1 个他会【阻塞】统计有多少个asyncLogger同步edits log成功
整个block流程结束
创建
如果某一个BPServiceActor注册成功后,就开始进行NN校验,比较当前成功连接的NN和其他NN是否一致(应该是在比较active和standby)bpos.registrationSucceeded()
packetReceiverreceiveNextPacket()
初始化BlockPoolManager的时候,会根据配置文件中的nameservice的配置,对每个nameservice(一组namenode),会去创建一个BPOfferService如果仅仅用的是hadoop的普通的HA架构,就只有一个nameservice,也就只有一个BPOfferService
如果bpNSInfo是null的话,代表的就是第一个namenode返回了一个NamespaceInfo
初始化initIpcServer()是一个rpc server
Y 等待响应
拉取packet数据没数据就wait
block接收完成notifyNamenodeReceivedBlock()
NameNodeRpcServer
EditLogTailerThread
和目标DataNode建立socket连接(类似建立pipeline发送空packet的时候)
启动NameNode.join()
等待连接peerServer.accept();
NameNodeHttpServer
开始读取packet,通过packetReceiver接收数据readBuffer()
发送完后,发送一个空packet结束完整block
将需要复制的block和复制到的DataNode,增加到srcNode的带复制队列中也就是说告诉scrNode(也就是执行任务的DataNode),它上面的哪些block复制到哪些DataNode上去。之后srcNode心跳过来,就直接给他返回replicateBlocks里的数据,让它进行复制。DatanodeDescriptor#replicateBlocks
如果还有数据,就继续读取下一个block,直到读完为止pos +1
AsyncLoggerSet
上传fsimageHttpURLConnectionPUT /imagetransfer
是否safemode
SortedMap leases
启动PacketResponder线程1 处理下游DataNode响应2 添加至ackQueue3 删除ackQueue
BlockPoolSliceScanner
通过RPC在nameNode上创建blockdfsClient.namenode.addBlock()
...9个
BlockManager管理
获取所有blockBlockManager.getTotalBlocks()getTotalBlocks()
每60s
是否最后一个空packet?if (one.lastPacketInBlock)
写文件数据Packet.writeData()
写入本地磁盘
超过100w条edits log没有写入文件或者超过1小时没有写文件
AsyncLoggerSet.sendEdits()
GetJournalEditServlet
journal输出流QuorumOutputStream写入大部分journal node的数据流
processOp()处理WRITE_BLOCK操作
a) 直接由FSNamesystem响应,返回一个namenode的NamespaceInfoNamespaceInfo大概来说就是包含了一些id,比如说clusterId,nsId,blockPoolId
空P
1
DecommissionManager监控block状态,如果死掉就尝试下线
2048个packet大小 差不多等于一个block大小了,这时候,需要创建一个0大小的packet发送到queue中,DataStreamer在消费queue的时候就知道空packet前两个就是一个block了。
上传文件create()
发送一个即将读取block的消息READ_BLOCK,告诉DataNode,准备相关资源,如:dataXceiverServer -> dataXceiver --> receivernew Sender(out).readBlock(....)
1、连接到namenode握手并且注册connectToNNAndHandshake()
每隔60s续约
正式启动DataBlockScanner1、将BlockPoolId加入block扫描器中2、上报损坏的block到nameNode
覆盖操作
选择一个DataNode 包含socket地址等信息chooseDataNode()
告诉complete状态的数量
下载的文件流FSDataInputStream
block接受开始/进行中notifyNamenodeReceivingBlock()
FSVolume
启动RPC server
PacketResponder线程如果packet是此次存储block的最后一个空包关闭block文件流
退出safeMode
开始读取这些blockread()
Packet 共127个chunk
ReplicationMonitor监控复制block线程
初始化定时运行的scannersinitPeriodicScanners(conf);启动block检查器DataBlockScanner
logStream.flush();flush到磁盘和发送到远程journal node上
写入双缓冲editLogStream.write(op);
如果是block副本不足,将返回 DNA_TRANSFER 命令,当前DataNode就将指定的block复制到指定的其他DataNode上去DataNode.transferBlocks()
5、如果某个nameservice他的namenode列表变化了,需要初始化新的BPServiceActor线程去跟新的namenode进行通信
BPserviceActor
PacketHeader
发送一个个packet 64kb,里面有chunk和checksum 512字节+4字节的checksumsendPacket()
现在我们已经知道了namespace ID,此时就可以调用datanode的initBlockPool()方法来完成local storage,也就是始化datanode上的那个block pool(BPOfferService)对应的local storage,本地存储
节点转为INodeINode[] inodes = iip.getINodes();
写入磁盘文件FSImage.saveNamespace()
分析文件状态analyzeFileState()
更新状态
构建完整blockLocatedBlock【blockIDDataNodes】
移除宕机datanodeDatanodeManager.removeDeadDatanode()
切割chunk
写checksum数据Packet.writeChecksum()
每30s心跳检查
DatanodeCommand:DataNode要执行的任务
解析路径,获取每个节点和不存在的节点FSDirectory.getExistingPathINodes()
最后一次心跳时间 > 10'30s
读取完毕
networktopology
Hadoop HDFS Namenode源码
发送packet
发送block和checksumblockSender.sendBlock
检查可用空间NameNodeResourceCheckercheckAvailableResources
初始化storage存储如果这是第一次初始化一个block pool的话,还需要同时初始化datasdet,block scanner,等等其他的一些核心组件initStorage(nsInfo)
启动HTTP server startHttpServer()
获取状态为complete的block数量getCompleteBlocksTotal()
DataNodeblock所在的副本2
保存editLog
核心线程DataStreamer
写入checksum文件校验文件破损
applyEditLogOp将返回的FSEditLogOp应用到namespace中FSDirectory
block的副本数量小于1,就认为是没有complete
DatanodeManager
DatanodeCommand[]
TransferFsImage.uploadImageFromStorage()
DataNodeblock所在的副本1
Packet默认一个65536byte
b) rpc发送
editLogStream = JournalSetOutputStreameditLogStream是一个聚合stream,他的write方法可以同时写到本地和远程
找出第一个为null的INode,并开始创建目录
为block选择多台机器即选择多个DataNode根据机架感知和负载均衡BlockPlacementPolicyDefault.chooseTarget()
e) 创建block
FileJournalManager.getLogFile(segmentTxId);
发送心跳过后,会拿到一个HeartBeatResponseHeartbeatResponse resp = sendHeartBeat()
上传文件fs.copyFromLocalFile()
找到第1个blockgetBlockAt()
FSDataInputStream
对外暴露的接口实现几乎都是调用FSNamesystem完成
getBlockLocations()
一个buf是9个chunk,这个buffer最好是3的倍数,因为代码会对3个chunk并发进行checksum。
构造请求nameNode的RPC代理
EditLog 写入流程写入双缓冲-> 本地文件 -> journal node(写入自己editlog) -> standby nameNode同步(写入自己editlog)
Hadoop HDFS Datanode启动源码
选择待复制的blockneededReplications.chooseUnderReplicatedBlocks()
副本阈值replQueueThreshold
FSNamesystem namesystem;
inodes(哪些block是在哪些datanode上),这份数据,是在每次namenode启动之后,由datanode来汇报过来的,而且datanode在运行期间会每隔一段时间都汇报一下自己有哪些block,汇报给namenode
通过sender发送到指定的DataNode上去new Sender(out).writeBlock()
放入带复制的集合中neededReplications
DataXceiverServer接收client或者其他DataNode的请求
SortedMap sortedLeasesByPath
dataNode之间传输异常1、修改mirrorError=true2、BlockReceiver.PacketResponder 的ack响应status不是success3、DFSOutputStream.DataStreamer.ResponseProcessor接收到ack响应不是success,标记那个异常DataNode下标。4、此时DataStreamer卡在等待ackQueue != 0的while循环上,发现hasError=true就跳出循环,走到和《发送到第一个DataNode异常》一样的逻辑处理中。
4、全量上报上报block reportofferService()(6小时或刚启动)
刷入journal 节点List<JournalAndStream> journals
StandbyCheckpointer
replicationThread
从第pos(0)个block 开始读currentNode = blockSeekTo(pos);
注册register()
启动BPServiceActor线程(1、向nameNode进行注册2、发送心跳3、上报block report4、执行响应command)
加入
一个BPServiceActor对应一个nameNode
文件的所有blockLocatedBlocks
DatanodeProtocolClientSideTranslatorPB
解析创建的目录path,生成二维数组FSDirectory.getPathComponentsForReservedPath(src);
EditLogTailer
DFSOutputStream
addPendingReplicationBlockInfo将增加的block加入最近变更的block中
FSEditLogLoader
创建空packet
Packet.writeTo()将packet发送到DataNode
checksum+chunk对.... .... 127 个
DataXceiverServer接收到链接请求peerServer.accept()
BlockReceiverblock接收器
如果是最后的packet,就说明已经发了两个packet过去了,这里就要等待这两个packet是否已经完整发送到其他DataNode上,ackQueue里面是通过ResponseProcessor进行处理的,如果处理完成,会移除ackQueue,这里就可以让dataQueue继续接收数据了。
3、如果有新加入的nameservices,就启动
新增、删除block,增量汇报blockReceivedAndDeleted()
checkpoint
选择srcNode也就是说这个block从哪个正常的DataNode去复制chooseSourceDatanode()
SafeModeMonitor不断检查能否离开safeMode
创建blockcreateNewBlock()blockId自增
发送packetsendPacket
移除
FileSystem
initialize初始化
DataNodeblock所在的副本0
HeartbeatManager心跳管理
a) 通过rpc请求namenode
内存结构中添加blockFSDirectory.addBlock()
启动nameNode集群也就是每一个BPOfferServicestartAll()
excludedNodes排除上次建立管道失败的DataNode
1、遍历nameservices,发现新加入的nameservices
添加契约addLease()
getAdditionalBlock()
NamespaceInfo== null ?
将packet移除dataQueue
2建立连接 发送空 block
启动startInfoServer()是一个Http server
pendingIncrementalBRperStorage()获取最近的block
写完日志后返回
DataNodeDataTransfer线程
NavigableSet<Lease> sortedLeases
2.6.1之前,active/standby nameNode的edits log都不落本地磁盘,在进行合并时,通过FSEditLogLoader(和standby定时同步的edits log用的组件是一样的)从journal node获取edits log和本地fsimage合并。2.6之后,nameNode本地都有edits log,启动时直接将edits log和fsimage进行合并,之后清除edits log。这样加速了这个合并过程。
写入journalNode
d) create
说明本次是第二个namenode返回了一个NamespaceInfo将两个NamespaceInfo进行校验,看看一些信息是否匹配比较了一下两个namenode返回的 * blockPoolId * namespaceId * clusterId是否全部一致
每2秒一次
CheckpointerThread
此时,就会创建写入本地edits log和发送到远程journal node上的输出流
通过RPC代理进行注册,返回的bpRegistration对象里包含了namenode注册成功以后给datanode返回的一些东西bpRegistration = bpNamenode.registerDatanode(bpRegistration);
阻塞
List<AsyncLogger> loggers
RECEIVED_BLOCK
block总数量blockTotal
INodeDirectory
通过NIO读取文件
操作内存数据结构
构建一个DatanodeRegistration对象相当于是一个datanode注册请求,这里面包含了DataNodeIdnamenode收到这个注册的请求,那么就可以识别出来是哪个datanode发起的注册最后返回bpRegistrationbpRegistration = bpos.createRegistration();
执行命令
利用可排序的集合,快速过期lease
3秒
3 ack响应
dataQueue
延迟执行一次block report,去汇报一次当前datanode节点上全量的block,此方法做一些时间的设定,之后就会在BPServiceActor.run()方法开始进行block reportscheduleBlockReport()
检查读写有error的磁盘,排除掉这些磁盘的读写,避免有datanode有一些启动的失败checkDiskError()
blockManager
执行command
0 条评论
回复 删除
下一页