天天看点

Hadoop Replication策略

来源于源码阅读笔记。

前提:

• 机器故障是常态 

• 文件不能丢失 

• 需要对文件进行冗余的拷贝备份

思路:

• 不足拷贝数的:及时复制 

• 超过拷贝数的:删除多余的 

• 无效的:直接删除

几个常驻内存的队列

•  NeededReplications 

    需要进行replicate 的 blocks 

•  PendingReplications 

    正在进行replicate 的 blocks 

•  ExcessReplicateMap 

    超过Replicator 数的 blocks 

•  RecentInvalidateSets 

当前状态是失效的blocks

UnderReplicatedBlocks

•  NeededReplications所属的类 

• 保存所有当前需要 Replicate 的 block 信息

• 每个 Block 都有不同的 Replicate 优先级 

    0为最高 

    4表示不需要进行 Replica

优先级队列

            0                              只有一个 Replica 的 block

            1                      当前 Replica*3< 期望 Replica 数的 block

            2                                         其他

            3                        所有 Replica 都在一个 Rack 的 block 

PendingReplications

• 正在等待 DataNode 进行 Replicate 的 blocks

 • pendingReplicationMonitor线程对其进行监视

 • 监视超时仍未 Replicate 完成的 block

 • 超时设置为      dfs.replication.pending.timeout.sec

PendingReplicationMonitor

 • 当 NameNode 收到 blockReceive 的信息,将对应等待 replica 的 block 移除,表示 replicate 成功

 • 当发现超时的 block ,将其加入 timeoutItems 队列

 ReplicationMonitor

• 独立的线程执行主要的 Replicate 工作

• 间隔:  dfs.replication.interval      默认 3 秒

•  computedDatanodeWork

               – computeDatanodeWork 

               – processPendingReplications

ComputeDatanodework

• 执行 block replication 和 invalidateion

• 具体的操作将于下次 heartbeat 时被通知到相对应的 datanode

•  Safemode时不执行

• 几个参数:

    – blockToProcess:一次工作最多能 replicate 的 block 个数

      heartbeatSize * REPLICATION_WORK_MULTIPLIE_PRE_ITERATION(默认为 2 ,即活着 dn 的两倍)

    – nodesToProcess:一次工作最多进行 invalidate 的 dn 个数

      heartbeatSize*INVALIDATE_WORK_PCT_PRE_ITERATION (默认为 32% ,即 1/3 的 dn )

    – workFound:如果没有需要 Replicate 的 block ,则执行  invalidation

      ( Heartbeat.size()实际是当前收到的所有 heartbeat 的数目,即活着的 dn 的个数 )

执行步骤

•  (1)获取一个 srcNode  即发起 replicate 的 datanode

•  (2)排除已经在 pending 并且个数足够的 replica

•  (3)选取一个 TargetNode  即需要将 replica 传输至 的 datanode

•  (4)更新 srcNode 在 NameNode 中的信息,加入 replicatedblocks 对象与 targetNode

•  (5)更新 targetNode 的 curApproxBlocksScheduled 信息

•  (6)最后将此 block 从 needed 队列移除,加入  pending 队列

( TSP问题,实际是按照树的深度之和,计算两个 dn 距离,利用两次循环(选择排序)得出 pipeline )

获取srcNode 的算法

• 期望:获取一个正处于  DECOMMISION_INPROGRESS 状态的 datanode

   原因:最不忙(没有写的traffic )

• 不使用已经 decommissioned 的 datanode

• 如果都不满足,则随机选择一个为达到  replication limit 的 datanode

computeInvalidateWork流程

• 处理 recentInvalidateSets 队列中已经失效的 block

   recentInvalidateSets: TreeSet<DN,list<block>>

• 共执行 nodesToProcess 次循环

• 每次循环,取出头一个 DN 对应的 blocklist

• 从中取出不超过 blockInvalidateLimit 个 block

   blockInvalidateLimit = max(100, 20 * heartbeatinterval / 1000)

• 剩余的继续放回队列中

• 将选出的 block 更新进对应的 datanode 中

ProcessPendingReplications

• 处理超时的 replica

• 循环 timeoutItems 中的对象,将其重新放回 needed 队列

DataNode heartbeat 后的工作

•  生成 replicate command (DNA_TRANSFER) 

   maxReplicationStreams      –    xmitsInProgress 个

   (dfs.max-repl-streams,2)        (并发的 xceriver 个数, dn 的 threadGroup.activeCount)

•  生成 invalidate command  (DNA_INVALIDATE)     

   blockInvalidateLimit个

   max(100, 20 * heartbeatInterval / 1000 )

   heartbeat频率的 20 倍,即一次最多 20 个

 NeededReplications更新

(每隔(dfs.namenode.decommission.interval,30) * 1000 间隔检测一次)

•  1.NameNode启动, leave safemode 时

•  2.Decommission Manager线程,检测处于 Decommission 状态的 datanode(1)

•  3.File complete

•  4.checkLease Manager

RecentInvalidate更新

•  1.Excess Replica

•  2.setReplica 变小

•  3.blockReport通知

•  4.删除文件

•  5.DiskError

ExcessReplicateMap

• 保存超过 Replica 数的 block

• 每当某 datanode 加入一个新的 block ,选择另外一个 datanode(1) ,并将其加入

   recentInvalidate队列,等待删除

• 更新时刻:

     – setRep 变小

     – addStroedBlock 

• 算法:

  传入的参数是nonExcess list

  1.从所有的 datanodes 中,生成一个 map<Rack,list<dn>>;

  2.从 map 中分为两个集合 priSet (多于一个 dn 的 Rack ), remains (仅有一个的)

  3.先保证满足 delHint( 只有 blockReport 给出 )

  4.从 priSet 中选剩下空间最小的

  5.从 remain 中选剩下空间最小的

继续阅读