05_2-HDFS文件上传--create创建输出流过程
2021-02-18 10:57:59 4 举报
HDFS文件上传源码解析
作者其他创作
大纲/内容
run
receiveBlock
在NameNode创建得时候会启动这个线程 2s检查一小时都没续约得,
第一个异常:创建管道异常?申请新块和管道
是否是最后的一个空packet
1、创建新的 LocatedBlockLocatedBlock = dfsClient.namenode.getAdditionalDatanode
create返回的是 FSDataOutputStream,然后封装成 HdfsDataOutputStream
如果第一个节点都没有超过一小时,直接return
packet (127个)64K
dataQueue是否为空并且判断hasError
mkdir
while重试10次
DataStreamer
没有数据要写入
第二步:写入本地磁盘 out.write
1、nextBlockOutputStream()2、initDataStreaming
blk_0002_checksum
基于输出流,向DataNode发送请求 Sender
向NameNode发送创建文件请求
lastUpdate
从in流中获取下一个packet
extends
是空的packet
获取DataNode返回数据initDataStreaming
FSOutputSummer
FSDataOutputStream
发送 send
否
DFSPacket 移除
失败
发送RPC调用
INodeDirectory
LocatedBlock
获取lease中所有path释放契约、移除lease
BlockConstructionStage.PIPELINE_SETUP_APPEND
WRITE_BLOCK
第二个异常:写第一个datanode异常
DatanodeInfo[]筛选出来的DataNode
setPipeline(nextBlockOutputStream()) ;申请block、并创建数据管道
是
设置管道状态PIPELINE_CLOSE
获取所有的client ArrayList
契约管理 LeaseManager
blk_0001
成功
如果租赁软限制时间到期,请恢复租赁
本地上传文件
加入管理
endBlock()关闭流、释放资源、重置各种状注意此时就会重置数据管道
封装 INodeFile
这里是一个线程,边写这边在边删除ack
DataXceiverServerDataXceiverServer
1
Packet
移除
写入的数据的剩余内容大于或等于 buf 的大小 ---这里避免多余一次复制
newStreamForCreate
返回的是 FSDataOutputStream
加入add
第二个异常不确定是不是这里也算?
创建 DFSOutputStream 输出流
chunk512字节
for循环client
判断是否需要添加新的DataNode1、副本数量 < 32、当前节点总数小于副本数除以2
需要判断一下
封装创建 new DataStreamerstage = BlockConstructionStage.PIPELINE_SETUP_CREATE
Sender(out).writeBlock
DistributedFileSystem
1秒一次
从 blockReplyStream 获取请求状态
DFSPacket
写入
优点:其实这里就是不用全部遍历了,我只用遍历第一个最久的,是否超过一小时,如果最久的都没有,那后面的肯定也就不会了。 * 这个是不是可以用在注册中心的那个租约上,那样我们也就不用遍历所有的了
第三个异常:DN节点传输数据异常
namenode.renewLease(clientName)
因为传递过来时,说明和下游的DataNode通信失败,所以需要把之前维护的block和文件的对应信息都删除
2.1 关闭各种流2.2 将ACKqueue中数据,重新加入到dataQueue2.3 清空ACKqueue2.4 重置数据流管道
dataQueue (LinkedList)
TreeSet<String>
Lease
封装返回消息BlockOpResponseProto
创建一个新的block
write
DFSClient
opWriteBlock(in)
FSNamesystem
申请block locateFollowingBlock
RPC
上一次续约得一个时间戳
返回
blockReceivedAndDeleted
失败,重试三次
NameNode
加入契约管理
本地下载文件
success = createBlockOutputStream
生成校验和 chunkSum
do locateFollowingBlock while(3次)
blk_0001_checksum
abandonBlock
BlockOpResponseProto
create
wait 等待 ackQueue中的消息消费完成
通过client获取契约 Lease
Block blockid自增
bloc (2048个packet)128M
NameNodeRpcServerNameNodeRpcServerNameNodeRpcServerNameNodeRpcServerNameNodeRpcServerNameNodeRpcServerNameNodeRpcServer
把当前这个包转发给下游DataNode
当前客户端针对哪些文件持有契约
创建输出流new DFSOutputStream
使用流上传文件
创建输出流 HdfsDataOutputStream
这里实际上是开启这个client 得 续约操作,因为在创建文件的过程中,在NameNode的create中,已经开启了一个契约如果超过一小时都没发送,那在NameNode那端得线程,就会定时检查Lease,超过一小时未续约,进行删除
传递过来是client
updateBlockForPipeline
一个block对应一个 BlockReceiver
2、processDatanodeError(
添加
移除文件 、 移除租约
public class HdfsText01 { public static void main(String[] args) throws Exception { //1 创建连接 Configuration conf = new Configuration(); //2 连接端口 conf.set(\"fs.defaultFS\
DataNode DataNode DataNode DataNode DataNode DataNode
数据上传过程中,出现了错误
返回 HdfsFileStatus
1、关闭ResponseProcessor响应线程
PacketReceiver用于DataNode之间写数据
ACKqueue
receivePacket 读取packet
是否最后一个包
write 数据写入
写入内存后判断是否需要进行 刷盘
没地方画了,画在最上面的容错
根据需要判断是否需要进行刷盘的操作
将chunk、checksum写入
wait 等待 ackQueue中的消息消费完成
//hdfs文件复制到本地(流)FSDataInputStream in = fs.open(new Path(\"/output\"));FileOutputStream out = new FileOutputStream(\"E:/mm.txt\
createRbwFile
循环读取
获取发送过来请求类型OP
TreeSet<Lease> sortedLeases 通过续约时间排序得集合
addBlock
磁盘文件
将block加入BlocksMap BlockManager
BlockReceivernew 创建核心组件
checksum
getAdditionalDatanode
哪个客户端持有得契约,client
while循环
INodeFile
1、通过之前的管道发送数据,block不变2、initDataStreaming
针对DataNode创建Socket链接createBlockOutputStream
blockReplyStream
1、从 BlockManager 中移除2、取消当前block和元数据文件的关联3、写日志、供standby同步
dfsClient.namenode.abandonBlock
datanode写数据失败副本数不足时调用
catch 异常
DdataNode之间传输失败的标识
while (receivePacket() >= 0)
hasError 如果报错的话,这里的的while循环也会退出
获取分配的DataNode
不是
Packet 添加
DataBlockScanner 线程会进行磁盘上的block进行扫描检查
移除 ACKqueue头
ACKqueue (LinkedList)
感觉这里应该是一个标识,这个空的packet就标识这个block写满了
是否是block中最后的一个空packet
2
请求是否成功
HdfsDataOutputStream
getEditLog().logSync()查看04 Mkdir创建过程
接收DataNode汇报block得信息写入block完成、删除block等
peerServer.accept() 阻塞等待连接
报错外层设置 catch hasError = true;
FsVolumeImpl创建文件
DataNode
3
异常时:失败重试
dir.addFile添加到文件目录树
renewLease
key:client名称
DataXceiverServer线程是以数据流的方式,为client提供block的数据流上传、数据流下载通信方式:SocketDataXceiverServer
createRbw
Socket
此时会把DdataNode连接失败的节点加入excute集合
第一步: NameNode 端创建一个 file 文件 ,文件名blk_00001
注意此时会把DdataNode不通的节点传递过去,告诉NameNode,下次不要把这个节点给我
重新获取block、重置管道如果这里发生错误,失败重试3次
报错
Socket 创建
大概看了一下,如果是在datanode同步数据的时候出错,此时好像会停止这个数据的线程写入,然后给client返回对应的异常信息,然后client端的 hasError 就会设置成true,然后会再重进行block的选择等
循环遍历所有DataNode返回的写入状态
这个异常应该是第二次循环才会改过来有的
参考一下 04 图 ,创建路径
TreeSet<Lease> sortedLeases
HdfsDataOutputStream
private void endBlock() { if(DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug(\"Closing old block \" + block); } this.setName(\"DataStreamer for file \
启动这个流 out.start()
无限while 循环
创建文件的操作,包括目录树和editlog的操作
初始化一个 ResponseProcessor处理响应请求的
线程 run
此时会告诉NameNode,这个DataNode节点不通
DataStreamer是一个核心线程,主要负责数据上传到 DataNode,他是负责通过一个数据管道(pipeline) 将数据包 (packets)发送到DataNode
一个FsVolume可以理解成一个本地磁盘的一个文件组,包含一堆的block文件,DataNode上包含了多个FsVolume
getNewBlockTargets
获取 第一个 lease
blk_0003_checksum
packetReceiver.mirrorPacketTo第一步: 首先将数据包写入镜像
关闭各种流 finalizeBlock完成block的写入
这里会有一个心跳包的判断,这个代码应该在创建block的时候发了一个测试的心跳packetisHeartbeatPacket
1、向NameNode报备2、记录异常Node3、重试
blk_0002
设置 mirrorError = true
2.41、移除那个坏的datanode节点2、重置数据流管道setPipeline
从dataQueue中获取数据getFirst()
获取 BPOfferService
是否是新的block 创建管道
返回 LocatedBlock
for (BPServiceActor actor : bpServices)可以i理解成就是循环向active namenode 和standby namenode发送通知
DFSOutputStream.
和对应的INodeFile绑定之误认为是添加到树,其实我感觉应该是Block和对应的File文件节点绑定
处理请求processOp(op)
响应结果
重要
success = createBlockOutputStream此处流程就和前面是一样的了,参照获取block时得图
INodeDirectory
addDatanode2ExistingPipeline
blockReceiver.receiveBlock
是否是最后一个空packet
发生异常
holder
再次获取 LocatedBlock向NameNode发送RPC请求
1、移除 dataQueue 2、添加 ACKqueue
删除ACKqueue中第一个节点,说明写入成功
buf = chunk*9 = 512字节 * 9 buf初始化的大小是chunk*9的大小,chunk默认是512字节
写数据
NIO
机架感知 BlockPlacementPolicy
接收packet的核心逻辑 :接收并处理数据包
2、重置数据管道 setPipeline(lb)
renew(); 续约
超过
判断是否需要添加新的DataNode
getEditLog().logOpenFile写入内存Editslog
创建通信管道 Pipeline
开启文件的续约操作
第三步:写入对应的校验和文件checksumOut.write
应该是开始直接写数据了
//流上传文件 FileInputStream in=new FileInputStream(\"E://haha.txt\");//读取本地文件 FSDataOutputStream out = fs.create(new Path(\"/output2\
FsDatasetImpl
创建接收DataNode的返回数据流 (输入流)
此时其实传递过来得是一个空得block
读取第一个
写入的数据的剩余内容 < buf 的大小 -- 规避了数组越界问题
excludedNodes.put 记录异常DataNode
startFileInternal
如果此时NameNode正在启动中,则休眠五秒
return false不休眠,继续处理,就是跳出 while
执行完之后,也走上面方法
3、重新和新的DataNode创建连接,发送请求 TRANSFER_BLOCK注意此时的发送类型
success是否成功
PacketResponder
写满的packet放入 dataQueue
休眠1s
等待消费完成之后再移除添加
创建block
Lease 两个客户端得排序主要是,先通过最后一次续约得时间,如果时间一样的话,再通holder,也就是client name得顺序
好像不是向NameNode发送请求,但是是notify什么、还没看懂
这个while里面的每一步报错,都会设置 hasError = true;
return DFSOutputStream
创建一个Socket把当前这个包转发给下游DataNode
packet
添加契约管理LeaseManager
文件是否被覆写
logOpenFile
尝试收回租约时可能记录了交易。即使抛出异常,也需要对其进行同步。
创建FsVolumeReference
在文件目录树中添加了文件的目录结构
如果写入的数据== blockSize
DFSOutputStream
blk_0003
DataStreamer 核心线程
创建一个空的packet放入 dataQueue
chunk
契约管理器 LeaseManager
waitForAckHead获取ack中的头数据
写入的长度read > buf
startFile
DataXceiver 创建核心线程
创建 PacketResponder ,开启线程,暂时没懂、
读取下游ACK
写满放入
INodeDirectory 目录
超过十次
addLease
key:文件path
这里应该会通知namenode,然后需要注意的是,这里会通知到集群中所有NameNode
建立连接
1、从 sortedLeases 移除契约2、更新一下lastUpdate为当前时间3、再次添加契约到 sortedLeases
应该是Namenode更新block时间戳,这样异常DataNode过期数据块会被删除
读取发送过来数据
猜测这里应该是一个统一返回吧,就是返回所有datanode的写入状态,成功失败
1、创建多少个副本2、每个副本都在哪台DataNode上
writeChecksumChunks
未超过
向上游DataNode发送一个ACK消息
switch(op)
FileSystem # copyFromLocalFile
one.writeTo(blockStream)写数据
return
while 无限循环
循环while从头开始,重新写
checksum4字节
参考一下 06 图 ,write写数据HdfsDataOutputStream
判断是否超过1小时
发送RPC
如果写入数据长度大于缓冲区长度(4608),将其直接发送到底层流(packet)
将 chunk 和chunck写入currentPacket
DataNode
直接return,就不会进行写磁盘了
向上游DataNode发送一个ACK消息(不知道为啥又一次)
在写完一个block之后,其实这里就会重置 PIPELINE_SETUP_CREATE具体代码看最下面的 engBlock
当前packet为空,新建一个packet
ResponseProcessor响应处理器
BlockConstructionStage.PIPELINE_SETUP_CREATE
currentPacket
dfsClient.namenode.create
数据写入失败时得处理方式
需要注意的是:此时只是修改了一个状态值,也就是 readyToSend = true然后这个值得true得引用实际上是在 sendImmediately() 方法中,然后这个方法其实是 BPServiceActor 线程中,如果 readyToSend = true 为 true得时候,此时 就会调用 ibrManager.sendIBRs 方法,里面会通知namenode对应03图得最下面 (心跳)
计算数据包得大小computePacketChunkSize
如果不足一个chunk,就缓存到本地buffer,如果还有下一次写入,就填充这个chunk,满一个chunk再flush
0 条评论
下一页