天天看點

Flink 實作Locality 模式排程

聲明:本系列部落格部分是根據SGG的視訊整理而成,非常适合大家入門學習。部分文章是通過爬蟲等技術手段采集的,目的是學習分享,如果有版權問題請留言,随時删除。

背景

在計算與存儲一體化的情況,spark任務在排程task時會優先将其排程在資料所在的節點上或者相同的rack上,這樣可以減少資料在不同節點或者不同rack上移動所帶來的性能消耗;目前在Flink on yarn模式下,TaskExecutor的資源位置完全由yarn自主要制的,那麼就可能會造成任務所在的節點與kafka資料所在的節點不在同一個機房,進而産生跨機房的流量消耗,在這樣的一個環境背景下,需要将任務排程在資料所在機房,以減少流量消耗。(注:基于Flink-1.10.1)

Flink on Yarn排程流程

在Flink-1.9版本以前使用的排程模式是LAZY_FROM_SOURCES即以source-vertex為起始節點開始排程,當有資料輸出到下遊節點時開始排程下遊的vertex,以這種方式部署所有的vertex;在1.9及1.9版本以後使用EAGER排程模式即會立刻排程所有的vertex。下面看一下具體的排程流程圖:

Flink 實作Locality 模式排程

任務排程與部署是在JobMaster中通過DefaultScheduler完成,其會首先為所有的ExecutionVertex向SlotPoo(1)l申請資源然後部署,SlotPool會向ResourceManager中SlotManager(2)申請資源,如果沒有可用的資源,那麼就會向Yarn申請一個Container(3),待yarn配置設定了資源之後,回調給YarnResourceManager,進而啟動TaskExecutor(4),TaskExecutor啟動之後就會向YarnResourceManager彙報其資源情況(5),在YarnResourceManager進行資源比對之後就會向TaskExecutor申請資源(6),然後TaskExecutor會将自身的資源配置設定給SlotPool(7), 最後告知給DefaultScheduler(8)将任務部署到對應的TaskExecutor上。至此完成一次完整的任務排程過程。

在SlotPool向SlotManager申請資源前,會生成一個AllocationId的唯一辨別(資源ID),并且在申請的時候會将這個辨別一起攜帶過去,當TaskExecutor向YarnResourceManager彙報自身資源情況時,在YarnResourceManager中會做一個資源請求(攜帶AllocationId)與實際資源比對的過程,主要是通過資源大小(cpu、記憶體)比對,比對成功之後YarnResourceManager會向TaskExecutor發送一個申請slot請求(攜帶AllocationId),待請求成功之後TaskExecutor會将資源配置設定給對應的AllocationId的請求(7),完成資源比對過程。

Locality 排程實作分析

通常Flink與kafka是部署在不同的叢集上,這裡所說的Locality僅僅是實作rack級别的排程,即将任務排程在kafka對應分區資料所在的rack上,為了實作此功能,分為以下幾個步驟:

1)資料配置設定:Flink每一個Source-Task拉取partition是按照一定規則進行配置設定的,為了實作相同rack的partition在同一個task,是以需要改變其配置設定政策;為了保證每一個rack的資料都被消費到,需要對source并行度進行擴張,以前可能一個task消費所有rack的資料,現在需要每一個rack上的資料都有對應的task去拉取資料

實作:在flink-conf.yaml 中配置yarn叢集機器分布情況,包括ip以及對應的rack資訊,那麼任務啟動會擷取這些資訊;在StreamGraphGenerator中的transformSource方法提前生成每個source-task消費的對應topic與partition資訊,以及其需要排程到的rack資訊。這裡主要說明一下目前的配置設定政策:

例如:有a,b,c 三個rack, topic1對應partition:[0,1,2,3,4,5], 可通過KafkaConsumer的partitionsFor方法擷取對應的partition資訊,parition的分布情況是:a ->[0,1],b->[2,3],c->[4,5]
如果設定的并行度為:1 ,則配置設定規則是:task0(a)->[0,1],task1(b)->[2,3],task2(c)->[4,5]
如果設定的并行度為:4 ,則配置設定規則是:task0(a)->[0],task1(b)->[2],task2(c)->[4],task3(a)->[1],task4(b)->[3],task5(c)->[5]
注:task0 表示下标為0的task
           

擴充規則是:userSourceParallelism%numRack==0?userSourceParallelism:(1+userSourceParallelism/numRack)*numRack, 即生成的并行度是rack個數的整數倍。

生成的配置放在ExecutionConfig中的GlobalParameters中,實際效果圖:

Flink 實作Locality 模式排程

代表着下标為0的task消費partition-2,同時部署在rack-a中的機器上,下标為1的task消費partition-1,同時部署在rack-b的機器上,下标為2的task消費partition-0,同時部署在rack-c中的機器上。

2)資源申請:預設情況下在Flink向Yarn申請資源是不攜帶任何NodeManager資訊的,通常需要向yarn申請資源的流程是當遇到新的Source-Task時才會去走這個流程(根據slot-shared機制),是以隻需要在Source對應的ExecutionVetex上打上對應的rack标簽即可,将這個rack一直傳遞到YarnResourceManager端,然後擷取該rack對應的機器,從這些機器上申請資源。

實作:在申請資源前會給ExecutionVertex配置相關的資源資訊,在ExecutionVertexSchedulingRequirementsMapper.getPhysicalSlotResourceProfile中完成,是以在這裡對ExecutionVertex的資源資訊打上rack資訊

boolean hasNoConnectedInputs=
executionVertex.getJobVertex().getJobVertex().hasNoConnectedInputs();
        if(hasNoConnectedInputs){
            try{
                int index=executionVertex.getParallelSubtaskIndex();
                ExecutionConfig executionConfig=executionVertex.getJobVertex().getJobVertex().getJobGraph().getExecutionConfig();
                Map<String,String> map=executionConfig.getGlobalJobParameters().toMap();
                String index2Zone=map.get("index2Zone");
                String zone="";
                ObjectMapper objectMapper=new ObjectMapper();
        //index 表示該ExecutionVertext的下标Index
                zone=objectMapper.readTree(index2Zone).findValue(String.valueOf(index)).asText();

                //賦予區域資訊
                ResourceProfile resourceProfile1=resourceProfile.copy2ZoneUnknown(resourceProfile,zone);

                LOG.debug("vertexName:{},ResourceProfile:{}",executionVertex.getJobVertex().getName(),resourceProfile1);
                return resourceProfile1;

            }catch (Throwable e){
                LOG.error("parse resourceProfile error:{}",e);
            }
        }
           

在這裡重新定義了ResourceProfile,賦予了其rack資訊,ResourceProfile會一直傳遞到YarnResourceManager資源申請端:

public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
        if (!resourceProfilesPerWorker.iterator().next().isMatching(resourceProfile)) {
            return Collections.emptyList();
        }
        //zone 表示 rack資訊
        String zone=resourceProfile.getZone();
        if(zone!=null){
            requestYarnContainer(zone);
        }else{
            requestYarnContainer();
        }
        return resourceProfilesPerWorker;
    }
           

重新定義了requestYarnContainer流程,使請求包含rack資訊:

AMRMClient.ContainerRequest getContainerRequest(String zone) {
        String[] ipList= ResourceManager.ZONE_IPS.get(zone).split(",");//擷取該rack下的所有iplist
        LOG.debug("request slot from [{}] for zone [{}]",ipList,zone);
        AMRMClient.ContainerRequest request= new AMRMClient.ContainerRequest(
            getContainerResource(),
            ipList,
            null,
            RM_REQUEST_PRIORITY,false);//false:RelaxLocality表示不允許資源降級申請,一定要使其分布在指定的機器上
        containerRequestList.add(request);
        return request;
    }
           

由于yarn傳回的是一個滿足請求的一個資源集合,是以需要在滿足的集合中做資源過濾,将多餘資源傳回給yarn,是以在回調方法onContainersAllocated中:

public void onContainersAllocated(List<Container> containers) {
        runAsync(() -> {
            log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests);
            //final Collection<AMRMClient.ContainerRequest> pendingReques  ts = getPendingRequests();

            //請求到的host
            List<String> requestedHost=new ArrayList<>();
            containers.stream().map(container -> container.getNodeId().getHost()).forEach(requestedHost::add);

            //擷取滿足比對的請求
            final Collection<AMRMClient.ContainerRequest> pendingRequests=containerRequestList.stream().map(containerRequest -> Tuple2.of(containerRequest.getNodes(),containerRequest))
                .filter(tuple2->
                    requestedHost.stream().filter(host->tuple2.f0.contains(host))
                        .count()>0
                )
                  .map(map->map.f1).collect(Collectors.toList());

           int matchRequest=pendingRequests.size();

           log.info("recevied container size : {}, matching request:{}",containers.size(),matchRequest);

            final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

            // number of allocated containers can be larger than the number of pending container requests
            //final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests);
            final int numAcceptedContainers = Math.min(matchRequest, numPendingContainerRequests);
            final List<Container> requiredContainers = containers.subList(0, numAcceptedContainers);
            final List<Container> excessContainers = containers.subList(numAcceptedContainers, containers.size());

            for (int i = 0; i < requiredContainers.size(); i++) {
                //removeContainerRequest(pendingRequestsIterator.next());
                AMRMClient.ContainerRequest needRemoveRequest=pendingRequestsIterator.next();
                containerRequestList.remove(needRemoveRequest);
                removeContainerRequest(needRemoveRequest);
            }
      //傳回多餘的資源
            excessContainers.forEach(this::returnExcessContainer);
            requiredContainers.forEach(this::startTaskExecutorInContainer);

            // if we are waiting for no further containers, we can go to the
            // regular heartbeat interval
            if (numPendingContainerRequests <= 0) {
                resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
            }
        });
    }
           

3) 資源比對:預設情況下,在YarnResourceManager中做配置設定到的資源與申請的資源比對時是按照大小進行的,是以需要改為按照rack進行比對

實作:比對的流程在SlotManager.findExactlyMatchingPendingTaskManagerSlot中:

private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile,String zone) {
        for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
            LOG.info("zone:{},request_zone:{}",zone,pendingTaskManagerSlot.getResourceProfile().getZone());

            /**
             * 區域比對
             */
            if(zone.equals(pendingTaskManagerSlot.getResourceProfile().getZone())){
                LOG.debug("get resource zone:{},resourceProfile:{}",zone,pendingTaskManagerSlot.getResourceProfile());
                return pendingTaskManagerSlot;
            }
           

完成了這個資源比對過程,并且在後續的流程中由AllocationId完成資源與具體的ExecutionVertex請求比對,就可以将ExecutionVertex部署到比對的機器上。

4) 指定source的消費資料:在資料配置設定中已經将每個task消費的資料指定好了,是以在source端隻需要擷取對應的分區資訊即可,同時需要放棄預設的配置設定政策

實作:FlinkKafkaConsumerBase.open 中:

final List<KafkaTopicPartition> allPartitions = new ArrayList<>();

        //從配置裡面擷取
        Map<String,String> globalMaps=getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
        String index2TopicPartitionsStr=globalMaps.get("index2TopicPartitions");
        ObjectMapper objectMapper=new ObjectMapper();

        JsonNode rootNode=objectMapper.readTree(index2TopicPartitionsStr);
        JsonNode topicPartitionNode=rootNode.findValue(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));

        topicPartitionNode.fieldNames().forEachRemaining(topic->{
            JsonNode partitionsNode=topicPartitionNode.findValue(topic);
            partitionsNode.iterator().forEachRemaining(jsonNode -> {
                allPartitions.add(new KafkaTopicPartition(topic,jsonNode.asInt()));
            });
        });

        allPartitions.stream().forEach(x->{
            LOG.debug("consumer topic:{}, partition:{}",x.getTopic(),x.getPartition());
        });
           

allPartitions 就代表了該task需要消費的資料。

至此整個流程完成。

總結

在實作該方案前,也做過在任務排程後直接在FlinkKafkaConsumerBase中自定義partition的配置設定,即根據機器的所在rack去擷取對應的rack上的資料,但是經常會出現有資料的rack上沒有對應的rack任務,隻能做降級處理,将這些rack上的分區資料配置設定給其他rack上的任務,仍然會有部分的資料跨機房拉取,流量成本消耗縮減效果并不好,是以才做了這個Locality的方案,由于涉及的内容比較多,本文隻提供了一個實作的思路與關鍵的部分代碼。目前的實作方案仍然存在以下幾個限制:

1.一個任務隻能消費一個kafka叢集的資料,由于slot-share機制,不同的JobVertext可以配置設定到同一個Slot上,如果有多個kafka叢集的話,source就會對應多個JobVertex,那麼在後續的JobVertext在申請資源的時候就會尋找前面已經申請到資源的JobVertext,很有可能會比對到其他的rack的資源,目前并未對這塊進行改造。

2.一個TaskExecutor隻配置設定一個Slot,如果有多個slot的話,第一次申請後,後續SlotPool向YarnResourceManager申請資源時,直接發現有可用的Slot就會直接配置設定,很有可能會比對到其他的rack的資源,目前并未對這塊進行改造。

3.如果topic的partition在rack配置設定不均勻,可能會造成流量傾斜,是以需要在topic建立中做好partition的分布。

上一篇: Flink-Watermark
下一篇: 排程

繼續閱讀