来源于源码阅读笔记。
前提:
• 机器故障是常态
• 文件不能丢失
• 需要对文件进行冗余的拷贝备份
思路:
• 不足拷贝数的:及时复制
• 超过拷贝数的:删除多余的
• 无效的:直接删除
几个常驻内存的队列
• NeededReplications
需要进行replicate 的 blocks
• PendingReplications
正在进行replicate 的 blocks
• ExcessReplicateMap
超过Replicator 数的 blocks
• RecentInvalidateSets
当前状态是失效的blocks
UnderReplicatedBlocks
• NeededReplications所属的类
• 保存所有当前需要 Replicate 的 block 信息
• 每个 Block 都有不同的 Replicate 优先级
0为最高
4表示不需要进行 Replica
优先级队列
0 只有一个 Replica 的 block
1 当前 Replica*3< 期望 Replica 数的 block
2 其他
3 所有 Replica 都在一个 Rack 的 block
PendingReplications
• 正在等待 DataNode 进行 Replicate 的 blocks
• pendingReplicationMonitor线程对其进行监视
• 监视超时仍未 Replicate 完成的 block
• 超时设置为 dfs.replication.pending.timeout.sec
PendingReplicationMonitor
• 当 NameNode 收到 blockReceive 的信息,将对应等待 replica 的 block 移除,表示 replicate 成功
• 当发现超时的 block ,将其加入 timeoutItems 队列
ReplicationMonitor
• 独立的线程执行主要的 Replicate 工作
• 间隔: dfs.replication.interval 默认 3 秒
• computedDatanodeWork
– computeDatanodeWork
– processPendingReplications
ComputeDatanodework
• 执行 block replication 和 invalidateion
• 具体的操作将于下次 heartbeat 时被通知到相对应的 datanode
• Safemode时不执行
• 几个参数:
– blockToProcess:一次工作最多能 replicate 的 block 个数
heartbeatSize * REPLICATION_WORK_MULTIPLIE_PRE_ITERATION(默认为 2 ,即活着 dn 的两倍)
– nodesToProcess:一次工作最多进行 invalidate 的 dn 个数
heartbeatSize*INVALIDATE_WORK_PCT_PRE_ITERATION (默认为 32% ,即 1/3 的 dn )
– workFound:如果没有需要 Replicate 的 block ,则执行 invalidation
( Heartbeat.size()实际是当前收到的所有 heartbeat 的数目,即活着的 dn 的个数 )
执行步骤
• (1)获取一个 srcNode 即发起 replicate 的 datanode
• (2)排除已经在 pending 并且个数足够的 replica
• (3)选取一个 TargetNode 即需要将 replica 传输至 的 datanode
• (4)更新 srcNode 在 NameNode 中的信息,加入 replicatedblocks 对象与 targetNode
• (5)更新 targetNode 的 curApproxBlocksScheduled 信息
• (6)最后将此 block 从 needed 队列移除,加入 pending 队列
( TSP问题,实际是按照树的深度之和,计算两个 dn 距离,利用两次循环(选择排序)得出 pipeline )
获取srcNode 的算法
• 期望:获取一个正处于 DECOMMISION_INPROGRESS 状态的 datanode
原因:最不忙(没有写的traffic )
• 不使用已经 decommissioned 的 datanode
• 如果都不满足,则随机选择一个为达到 replication limit 的 datanode
computeInvalidateWork流程
• 处理 recentInvalidateSets 队列中已经失效的 block
recentInvalidateSets: TreeSet<DN,list<block>>
• 共执行 nodesToProcess 次循环
• 每次循环,取出头一个 DN 对应的 blocklist
• 从中取出不超过 blockInvalidateLimit 个 block
blockInvalidateLimit = max(100, 20 * heartbeatinterval / 1000)
• 剩余的继续放回队列中
• 将选出的 block 更新进对应的 datanode 中
ProcessPendingReplications
• 处理超时的 replica
• 循环 timeoutItems 中的对象,将其重新放回 needed 队列
DataNode heartbeat 后的工作
• 生成 replicate command (DNA_TRANSFER)
maxReplicationStreams – xmitsInProgress 个
(dfs.max-repl-streams,2) (并发的 xceriver 个数, dn 的 threadGroup.activeCount)
• 生成 invalidate command (DNA_INVALIDATE)
blockInvalidateLimit个
max(100, 20 * heartbeatInterval / 1000 )
heartbeat频率的 20 倍,即一次最多 20 个
NeededReplications更新
(每隔(dfs.namenode.decommission.interval,30) * 1000 间隔检测一次)
• 1.NameNode启动, leave safemode 时
• 2.Decommission Manager线程,检测处于 Decommission 状态的 datanode(1)
• 3.File complete
• 4.checkLease Manager
RecentInvalidate更新
• 1.Excess Replica
• 2.setReplica 变小
• 3.blockReport通知
• 4.删除文件
• 5.DiskError
ExcessReplicateMap
• 保存超过 Replica 数的 block
• 每当某 datanode 加入一个新的 block ,选择另外一个 datanode(1) ,并将其加入
recentInvalidate队列,等待删除
• 更新时刻:
– setRep 变小
– addStroedBlock
• 算法:
传入的参数是nonExcess list
1.从所有的 datanodes 中,生成一个 map<Rack,list<dn>>;
2.从 map 中分为两个集合 priSet (多于一个 dn 的 Rack ), remains (仅有一个的)
3.先保证满足 delHint( 只有 blockReport 给出 )
4.从 priSet 中选剩下空间最小的
5.从 remain 中选剩下空间最小的