Flink1.7.2 Dataset 并行计算源码分析
概述
- 了解Flink处理流程(用户程序 -> JobGraph -> ExecutionGraph -> JobVertex -> ExecutionVertex -> 并行度 -> Task(DataSourceTask,BatchTask,DataSinkTask))
- 了解ExecutionVertex的构建,Task的构建,执行,任务之间的调用关系
原理分析
输入数据
c a a
b c a
程序
package com.opensourceteams.module.bigdata.flink.example.dataset.worldcount
import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
import org.apache.flink.api.scala.ExecutionEnvironment
/**
 * Batch (DataSet API) WordCount example used to trace how Flink schedules a parallel job.
 *
 * Reads a text file, splits each line on single spaces, counts the occurrences of every
 * word with parallelism 2 and prints the result.
 */
object WordCountRun {

  def main(args: Array[String]): Unit = {
    // Local environment built from the project's ConfigurationUtil; per the original note
    // this configuration exists to avoid timeouts while stepping through in a debugger.
    val env: ExecutionEnvironment =
      ExecutionEnvironment.createLocalEnvironment(ConfigurationUtil.getConfiguration(true))
    env.setParallelism(2)

    val dataSet = env.readTextFile("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt")

    // Implicit TypeInformation for the DataSet API lives in org.apache.flink.api.scala;
    // the streaming package is not needed (and should not be required) for a batch job.
    import org.apache.flink.api.scala._

    val result = dataSet.flatMap(x => x.split(" ")).map((_, 1)).groupBy(0).sum(1)
    result.print()
  }
}
源码分析
JobMaster
/**
 * Creates the JobMaster for one job.
 *
 * <p>In order, the constructor: registers the RPC endpoint under a generated name, stores the
 * supplied services (most of them null-checked), creates the two heartbeat managers, resolves
 * the restart strategy from the job's serialized ExecutionConfig, creates the slot pool, and
 * finally assigns {@code executionGraph} from {@code createAndRestoreExecutionGraph(...)}.
 *
 * @throws Exception declared by the signature; the visible body does not show which call raises it
 */
public JobMaster(
RpcService rpcService,
JobMasterConfiguration jobMasterConfiguration,
ResourceID resourceId,
JobGraph jobGraph,
HighAvailabilityServices highAvailabilityService,
SlotPoolFactory slotPoolFactory,
JobManagerSharedServices jobManagerSharedServices,
HeartbeatServices heartbeatServices,
BlobServer blobServer,
JobManagerJobMetricGroupFactory jobMetricGroupFactory,
OnCompletionActions jobCompletionActions,
FatalErrorHandler fatalErrorHandler,
ClassLoader userCodeLoader) throws Exception {
// Endpoint name is produced by createRandomName from the JOB_MANAGER_NAME constant.
super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
final JobMasterGateway selfGateway = getSelfGateway(JobMasterGateway.class);
// Store collaborators. Note: rpcService, jobManagerSharedServices and heartbeatServices
// are dereferenced below without an explicit checkNotNull.
this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.blobServer = checkNotNull(blobServer);
this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
this.jobCompletionActions = checkNotNull(jobCompletionActions);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.userCodeLoader = checkNotNull(userCodeLoader);
this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
// Heartbeat manager towards TaskManagers (created via createHeartbeatManagerSender).
this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(selfGateway),
rpcService.getScheduledExecutor(),
log);
// Heartbeat manager towards the ResourceManager (created via createHeartbeatManager).
this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
resourceId,
new ResourceManagerHeartbeatListener(),
rpcService.getScheduledExecutor(),
log);
final String jobName = jobGraph.getName();
final JobID jid = jobGraph.getJobID();
log.info("Initializing job {} ({}).", jobName, jid);
// The restart strategy configuration is read from the job's serialized ExecutionConfig,
// deserialized with the user-code class loader.
final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
jobGraph.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy();
// Resolution takes the job-level configuration, the shared factory and whether
// checkpointing is enabled; precedence is decided inside RestartStrategyResolving.
this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
jobManagerSharedServices.getRestartStrategyFactory(),
jobGraph.isCheckpointingEnabled());
log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid);
resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
// One slot pool per job, keyed by the job ID.
this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
this.registeredTaskManagers = new HashMap<>(4);
this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
this.lastInternalSavepoint = null;
this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
// The ExecutionGraph is obtained here, inside the constructor.
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
// Connection-related fields start out unset.
this.jobStatusListener = null;
this.resourceManagerConnection = null;
this.establishedResourceManagerConnection = null;
}
ExecutionGraph
ExecutionGraph.scheduleForExecution()
- 负责Execution的调度,也就是负责把ExecutionGraph转成ExecutionJobVertex,ExecutionJobVertex转成ExecutionVertex,再转成任务,这是真正开始执行调度逻辑的地方
- 更新当前Job的状态,即更新ExecutionGraph的状态,从CREATED更新到RUNNING
transitionState(JobStatus.CREATED, JobStatus.RUNNING)
Job Flink Java Job at Mon Mar 11 18:57:37 CST 2019 (f24b82ed1ec3e1c90455c342a9dfc21e) switched from state CREATED to RUNNING.
- 默认的作业调度模式 LAZY_FROM_SOURCES,
- LAZY_FROM_SOURCES:从sources开始安排任务。 一旦输入数据准备就绪,就开始下游任务,(刚开始只有Sources任务,下游任务都是未开始的) ;
- EAGER : 立即安排所有任务
- ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
- 调用 ExecutionGraph.scheduleLazy() //延迟调度
/**
 * Moves the job from CREATED to RUNNING and starts scheduling according to {@code scheduleMode}.
 *
 * @throws JobException if the schedule mode is neither LAZY_FROM_SOURCES nor EAGER
 * @throws IllegalStateException if the CREATED -> RUNNING transition does not succeed
 */
public void scheduleForExecution() throws JobException {
// Snapshot of the global modification version, compared again after scheduling was triggered.
final long currentGlobalModVersion = globalModVersion;
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
final CompletableFuture<Void> newSchedulingFuture;
switch (scheduleMode) {
case LAZY_FROM_SOURCES:
newSchedulingFuture = scheduleLazy(slotProvider);
break;
case EAGER:
newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
break;
default:
throw new JobException("Schedule mode is invalid.");
}
// Keep the future only if the job is still RUNNING and the version did not change meanwhile;
// otherwise the freshly created scheduling future is cancelled.
if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
schedulingFuture = newSchedulingFuture;
newSchedulingFuture.whenCompleteAsync(
(Void ignored, Throwable throwable) -> {
if (throwable != null && !(throwable instanceof CancellationException)) {
// only fail if the scheduling future was not canceled
failGlobal(ExceptionUtils.stripCompletionException(throwable));
}
},
futureExecutor);
} else {
newSchedulingFuture.cancel(false);
}
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
}
}
JobStatus
- 作业的状态 CREATED(已创建) -> RUNNING(运行中) -> FINISHED(已完成) 等
/**
* Possible states of a job once it has been accepted by the job manager.
*/
public enum JobStatus {
/** Job is newly created, no task has started to run. */
CREATED(TerminalState.NON_TERMINAL),
/** Some tasks are scheduled or running, some may be pending, some may be finished. */
RUNNING(TerminalState.NON_TERMINAL),
/** The job has failed and is currently waiting for the cleanup to complete */
FAILING(TerminalState.NON_TERMINAL),
/** The job has failed with a non-recoverable task failure */
FAILED(TerminalState.GLOBALLY),
/** Job is being cancelled */
CANCELLING(TerminalState.NON_TERMINAL),
/** Job has been cancelled */
CANCELED(TerminalState.GLOBALLY),
/** All of the job's tasks have successfully finished. */
FINISHED(TerminalState.GLOBALLY),
/** The job is currently undergoing a reset and total restart */
RESTARTING(TerminalState.NON_TERMINAL),
/** The job has been suspended and is currently waiting for the cleanup to complete */
SUSPENDING(TerminalState.NON_TERMINAL),
/**
* The job has been suspended which means that it has been stopped but not been removed from a
* potential HA job store.
*/
SUSPENDED(TerminalState.LOCALLY),
/** The job is currently reconciling and waits for task execution report to recover state. */
RECONCILING(TerminalState.NON_TERMINAL);
// ----------------------------
ScheduleMode
- 作业调度模式,即ExecutionGraph调度模式(LAZY_FROM_SOURCES,EAGER)
- LAZY_FROM_SOURCES:从sources开始安排任务。 一旦输入数据准备就绪,就开始下游任务,(刚开始只有Sources任务,下游任务都是未开始的)
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobgraph;
/**
* The ScheduleMode decides how tasks of an execution graph are started.
*
* <p>Two modes exist: {@link #LAZY_FROM_SOURCES} and {@link #EAGER}.
*/
public enum ScheduleMode {
/** Schedule tasks lazily from the sources. Downstream tasks are started once their input data are ready */
LAZY_FROM_SOURCES,
/** Schedules all tasks immediately. */
EAGER;
/**
* Returns whether we are allowed to deploy consumers lazily.
*
* @return {@code true} only for {@link #LAZY_FROM_SOURCES}
*/
public boolean allowLazyDeployment() {
return this == LAZY_FROM_SOURCES;
}
}
ExecutionGraph.scheduleLazy
-
-
0 = jobVertex = {InputFormatVertex@7675} "CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (org.apache.flink.runtime.operators.DataSourceTask)"
1 = jobVertex = {JobVertex@7695} "Reduce (SUM(1)) (org.apache.flink.runtime.operators.BatchTask)"
2 = jobVertex = {OutputFormatVertex@6665} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
- ExecutionJobVertex (执行流程:CREATED -> DEPLOYING ),转成对应的Task(执行流程:CREATED -->DEPLOYING --> RUNNING)
verticesInCreationOrder = {ArrayList@6145} size = 3
0 = {ExecutionJobVertex@6484}
stateMonitor = {Object@6608}
graph = {ExecutionGraph@5602}
jobVertex = {InputFormatVertex@6609} "CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (org.apache.flink.runtime.operators.DataSourceTask)"
operatorIDs = {Collections$UnmodifiableRandomAccessList@6610} size = 1
userDefinedOperatorIds = {Collections$UnmodifiableRandomAccessList@6611} size = 1
taskVertices = {ExecutionVertex[2]@6612}
producedDataSets = {IntermediateResult[1]@6614}
inputs = {ArrayList@6616} size = 0
parallelism = 2
slotSharingGroup = {SlotSharingGroup@6617} "SlotSharingGroup [9eb9f93248a641c71df925ec6245124a, b2e911c37d2ae462430e12812fb033ad, a2139d4cdf059b384d883251604a5f2e]"
coLocationGroup = null
inputSplits = {FileInputSplit[2]@6618}
maxParallelismConfigured = true
maxParallelism = 2
serializedTaskInformation = null
taskInformationBlobKey = null
taskInformationOrBlobKey = null
splitAssigner = {LocatableInputSplitAssigner@6620}
1 = {ExecutionJobVertex@6606}
stateMonitor = {Object@6653}
graph = {ExecutionGraph@5602}
jobVertex = {JobVertex@6654} "Reduce (SUM(1)) (org.apache.flink.runtime.operators.BatchTask)"
operatorIDs = {Collections$UnmodifiableRandomAccessList@6655} size = 1
userDefinedOperatorIds = {Collections$UnmodifiableRandomAccessList@6656} size = 1
taskVertices = {ExecutionVertex[2]@6658}
producedDataSets = {IntermediateResult[1]@6659}
inputs = {ArrayList@6660} size = 1
parallelism = 2
slotSharingGroup = {SlotSharingGroup@6617} "SlotSharingGroup [9eb9f93248a641c71df925ec6245124a, b2e911c37d2ae462430e12812fb033ad, a2139d4cdf059b384d883251604a5f2e]"
coLocationGroup = null
inputSplits = null
maxParallelismConfigured = true
maxParallelism = 2
serializedTaskInformation = null
taskInformationBlobKey = null
taskInformationOrBlobKey = null
splitAssigner = null
2 = {ExecutionJobVertex@6607}
stateMonitor = {Object@6664}
graph = {ExecutionGraph@5602}
jobVertex = {OutputFormatVertex@6665} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
operatorIDs = {Collections$UnmodifiableRandomAccessList@6666} size = 1
userDefinedOperatorIds = {Collections$UnmodifiableRandomAccessList@6667} size = 1
taskVertices = {ExecutionVertex[2]@6668}
producedDataSets = {IntermediateResult[0]@6669}
inputs = {ArrayList@6670} size = 1
parallelism = 2
slotSharingGroup = {SlotSharingGroup@6617} "SlotSharingGroup [9eb9f93248a641c71df925ec6245124a, b2e911c37d2ae462430e12812fb033ad, a2139d4cdf059b384d883251604a5f2e]"
coLocationGroup = null
inputSplits = null
maxParallelismConfigured = true
maxParallelism = 2
serializedTaskInformation = null
taskInformationBlobKey = null
taskInformationOrBlobKey = null
splitAssigner = null
/**
 * Lazy scheduling: only the job vertices reported as input vertices are scheduled here.
 *
 * @param slotProvider passed through to {@code ExecutionJobVertex.scheduleAll}
 * @return the result of {@code FutureUtils.waitForAll} over the per-vertex scheduling futures
 */
private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) {
final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal);
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
// Vertices that are not input vertices are skipped in this method.
if (ejv.getJobVertex().isInputVertex()) {
final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
slotProvider,
allowQueuedScheduling,
LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
Collections.emptySet());
schedulingFutures.add(schedulingJobVertexFuture);
}
}
return FutureUtils.waitForAll(schedulingFutures);
}
ExecutionJobVertex.scheduleAll
- ExecutionJobVertex 按并行度持有对应个数的 ExecutionVertex(taskVertices,并行度为几就有几个),这里逐个遍历进行调度
- 调用ExecutionVertex.scheduleForExecution() 处理
- Execution 状态为 CREATED
//---------------------------------------------------------------------------------------------
// Actions
//---------------------------------------------------------------------------------------------
/**
* Schedules all execution vertices of this ExecutionJobVertex.
*
* <p>Each element of {@code taskVertices} is scheduled with the same arguments and the
* resulting futures are combined with {@code FutureUtils.waitForAll}.
*
* @param slotProvider to allocate the slots from
* @param queued if the allocations can be queued
* @param locationPreferenceConstraint constraint for the location preferences
* @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
* Can be empty if the allocation ids are not required for scheduling.
* @return Future which is completed once all {@link Execution} could be deployed
*/
public CompletableFuture<Void> scheduleAll(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
// Local reference to the array of subtask vertices.
final ExecutionVertex[] vertices = this.taskVertices;
final ArrayList<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length);
// kick off the tasks
for (ExecutionVertex ev : vertices) {
scheduleFutures.add(ev.scheduleForExecution(
slotProvider,
queued,
locationPreferenceConstraint,
allPreviousExecutionGraphAllocationIds));
}
return FutureUtils.waitForAll(scheduleFutures);
}
ExecutionVertex.scheduleForExecution()
- 调用 Execution.scheduleForExecution
/**
* Schedules the current execution of this ExecutionVertex.
*
* <p>This method is a pure delegation to {@code currentExecution.scheduleForExecution(...)}.
*
* @param slotProvider to allocate the slots from
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location preferences
* @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
* Can be empty if the allocation ids are not required for scheduling.
* @return Future which is completed once the execution is deployed. The future
* can also completed exceptionally.
*/
public CompletableFuture<Void> scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
// All arguments are forwarded unchanged.
return this.currentExecution.scheduleForExecution(
slotProvider,
queued,
locationPreferenceConstraint,
allPreviousExecutionGraphAllocationIds);
}
Execution.scheduleForExecution
- 分配Slot给Execution
final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(
slotProvider,
queued,
locationPreferenceConstraint,
allPreviousExecutionGraphAllocationIds,
allocationTimeout);
- 调用Execution.deploy()函数,部署Execution到分给的slot中
/**
* NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
* to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
* error sets the vertex state to failed and triggers the recovery logic.
*
* <p>Flow: request a slot via {@code allocateAndAssignSlotForExecution}, call {@code deploy()}
* when that future completes, and mark the execution failed if the chain completes exceptionally.
*
* @param slotProvider The slot provider to use to allocate slot for this execution attempt.
* @param queued Flag to indicate whether the scheduler may queue this task if it cannot
* immediately deploy it.
* @param locationPreferenceConstraint constraint for the location preferences
* @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
* Can be empty if the allocation ids are not required for scheduling.
* @return Future which is completed once the Execution has been deployed
*/
public CompletableFuture<Void> scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
// The allocation timeout is taken from the owning ExecutionGraph.
final Time allocationTimeout = vertex.getExecutionGraph().getAllocationTimeout();
try {
final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(
slotProvider,
queued,
locationPreferenceConstraint,
allPreviousExecutionGraphAllocationIds,
allocationTimeout);
// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
// that we directly deploy the tasks if the slot allocation future is completed. This is
// necessary for immediate deployment.
final CompletableFuture<Void> deploymentFuture = allocationFuture.thenAccept(
(FutureConsumerWithException<Execution, Exception>) value -> deploy());
deploymentFuture.whenComplete(
(Void ignored, Throwable failure) -> {
if (failure != null) {
final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
final Throwable schedulingFailureCause;
// A timeout is reported as NoResourceAvailableException with a descriptive message;
// any other failure is passed on as-is.
if (stripCompletionException instanceof TimeoutException) {
schedulingFailureCause = new NoResourceAvailableException(
"Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
"Please make sure that the cluster has enough resources.");
} else {
schedulingFailureCause = stripCompletionException;
}
markFailed(schedulingFailureCause);
}
});
// if tasks have to scheduled immediately check that the task has been deployed
if (!queued && !deploymentFuture.isDone()) {
deploymentFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet."));
}
return deploymentFuture;
} catch (IllegalExecutionStateException e) {
// Illegal-state errors are returned as an exceptionally completed future rather than thrown.
return FutureUtils.completedExceptionally(e);
}
}
Execution.deploy()
- Execution 状态从SCHEDULED到DEPLOYING
- 构建部署对象
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
taskRestore,
attemptNumber);
- 调用TaskExecutor.submitTask
/**
* Deploys the execution to the previously assigned resource.
*
* <p>Steps visible in the body: verify the assigned slot is present and alive, transition the
* state from SCHEDULED (or CREATED) to DEPLOYING, verify the slot's payload is this execution,
* build a {@code TaskDeploymentDescriptor} and submit it through the slot's
* {@code TaskManagerGateway}. A failed submission marks the execution as failed.
*
* @throws JobException if the execution cannot be deployed to the assigned resource
*/
public void deploy() throws JobException {
final LogicalSlot slot = assignedResource;
checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
// Check if the TaskManager died in the meantime
// This only speeds up the response to TaskManagers failing concurrently to deployments.
// The more general check is the rpcTimeout of the deployment call
if (!slot.isAlive()) {
throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
}
// make sure exactly one deployment call happens from the correct state
// note: the transition from CREATED to DEPLOYING is for testing purposes only
ExecutionState previous = this.state;
if (previous == SCHEDULED || previous == CREATED) {
if (!transitionState(previous, DEPLOYING)) {
// race condition, someone else beat us to the deploying call.
// this should actually not happen and indicates a race somewhere else
throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
}
}
else {
// vertex may have been cancelled, or it was already scheduled
throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
}
// The slot must carry this very execution as its payload.
if (this != slot.getPayload()) {
throw new IllegalStateException(
String.format("The execution %s has not been assigned to the assigned slot.", this));
}
try {
// race double check, did we fail/cancel and do we need to release the slot?
if (this.state != DEPLOYING) {
slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
return;
}
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
attemptNumber, getAssignedResourceLocation().getHostname()));
}
// The descriptor is built by the ExecutionVertex from the attempt id, slot, restore state and attempt number.
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
taskRestore,
attemptNumber);
// null taskRestore to let it be GC'ed
taskRestore = null;
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
// Submission returns a future; only its failure case is handled below.
final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);
submitResultFuture.whenCompleteAsync(
(ack, failure) -> {
// only respond to the failure case
if (failure != null) {
if (failure instanceof TimeoutException) {
String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
markFailed(new Exception(
"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
} else {
markFailed(failure);
}
}
},
executor);
}
catch (Throwable t) {
// Any error during deployment marks the execution failed and is then rethrown.
markFailed(t);
ExceptionUtils.rethrow(t);
}
}
ExecutionState
- Execution状态 CREATED(已创建) -> SCHEDULED(已调度) -> DEPLOYING(部署中) -> RUNNING(运行中) -> FINISHED(已完成) 等
package org.apache.flink.runtime.execution;
/**
* An enumeration of all states that a task can be in during its execution.
* Tasks usually start in the state {@code CREATED} and switch states according to
* this diagram:
* <pre>{@code
*
*     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
*        |            |            |          |
*        |            |            |   +------+
*        |            |            V   V
*        |            |         CANCELING ------+----> CANCELED
*        |            |                         |
*        |            +-------------------------+
*        |
*        |                                   ... -> FAILED
*        V
*    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
*
* }</pre>
*
* <p>It is possible to enter the {@code RECONCILING} state from {@code CREATED}
* state if job manager fail over, and the {@code RECONCILING} state can switch into
* any existing task state.
*
* <p>It is possible to enter the {@code FAILED} state from any other state.
*
* <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
* considered terminal states.
*/
public enum ExecutionState {
CREATED,
SCHEDULED,
DEPLOYING,
RUNNING,
/**
* This state marks "successfully completed". It can only be reached when a
* program reaches the "end of its input". The "end of input" can be reached
* when consuming a bounded input (fix set of files, bounded query, etc) or
* when stopping a program (not cancelling!) which make the input look like
* it reached its end at a specific point.
*/
FINISHED,
CANCELING,
CANCELED,
FAILED,
RECONCILING;
/**
* Tells whether this state is terminal.
*
* @return {@code true} for FINISHED, CANCELED and FAILED; {@code false} for every other state
*/
public boolean isTerminal() {
return this == FINISHED || this == CANCELED || this == FAILED;
}
}
TaskExecutor.submitTask
- 构建Task,Task 默认的状态为CREATED
Task task = new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
tdd.getTargetSlotNumber(),
taskExecutorServices.getMemoryManager(),
taskExecutorServices.getIOManager(),
taskExecutorServices.getNetworkEnvironment(),
taskExecutorServices.getBroadcastVariableManager(),
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
blobCacheService,
libraryCache,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
getRpcService().getExecutor());
- 调用task.startTaskThread(); //调用task线程的run()函数
// ----------------------------------------------------------------------
// Task lifecycle RPCs
// ----------------------------------------------------------------------
/**
 * Accepts a task deployment, builds the {@code Task} and starts its thread.
 *
 * <p>Every validation failure is raised as {@code TaskSubmissionException} inside the method
 * and converted into an exceptionally completed future by the outer catch block.
 *
 * @param tdd deployment descriptor of the task to run
 * @param jobMasterId leader id of the submitting job master; must equal the id of the registered connection
 * @param timeout not referenced in the visible body
 * @return a future completed with an Acknowledge once the task thread was started,
 *         or completed exceptionally with a TaskSubmissionException
 */
@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,
Time timeout) {
try {
final JobID jobId = tdd.getJobId();
// Validation 1: a JobManager connection must be registered for this job.
final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
if (jobManagerConnection == null) {
final String message = "Could not submit task because there is no JobManager " +
"associated for the job " + jobId + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// Validation 2: the caller's leader id must match the registered connection.
if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
final String message = "Rejecting the task submission because the job manager leader id " +
jobMasterId + " does not match the expected job manager leader id " +
jobManagerConnection.getJobMasterId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// Validation 3: the slot for (job, allocation) must be markable as active.
if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
final String message = "No task slot allocated for job ID " + jobId +
" and allocation ID " + tdd.getAllocationId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// re-integrate offloaded data:
try {
tdd.loadBigData(blobCacheService.getPermanentBlobService());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
}
// deserialize the pre-serialized information
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
// Both values are deserialized with the class loader of this class, not a user-code class loader.
jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
}
// Validation 4: the job id in the descriptor must agree with the deserialized job information.
if (!jobId.equals(jobInformation.getJobId())) {
throw new TaskSubmissionException(
"Inconsistent job ID information inside TaskDeploymentDescriptor (" +
tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
}
// Collect the per-task collaborators that are handed to the Task constructor below.
TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
jobInformation.getJobId(),
jobInformation.getJobName(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskInformation.getTaskName(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber());
InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
jobManagerConnection.getJobManagerGateway(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getTimeout());
TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
jobId,
tdd.getAllocationId(),
taskInformation.getJobVertexId(),
tdd.getSubtaskIndex());
final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
final TaskStateManager taskStateManager = new TaskStateManagerImpl(
jobId,
tdd.getExecutionAttemptId(),
localStateStore,
taskRestore,
checkpointResponder);
// Build the Task object; its thread is not started until startTaskThread() further down.
Task task = new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
tdd.getTargetSlotNumber(),
taskExecutorServices.getMemoryManager(),
taskExecutorServices.getIOManager(),
taskExecutorServices.getNetworkEnvironment(),
taskExecutorServices.getBroadcastVariableManager(),
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
blobCacheService,
libraryCache,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
getRpcService().getExecutor());
log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
boolean taskAdded;
try {
taskAdded = taskSlotTable.addTask(task);
} catch (SlotNotFoundException | SlotNotActiveException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
// Only a newly added task is started; a false result is reported as a duplicate execution id.
if (taskAdded) {
task.startTaskThread();
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager already contains a task for id " +
task.getExecutionId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
} catch (TaskSubmissionException e) {
// Submission errors are reported through the returned future instead of being thrown.
return FutureUtils.completedExceptionally(e);
}
}
Task.run
- 这才开始处理Task,任务的状态用的是ExecutionState中的状态值
- 更新Task状态从CREATED 到 DEPLOYING
- 加载这个Task的jar文件
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
LOG.info("Loading JAR files for task {}.", this);
userCodeClassLoader = createUserCodeClassloader();
final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
- 构建任务运行环境
Environment env = new RuntimeEnvironment(
jobId,
vertexId,
executionId,
executionConfig,
taskInfo,
jobConfiguration,
taskConfiguration,
userCodeClassLoader,
memoryManager,
ioManager,
broadcastVariableManager,
taskStateManager,
accumulatorRegistry,
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
producedPartitions,
inputGates,
network.getTaskEventDispatcher(),
checkpointResponder,
taskManagerConfig,
metrics,
this);
- 更新当前任务状态从 DEPLOYING 到 RUNNING
transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)
- 调用DataSourceTask.invoke(),会根据具体的任务,调用具体任务的函数
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
// ----------------------------
// Initial State transition
// ----------------------------
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.CREATED) {
if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
// success, we can start our work
break;
}
}
else if (current == ExecutionState.FAILED) {
// we were immediately failed. tell the TaskManager that we reached our final state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
else if (current == ExecutionState.CANCELING) {
if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
// we were immediately canceled. tell the TaskManager that we reached our final state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
}
else {
if (metrics != null) {
metrics.close();
}
throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
}
}
// all resource acquisitions and registrations from here on
// need to be undone in the end
Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
AbstractInvokable invokable = null;
try {
// ----------------------------
// Task Bootstrap - We periodically
// check for canceling as a shortcut
// ----------------------------
// activate safety net for task thread
LOG.info("Creating FileSystem stream leak safety net for task {}", this);
FileSystemSafetyNet.initializeSafetyNetForThread();
blobService.getPermanentBlobService().registerJob(jobId);
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
LOG.info("Loading JAR files for task {}.", this);
userCodeClassLoader = createUserCodeClassloader();
final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
if (executionConfig.getTaskCancellationInterval() >= 0) {
// override task cancellation interval from Flink config if set in ExecutionConfig
taskCancellationInterval = executionConfig.getTaskCancellationInterval();
}
if (executionConfig.getTaskCancellationTimeout() >= 0) {
// override task cancellation timeout from Flink config if set in ExecutionConfig
taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
}
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// register the task with the network stack
// this operation may fail if the system does not have enough
// memory to run the necessary data exchanges
// the registration must also strictly be undone
// ----------------------------------------------------------------
LOG.info("Registering task at network: {}.", this);
network.registerTask(this);
// add metrics for buffers
this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
// register detailed network metrics, if configured
if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
// similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
MetricGroup outputGroup = networkGroup.addGroup("Output");
MetricGroup inputGroup = networkGroup.addGroup("Input");
// output metrics
for (int i = 0; i < producedPartitions.length; i++) {
ResultPartitionMetrics.registerQueueLengthMetrics(
outputGroup.addGroup(i), producedPartitions[i]);
}
for (int i = 0; i < inputGates.length; i++) {
InputGateMetrics.registerQueueLengthMetrics(
inputGroup.addGroup(i), inputGates[i]);
}
}
// next, kick off the background copying of files for the distributed cache
try {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);
distributedCacheEntries.put(entry.getKey(), cp);
}
}
catch (Exception e) {
throw new Exception(
String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), e);
}
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// call the user code initialization methods
// ----------------------------------------------------------------
TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
Environment env = new RuntimeEnvironment(
jobId,
vertexId,
executionId,
executionConfig,
taskInfo,
jobConfiguration,
taskConfiguration,
userCodeClassLoader,
memoryManager,
ioManager,
broadcastVariableManager,
taskStateManager,
accumulatorRegistry,
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
producedPartitions,
inputGates,
network.getTaskEventDispatcher(),
checkpointResponder,
taskManagerConfig,
metrics,
this);
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
// make sure, we enter the catch block if the task leaves the invoke() method due
// to the fact that it has been canceled
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// finalization of a successful execution
// ----------------------------------------------------------------
// finish the produced partitions. if this fails, we consider the execution failed.
for (ResultPartition partition : producedPartitions) {
if (partition != null) {
partition.finish();
}
}
// try to mark the task as finished
// if that fails, the task was canceled/failed in the meantime
if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
throw new CancelTaskException();
}
}
catch (Throwable t) {
// unwrap wrapped exceptions to make stack traces more compact
if (t instanceof WrappingRuntimeException) {
t = ((WrappingRuntimeException) t).unwrap();
}
// ----------------------------------------------------------------
// the execution failed. either the invokable code properly failed, or
// an exception was thrown as a side effect of cancelling
// ----------------------------------------------------------------
try {
// check if the exception is unrecoverable
if (ExceptionUtils.isJvmFatalError(t) ||
(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {
// terminate the JVM immediately
// don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
try {
LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
} finally {
Runtime.getRuntime().halt(-1);
}
}
// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
// loop for multiple retries during concurrent state changes via calls to cancel() or
// to failExternally()
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
if (t instanceof CancelTaskException) {
if (transitionState(current, ExecutionState.CANCELED)) {
cancelInvokable(invokable);
break;
}
}
else {
if (transitionState(current, ExecutionState.FAILED, t)) {
// proper failure of the task. record the exception as the root cause
failureCause = t;
cancelInvokable(invokable);
break;
}
}
}
else if (current == ExecutionState.CANCELING) {
if (transitionState(current, ExecutionState.CANCELED)) {
break;
}
}
else if (current == ExecutionState.FAILED) {
// in state failed already, no transition necessary any more
break;
}
// unexpected state, go to failed
else if (transitionState(current, ExecutionState.FAILED, t)) {
LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
break;
}
// else fall through the loop and
}
}
catch (Throwable tt) {
String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
LOG.error(message, tt);
notifyFatalError(message, tt);
}
}
finally {
try {
LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);
// clear the reference to the invokable. this helps guard against holding references
// to the invokable and its structures in cases where this Task object is still referenced
this.invokable = null;
// stop the async dispatcher.
// copy dispatcher reference to stack, against concurrent release
ExecutorService dispatcher = this.asyncCallDispatcher;
if (dispatcher != null && !dispatcher.isShutdown()) {
dispatcher.shutdownNow();
}
// free the network resources
network.unregisterTask(this);
// free memory resources
if (invokable != null) {
memoryManager.releaseAll(invokable);
}
// remove all of the tasks library resources
libraryCache.unregisterTask(jobId, executionId);
fileCache.releaseJob(jobId, executionId);
blobService.getPermanentBlobService().releaseJob(jobId);
// close and de-activate safety net for task thread
LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
notifyFinalState();
}
catch (Throwable t) {
// an error in the resource cleanup is fatal
String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId);
LOG.error(message, t);
notifyFatalError(message, t);
}
// un-register the metrics at the end so that the task may already be
// counted as finished when this happens
// errors here will only be logged
try {
metrics.close();
}
catch (Throwable t) {
LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t);
}
}
}
DataSourceTask.invoke()
- Transformation chain
// start all chained tasks
BatchTask.openChainedTasks(this.chainedTasks, this);
this.chainedTasks = {ArrayList@7851} size = 3
0 = {ChainedFlatMapDriver@7850}
1 = {ChainedMapDriver@7988}
2 = {SynchronousChainedCombineDriver@7989}
- 得到输入分片,读取文件的块位置信息
// get input splits to read
final Iterator<InputSplit> splitIterator = getInputSplits();
- 得到文件位置信息
file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt:0+6
- 循环读取分片中的数据,每次读到的是一行记录
while (!this.taskCanceled && !format.reachedEnd()) {
OT returned;
if ((returned = format.nextRecord(serializer.createInstance())) != null) {
output.collect(returned);
}
}
/**
* Creates the data source task (an invokable) and hands the given environment
* to the superclass constructor.
*
* @param environment The environment assigned to this invokable.
*/
public DataSourceTask(Environment environment) {
super(environment);
}
/**
* Runs the data source: initializes the input format and the outputs, sets up the
* record/split counters, opens the chained tasks and then, for every input split
* handed out by {@code getInputSplits()}, opens the format on that split and forwards
* each non-null record returned by {@code format.nextRecord(...)} to the output
* collector until {@code format.reachedEnd()} is true or the task is canceled.
*
* <p>The format is closed after every split (in a finally block). After the last
* split the output collector and the chained tasks are closed. The outer finally
* block always clears the writers and, for a {@code RichInputFormat}, calls
* {@code closeInputFormat()}.
*
* @throws Exception a {@code CancelTaskException} is rethrown as is; any other exception
*         from the read loop is passed to {@code BatchTask.logAndThrowException} unless the
*         task was canceled; a failure while initializing the outputs is rethrown wrapped
*         in a {@code RuntimeException}.
*/
@Override
public void invoke() throws Exception {
// --------------------------------------------------------------------
// Initialize
// --------------------------------------------------------------------
initInputFormat();
LOG.debug(getLogString("Start registering input and output"));
try {
initOutputs(getUserCodeClassLoader());
} catch (Exception ex) {
throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
ex.getMessage(), ex);
}
LOG.debug(getLogString("Finished registering input and output"));
// --------------------------------------------------------------------
// Invoke
// --------------------------------------------------------------------
LOG.debug(getLogString("Starting data source operator"));
RuntimeContext ctx = createRuntimeContext();
// numRecordsOut is final, so it is assigned exactly once from a temporary that is
// either the metric group's counter or, if the metrics setup throws, a SimpleCounter
final Counter numRecordsOut;
{
Counter tmpNumRecordsOut;
try {
OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
ioMetricGroup.reuseInputMetricsForTask();
// the output metrics are only reused for the task when nothing is chained to this source
if (this.config.getNumberOfChainedStubs() == 0) {
ioMetricGroup.reuseOutputMetricsForTask();
}
tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
tmpNumRecordsOut = new SimpleCounter();
}
numRecordsOut = tmpNumRecordsOut;
}
// incremented once per split, after that split's format has been closed
Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
// rich formats additionally get the runtime context and an openInputFormat() call
// before any split is opened
if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).setRuntimeContext(ctx);
LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
((RichInputFormat) this.format).openInputFormat();
LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
}
ExecutionConfig executionConfig = getExecutionConfig();
boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
try {
// start all chained tasks
BatchTask.openChainedTasks(this.chainedTasks, this);
// get input splits to read
final Iterator<InputSplit> splitIterator = getInputSplits();
// for each assigned input split
while (!this.taskCanceled && splitIterator.hasNext())
{
// get start and end
final InputSplit split = splitIterator.next();
LOG.debug(getLogString("Opening input split " + split.toString()));
final InputFormat<OT, InputSplit> format = this.format;
// open input format
format.open(split);
LOG.debug(getLogString("Starting to read input from split " + split.toString()));
try {
// wraps the task output together with the numRecordsOut counter
final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);
if (objectReuseEnabled) {
// object reuse: one instance is created up front and passed to every nextRecord() call
OT reuse = serializer.createInstance();
// as long as there is data to read
while (!this.taskCanceled && !format.reachedEnd()) {
OT returned;
// a null return value is skipped and not forwarded
if ((returned = format.nextRecord(reuse)) != null) {
output.collect(returned);
}
}
} else {
// no object reuse: a fresh instance is created for every nextRecord() call
// as long as there is data to read
while (!this.taskCanceled && !format.reachedEnd()) {
OT returned;
// a null return value is skipped and not forwarded
if ((returned = format.nextRecord(serializer.createInstance())) != null) {
output.collect(returned);
}
}
}
if (LOG.isDebugEnabled() && !this.taskCanceled) {
LOG.debug(getLogString("Closing input split " + split.toString()));
}
} finally {
// close. We close here such that a regular close throwing an exception marks a task as failed.
format.close();
}
completedSplitsCounter.inc();
} // end of the loop over all input splits
// close the collector. if it is a chaining task collector, it will close its chained tasks
this.output.close();
// close all chained tasks letting them report failure
BatchTask.closeChainedTasks(this.chainedTasks, this);
}
catch (Exception ex) {
// close the input, but do not report any exceptions, since we already have another root cause
try {
this.format.close();
} catch (Throwable ignored) {}
BatchTask.cancelChainedTasks(this.chainedTasks);
// replace ex by whatever exceptionUnwrap returns before deciding how to propagate it
ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
if (ex instanceof CancelTaskException) {
// forward canceling exception
throw ex;
}
else if (!this.taskCanceled) {
// the task was not canceled, so this is a genuine failure: log and rethrow it.
// (if the task was canceled, neither branch is taken and the exception is dropped)
BatchTask.logAndThrowException(ex, this);
}
} finally {
BatchTask.clearWriters(eventualOutputs);
// --------------------------------------------------------------------
// Closing
// --------------------------------------------------------------------
// counterpart of the openInputFormat() call above; runs on success and on failure
if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).closeInputFormat();
LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
}
}
if (!this.taskCanceled) {
LOG.debug(getLogString("Finished data source operator"));
}
else {
LOG.debug(getLogString("Data source operator cancelled"));
}
}
DelimitedInputFormat
DelimitedInputFormat.nextRecord
- 调用 DelimitedInputFormat.readLine()
/**
 * Produces the next record by first advancing to the next line via {@code readLine()}.
 *
 * <p>When a line is available, the bytes described by {@code currBuffer},
 * {@code currOffset} and {@code currLen} are handed to {@code readRecord(...)}
 * together with the passed-in record object. When no line is available, the
 * {@code end} flag is set and {@code null} is returned.
 *
 * @param record the record object passed through to {@code readRecord(...)}
 * @return the result of {@code readRecord(...)}, or {@code null} if no further line could be read
 * @throws IOException declared for failures while reading the next line or record
 */
public OT nextRecord(OT record) throws IOException {
    // guard clause: nothing left to read -> remember that the end was reached
    if (!readLine()) {
        this.end = true;
        return null;
    }

    // a line is available; convert its bytes into a record
    return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
}
DelimitedInputFormat.readLine()
- 实际读取文件数据的方法:按分隔符(delimiter)从读缓冲区中切分出一条记录的逻辑就在这里
/**
* Locates the next delimiter-terminated record and publishes its bytes via
* {@code setResult(buffer, offset, length)}.
*
* <p>The bytes are searched in {@code readBuffer}. When a record spans more than one
* fill of {@code readBuffer}, the bytes seen so far are accumulated in {@code wrapBuffer}
* (which is grown on demand) and the result points into {@code wrapBuffer}; otherwise
* the result points directly into {@code readBuffer}. {@code this.offset} is advanced
* by the number of bytes consumed for the record.
*
* @return {@code true} if a record was set; {@code false} if the stream is {@code null},
*         {@code overLimit} is set, or {@code fillBuffer} returned {@code false} with no
*         pending bytes left to emit
* @throws IOException if the accumulated record length exceeds {@code lineLengthLimit}
*         (other I/O failures presumably come from {@code fillBuffer} - not visible here)
*/
protected final boolean readLine() throws IOException {
if (this.stream == null || this.overLimit) {
return false;
}
// number of bytes of the current record that have already been copied into wrapBuffer
int countInWrapBuffer = 0;
// position of matching positions in the delimiter byte array
int delimPos = 0;
while (true) {
if (this.readPos >= this.limit) {
// readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
if (!fillBuffer(delimPos)) {
// no more data: the delimPos bytes of a partially matched delimiter are ordinary
// record bytes after all (they were moved to the start of readBuffer further below)
int countInReadBuffer = delimPos;
if (countInWrapBuffer + countInReadBuffer > 0) {
// we have bytes left to emit
if (countInReadBuffer > 0) {
// we have bytes left in the readBuffer. Move them into the wrapBuffer
if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
// reallocate
byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
this.wrapBuffer = tmp;
}
// copy readBuffer bytes to wrapBuffer
System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
countInWrapBuffer += countInReadBuffer;
}
// emit the trailing bytes as a final record without a terminating delimiter
this.offset += countInWrapBuffer;
setResult(this.wrapBuffer, 0, countInWrapBuffer);
return true;
} else {
return false;
}
}
}
// start of this record's bytes in readBuffer, including already matched delimiter bytes
int startPos = this.readPos - delimPos;
int count;
// Search for next occurrence of delimiter in read buffer.
while (this.readPos < this.limit && delimPos < this.delimiter.length) {
if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
// Found the expected delimiter character. Continue looking for the next character of delimiter.
delimPos++;
} else {
// Delimiter does not match.
// We have to reset the read position to the character after the first matching character
// and search for the whole delimiter again.
readPos -= delimPos;
delimPos = 0;
}
readPos++;
}
// check why we dropped out
if (delimPos == this.delimiter.length) {
// we found a delimiter
int readBufferBytesRead = this.readPos - startPos;
this.offset += countInWrapBuffer + readBufferBytesRead;
// record bytes in readBuffer, i.e. everything consumed minus the delimiter itself
count = readBufferBytesRead - this.delimiter.length;
// copy to byte array
if (countInWrapBuffer > 0) {
// part of the record is already in wrapBuffer: append the rest and emit from wrapBuffer
// check wrap buffer size
if (this.wrapBuffer.length < countInWrapBuffer + count) {
final byte[] nb = new byte[countInWrapBuffer + count];
System.arraycopy(this.wrapBuffer, 0, nb, 0, countInWrapBuffer);
this.wrapBuffer = nb;
}
// guard against a negative length before copying
if (count >= 0) {
System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, count);
}
setResult(this.wrapBuffer, 0, countInWrapBuffer + count);
return true;
} else {
// the whole record lies inside readBuffer: emit it in place without copying
setResult(this.readBuffer, startPos, count);
return true;
}
} else {
// we reached the end of the readBuffer
count = this.limit - startPos;
// check against the maximum record length
if (((long) countInWrapBuffer) + count > this.lineLengthLimit) {
throw new IOException("The record length exceeded the maximum record length (" +
this.lineLengthLimit + ").");
}
// Compute number of bytes to move to wrapBuffer
// Chars of partially read delimiter must remain in the readBuffer. We might need to go back.
int bytesToMove = count - delimPos;
// ensure wrapBuffer is large enough
if (this.wrapBuffer.length - countInWrapBuffer < bytesToMove) {
// reallocate: at least double the current size so repeated appends need fewer copies
byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + bytesToMove)];
System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
this.wrapBuffer = tmp;
}
// copy readBuffer to wrapBuffer (except delimiter chars)
System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
countInWrapBuffer += bytesToMove;
// move delimiter chars to the beginning of the readBuffer
System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
}
}
}