Flink Source Code Analysis: StreamExecutionEnvironment

Wei Wang
74 min read · Jan 29, 2019

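The StreamExecutionEnvironment class represents the environment needed to run a Flink job. It comes in two flavors: the local LocalStreamEnvironment and the remote RemoteStreamEnvironment. The environment lets us configure the parameters that control how the job runs. Let's look at what the class contains: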

private final ExecutionConfig config = new ExecutionConfig();

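StreamExecutionEnvironment holds an ExecutionConfig instance. It sets the default parallelism (used when a function does not specify one explicitly), the number of retries on failure and the delay between them, the data exchange mode (batch or pipelined), the UDF code analysis mode, the registered serializers, and so on.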

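The ClosureCleaner deserves a special mention. When enabled, it analyzes user code and sets unneeded closure references to null, which makes closures and anonymous classes serializable in most cases. User code must be serializable so that tasks can be shipped between the nodes of a cluster.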
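To make the serialization problem concrete, here is a minimal plain-Java sketch (not Flink code; `ClosureDemo`, `Job`, and `SerFn` are hypothetical names). It shows a lambda that cannot be serialized because it captures its non-serializable enclosing object, and the same logic made serializable by dropping that reference. The ClosureCleaner removes references the function does not actually need; the second variant below simply avoids capturing the outer object by hand.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical demo, not Flink code.
final class ClosureDemo {

    /** A function type that declares itself serializable, as user functions must be. */
    interface SerFn extends Function<String, String>, Serializable {}

    /** Stands in for a user class that is NOT serializable. */
    static final class Job {
        private final String prefix;

        Job(String prefix) {
            this.prefix = prefix;
        }

        /** Reads the field through `this`, so the lambda captures the whole Job. */
        SerFn capturingOuter() {
            return s -> prefix + s;
        }

        /** Copies the field into a local first, so only the String is captured. */
        SerFn capturingLocal() {
            final String p = prefix;
            return s -> p + s;
        }
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        Job job = new Job("flink-");
        System.out.println("captures outer object: " + isSerializable(job.capturingOuter()));
        System.out.println("captures local copy:   " + isSerializable(job.capturingLocal()));
    }
}
```

The first function fails to serialize because the captured `Job` is not `Serializable`; the second succeeds because a `String` is.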

Next comes CheckpointConfig, a configuration class for the checkpointing feature:

private final CheckpointConfig checkpointCfg = new CheckpointConfig();

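It covers the checkpointing mode (EXACTLY_ONCE by default), the checkpoint timeout, the trigger interval, the number of concurrent checkpoints, the cleanup policy for persisted checkpoint files (delete or retain them when the job is cancelled), the failure handling policy, and so on.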

The next field, transformations, deserves attention: it is the collection that holds every StreamTransformation instance of the job.

protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

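A StreamTransformation<T> represents an operation that produces a DataStream, and every DataStream<T> holds a reference to the StreamTransformation<T> that produced it. When you call a method on a DataStream (map, for example), Flink builds a tree of StreamTransformation instances that mirrors the computation topology. Only when the job is actually executed does StreamGraphGenerator convert that tree into a StreamGraph. Note that not every method produces a physical operation: union, split, select, rebalance, and partition only group or rearrange existing operations and do not create one of their own. We will get to the StreamGraph code later; first, the StreamTransformation<T> class: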

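Every instance has a unique int id taken from a static, incrementing idCounter. The counter has no concurrency protection, so it is not thread-safe; that only matters if transformations are created from more than one thread. There is also a String uid, which is set by the user and stays the same across job restarts.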

protected static Integer idCounter = 0;

public static int getNewNodeId() {
    idCounter++;
    return idCounter;
}
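If ids ever had to be drawn from several threads, one common option would be an AtomicInteger. The sketch below is a hypothetical alternative, not what Flink does; `NodeIds` and `distinctIdsFrom` are made-up names used only to show that no id is handed out twice.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical alternative, not Flink code.
final class NodeIds {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Atomically increments and returns the new value, so ids start at 1 as in the original. */
    static int getNewNodeId() {
        return COUNTER.incrementAndGet();
    }

    /** Draws ids from several threads and returns how many distinct ids were seen. */
    static int distinctIdsFrom(int threads, int idsPerThread) {
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < idsPerThread; i++) {
                    seen.add(getNewNodeId());
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // With an atomic counter every thread receives ids nobody else got.
        System.out.println(distinctIdsFrom(4, 1000));
    }
}
```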

The output type is wrapped in a TypeInformation, which is used to create the serializers for serialization and the comparators for ordering, and to perform some type checks.

protected TypeInformation<T> outputType;

The following fields set resource requirements. ResourceSpec describes the resources a StreamTransformation needs, including CPU cores, heap memory, direct memory, and native memory.

private ResourceSpec minResources = ResourceSpec.DEFAULT;
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;

The hashCode and equals methods are overridden as follows:

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof StreamTransformation)) {
return false;
}

StreamTransformation<?> that = (StreamTransformation<?>) o;

if (bufferTimeout != that.bufferTimeout) {
return false;
}
if (id != that.id) {
return false;
}
if (parallelism != that.parallelism) {
return false;
}
if (!name.equals(that.name)) {
return false;
}
return outputType != null ? outputType.equals(that.outputType) : that.outputType == null;
}

@Override
public int hashCode() {
int result = id;
result = 31 * result + name.hashCode();
result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
result = 31 * result + parallelism;
result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
return result;
}

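That gives us a rough picture of StreamTransformation, so let's return to StreamExecutionEnvironment. The component that stores key/value state and state snapshots is abstracted as StateBackend. A state backend must be serializable, because it is shipped together with the job code to multiple distributed nodes and runs there in parallel. For that reason, subclasses of AbstractStateBackend are usually implemented as factories: after serialization and deserialization they recreate the right state and point at the right storage service. This keeps them lightweight and cheap to serialize. StateBackend implementations must also be thread-safe so that multiple operators can use them concurrently.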

Setting bufferTimeout controls how often the output buffers are flushed, which trades latency against throughput.

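Now for the API used to build the computation DAG. The addSource method adds a data source to the job. By default a source is non-parallel; to get a parallel source, implement the ParallelSourceFunction interface or extend RichParallelSourceFunction.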

addSource wraps a SourceFunction in a StreamSource. When the source starts running, SourceFunction#run(SourceContext<T> ctx) is called and keeps emitting the generated records to the SourceContext.

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

// code omitted...

boolean isParallel = function instanceof ParallelSourceFunction;

clean(function);
StreamSource<OUT, ?> sourceOperator;
if (function instanceof StoppableFunction) {
sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
} else {
sourceOperator = new StreamSource<>(function);
}

return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
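The run/cancel contract of a source can be illustrated with a miniature model. This is a plain-Java sketch under simplifying assumptions, not the Flink interfaces: `Context`, `Source`, `CountingSource`, and `drain` are hypothetical stand-ins for SourceContext, SourceFunction, a user source, and the operator that drives it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the source contract, not the Flink interfaces.
final class SourceSketch {

    /** Receives the records a source emits (stands in for SourceContext). */
    interface Context<T> {
        void collect(T record);
    }

    /** Stands in for SourceFunction: run() emits records until done or cancelled. */
    interface Source<T> {
        void run(Context<T> ctx);

        void cancel();
    }

    /** Emits 0, 1, 2, ... below a limit and checks the cancel flag before every record. */
    static final class CountingSource implements Source<Integer> {
        private final int limit;
        private volatile boolean running = true;

        CountingSource(int limit) {
            this.limit = limit;
        }

        @Override
        public void run(Context<Integer> ctx) {
            for (int i = 0; running && i < limit; i++) {
                ctx.collect(i);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Drives a source the way a wrapping operator would and returns what it emitted. */
    static <T> List<T> drain(Source<T> source) {
        List<T> out = new ArrayList<>();
        source.run(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new CountingSource(3)));
    }
}
```

The `volatile` flag matters because cancel() is typically called from a different thread than the one executing run().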

How is the DAG for the whole job generated? The getStreamGraph method calls StreamGraphGenerator#generate, which builds the graph from the StreamExecutionEnvironment and all the transformations it holds.

// StreamExecutionEnvironment
@Internal
public StreamGraph getStreamGraph() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return StreamGraphGenerator.generate(this, transformations);
}

// StreamGraphGenerator
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
    return new StreamGraphGenerator(env).generateInternal(transformations);
}

private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
    for (StreamTransformation<?> transformation: transformations) {
        // walk every transformation and translate it into the graph
        transform(transformation);
    }
    return streamGraph;
}

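The actual translation is defined in StreamGraphGenerator#transform. It returns immediately for an instance that has already been transformed; otherwise it dispatches to the handling logic for the concrete StreamTransformation type: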

private Collection<Integer> transform(StreamTransformation<?> transform) {
// return immediately if this instance has already been transformed
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}

LOG.debug("Transforming " + transform);

if (transform.getMaxParallelism() <= 0) {

// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}

// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
// dispatch to the translation logic for the concrete type
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}

// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}

if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}

if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}

return transformedIds;
}

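Let's start with transformOneInputTransform. It first gets the input StreamTransformation and recursively calls transform on it. It then determines the slot sharing group of the current instance, adds the instance to the DAG as an operator (together with the corresponding edges), and sets the key serializer needed for partitioning as well as the operator's parallelism.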

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
// recursively transform the input StreamTransformation
Collection<Integer> inputIds = transform(transform.getInput());

// the recursive call might have already transformed this
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
// determine which slot sharing group this instance belongs to
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
// add the corresponding operator to the DAG
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getCoLocationGroupKey(),
transform.getOperator(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());

if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}

streamGraph.setParallelism(transform.getId(), transform.getParallelism());
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

for (Integer inputId: inputIds) {
// add the corresponding edge to the DAG
streamGraph.addEdge(inputId, transform.getId(), 0);
}

return Collections.singleton(transform.getId());
}

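The other StreamTransformation types are handled in roughly the same steps: 1) recursively transform the input StreamTransformation instances, 2) determine the slot sharing group, 3) add the operator node and its edges to the DAG, 4) set the parallelism, the serializer needed for partitioning, and so on.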
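The recursive, memoized translation described above can be sketched in a few lines of plain Java. This is a simplified model rather than the Flink implementation: `Node` stands in for a StreamTransformation, `Graph` for the StreamGraph plus the generator's `alreadyTransformed` map, and slot sharing, parallelism, and serializers are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the translation, not Flink code.
final class TransformSketch {

    /** Stands in for a StreamTransformation: an id plus the inputs it reads from. */
    static final class Node {
        final int id;
        final List<Node> inputs;

        Node(int id, Node... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Stands in for the StreamGraph together with the generator's memo table. */
    static final class Graph {
        final List<Integer> operators = new ArrayList<>();
        final List<int[]> edges = new ArrayList<>();
        private final Map<Node, Integer> alreadyTransformed = new HashMap<>();

        int transform(Node node) {
            // return immediately if this node has already been translated
            Integer done = alreadyTransformed.get(node);
            if (done != null) {
                return done;
            }
            // 1) recursively translate the inputs first
            List<Integer> inputIds = new ArrayList<>();
            for (Node input : node.inputs) {
                inputIds.add(transform(input));
            }
            // 2) add the operator, then one edge per input
            operators.add(node.id);
            for (int inputId : inputIds) {
                edges.add(new int[] {inputId, node.id});
            }
            alreadyTransformed.put(node, node.id);
            return node.id;
        }
    }

    /** Translates every node, like the loop over all transformations in the generator. */
    static Graph translate(Node... all) {
        Graph graph = new Graph();
        for (Node node : all) {
            graph.transform(node);
        }
        return graph;
    }

    public static void main(String[] args) {
        Node source = new Node(1);
        Node left = new Node(2, source);
        Node right = new Node(3, source);
        Node joined = new Node(4, left, right);
        Graph graph = translate(source, left, right, joined);
        System.out.println(graph.operators + " with " + graph.edges.size() + " edges");
    }
}
```

Because of the memo table, the shared source in the diamond-shaped example is translated once even though two downstream nodes reach it.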

The last step is to actually start the job by calling StreamExecutionEnvironment#execute. Local mode and remote mode each implement execute; here is the local implementation first:

@Override
public JobExecutionResult execute(String jobName) throws Exception {
// transform the streaming program into a JobGraph
// translate the collected StreamTransformations into the StreamGraph (a DAG)
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
// translate the StreamGraph into a JobGraph
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);

Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);

if (!configuration.contains(RestOptions.PORT)) {
configuration.setInteger(RestOptions.PORT, 0);
}

int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();

if (LOG.isInfoEnabled()) {
LOG.info("Running job on local embedded Flink mini cluster");
}

MiniCluster miniCluster = new MiniCluster(cfg);

try {
miniCluster.start();
configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

return miniCluster.executeJobBlocking(jobGraph);
}
finally {
transformations.clear();
miniCluster.close();
}
}

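Before execution, the StreamGraph has to be converted into a JobGraph, which roughly takes these steps:

1) Compute a hash for every node
2) Where possible, chain adjacent operators to make execution more efficient
3) Set the edges of the graph and write them into the configuration
4) Set the slot sharing groups and the co-location of related nodes
5) Configure checkpointing
6) Add the user-provided files to the distributed cache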

public JobGraph getJobGraph(@Nullable JobID jobID) {
// temporarily forbid checkpointing for iterative jobs
if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) {
throw new UnsupportedOperationException(
"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
}

return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
private JobGraph createJobGraph() {

// make sure that all vertices start immediately
jobGraph.setScheduleMode(ScheduleMode.EAGER);

// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}

Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
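// chain the operators that can be chained together
setChaining(hashes, legacyHashes, chainedOperatorHashes);
// set the edges of the job graph and write them into the configuration
setPhysicalEdges();
// set the slot sharing groups and the co-location of related nodes
setSlotSharingAndCoLocation();

// configure checkpointing
configureCheckpointing();

// add the user-provided files to the distributed cache
JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);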

// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}

return jobGraph;
}
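Of the steps above, operator chaining is the one with the most visible effect on performance. The following plain-Java sketch shows the idea for a linear pipeline under a deliberately simplified rule: an operator joins the chain of its predecessor when the connecting edge is a forward edge and both have the same parallelism. The names `Op`, `chain`, and `demoPipeline` are hypothetical, and the real check in Flink considers more conditions (for example the chaining strategy and the slot sharing group).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of operator chaining, not Flink code.
final class ChainingSketch {

    static final class Op {
        final String name;
        final int parallelism;
        /** True if records arrive from the previous operator without repartitioning. */
        final boolean forwardFromPrevious;

        Op(String name, int parallelism, boolean forwardFromPrevious) {
            this.name = name;
            this.parallelism = parallelism;
            this.forwardFromPrevious = forwardFromPrevious;
        }
    }

    /** Groups a linear pipeline into chains; each chain would become one job vertex. */
    static List<List<String>> chain(List<Op> pipeline) {
        List<List<String>> chains = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            boolean chainable = previous != null
                    && op.forwardFromPrevious
                    && op.parallelism == previous.parallelism;
            if (!chainable) {
                chains.add(new ArrayList<>()); // start a new chain
            }
            chains.get(chains.size() - 1).add(op.name);
            previous = op;
        }
        return chains;
    }

    /** source -> map (forward) -> window (repartitioned by key) -> sink (forward). */
    static List<Op> demoPipeline() {
        return Arrays.asList(
                new Op("source", 2, false),
                new Op("map", 2, true),
                new Op("window", 2, false),
                new Op("sink", 2, true));
    }

    public static void main(String[] args) {
        System.out.println(chain(demoPipeline()));
    }
}
```

In the demo pipeline the repartitioning before `window` breaks the chain, so four operators collapse into two vertices.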

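Once we have the JobGraph, it has to be submitted to a service that implements the JobExecutor interface. In local mode, the MiniCluster class implements JobExecutor#executeJobBlocking(JobGraph job) to run the JobGraph: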

@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");

final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
(JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

final JobResult jobResult;

try {
jobResult = jobResultFuture.get();
} catch (ExecutionException e) {
throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
}

try {
return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(job.getJobID(), e);
}
}

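The method above calls submitJob to submit the job. The steps are: enable queued scheduling, upload the jar files the job needs to the BLOB server, and submit the job to the DispatcherGateway. The code: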

public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
final DispatcherGateway dispatcherGateway;
try {
dispatcherGateway = getDispatcherGateway();
} catch (LeaderRetrievalException | InterruptedException e) {
ExceptionUtils.checkInterrupted(e);
return FutureUtils.completedExceptionally(e);
}

// we have to allow queued scheduling in Flip-6 mode because we need to request slots
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);

final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGateway);

final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));

return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
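Both methods above are built from the same CompletableFuture pattern: asynchronous steps are chained with thenCompose, and only the outermost caller blocks on get(). Here is a minimal self-contained sketch of that pattern. It is not the MiniCluster code; `uploadJars`, `submit`, and `requestResult` are hypothetical stand-ins that complete immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical model of the submit-then-wait flow, not Flink code.
final class SubmitSketch {

    static CompletableFuture<Void> uploadJars(String jobId) {
        return CompletableFuture.completedFuture(null);
    }

    /** Acknowledges the job, or fails if the (pretend) dispatcher rejects it. */
    static CompletableFuture<String> submit(String jobId, boolean reject) {
        CompletableFuture<String> ack = new CompletableFuture<>();
        if (reject) {
            ack.completeExceptionally(new RuntimeException("rejected"));
        } else {
            ack.complete("ACK");
        }
        return ack;
    }

    static CompletableFuture<String> requestResult(String jobId) {
        return CompletableFuture.completedFuture("FINISHED:" + jobId);
    }

    /** Chains upload -> submit -> result and blocks only at the very end. */
    static String executeBlocking(String jobId, boolean reject) {
        CompletableFuture<String> result = uploadJars(jobId)
                .thenCompose(ignored -> submit(jobId, reject))
                .thenCompose(ack -> requestResult(jobId));
        try {
            return result.get();
        } catch (ExecutionException e) {
            // a failure in any earlier stage surfaces here
            throw new IllegalStateException("Could not retrieve result for " + jobId, e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(executeBlocking("job-1", false));
    }
}
```

A failure in any stage skips the later stages and is reported once, at the blocking call.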

In the Dispatcher class, submission is implemented as follows:

@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
return internalSubmitJob(jobGraph).whenCompleteAsync((acknowledge, throwable) -> {
if (throwable != null) {
cleanUpJobData(jobGraph.getJobID(), true);
}
}, getRpcService().getExecutor());
}

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
final JobID jobId = jobGraph.getJobID();

log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;

try {
jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
} catch (IOException e) {
return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
}

if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
} else {
final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());

return persistAndRunFuture.exceptionally(
(Throwable throwable) -> {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
log.error("Failed to submit job {}.", jobId, strippedThrowable);
throw new CompletionException(
new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
});
}
}

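The persistAndRunJob method stores the submitted JobGraph as a SubmittedJobGraph and then runs the job. If running the job throws an exception, the stored job graph is removed again.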

private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph));

final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
if (throwable != null) {
submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
}
}));
}
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

return jobManagerRunnerFuture
.thenApply(FunctionUtils.nullFn())
.whenCompleteAsync(
(ignored, throwable) -> {
if (throwable != null) {
jobManagerRunnerFutures.remove(jobGraph.getJobID());
}
},
getMainThreadExecutor());
}
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
final RpcService rpcService = getRpcService();

final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(() ->
jobManagerRunnerFactory.createJobManagerRunner(
ResourceID.generate(),
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
blobServer,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler)
),
rpcService.getExecutor());

return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
}

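We can see that it creates a JobManagerRunner, which in turn creates a JobMaster for the job; an ExecutionGraph is created as well and kept in the JobMaster. In the end it is the startJobManagerRunner method that actually starts the job.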

private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
jobManagerRunner.getResultFuture().whenCompleteAsync(
(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
// check if we are still the active JobManagerRunner by checking the identity
//noinspection ObjectEquality
if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
if (archivedExecutionGraph != null) {
jobReachedGloballyTerminalState(archivedExecutionGraph);
} else {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

if (strippedThrowable instanceof JobNotFinishedException) {
jobNotFinished(jobId);
} else {
jobMasterFailed(jobId, strippedThrowable);
}
}
} else {
log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
}
}, getMainThreadExecutor());

jobManagerRunner.start();

return jobManagerRunner;
}

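The start method of JobManagerRunner shows how leader election is handled in a distributed system. JobManagerRunner itself implements the LeaderContender interface, which, as the name suggests, lets it take part in leader election. Its start method passes the runner to the start method of LeaderElectionService, which starts the election service and tries to become leader. Once leadership is won, the service calls back the grantLeadership method of the contender (the JobManagerRunner), which calls verifyJobSchedulingStatusAndStartJobManager to start the corresponding JobMaster.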

public void start() throws Exception {
try {
// contend for leadership; on success the election service calls back grantLeadership
leaderElectionService.start(this);
} catch (Exception e) {
log.error("Could not start the JobManager because the leader election service did not start.", e);
throw new Exception("Could not start the leader election service.", e);
}
}
@Override
public void grantLeadership(final UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
log.info("JobManagerRunner already shutdown.");
return;
}

try {
verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
} catch (Exception e) {
handleJobManagerRunnerError(e);
}
}
}
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception {
final JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());

if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
jobFinishedByOther();
} else {
log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

runningJobsRegistry.setJobRunning(jobGraph.getJobID());

final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);
final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;

startFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
if (throwable != null) {
handleJobManagerRunnerError(new FlinkException("Could not start the job manager.", throwable));
} else {
confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture);
}
},
jobManagerSharedServices.getScheduledExecutorService());
}
}
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId, final Time timeout) throws Exception {
// make sure we receive RPC and async calls
super.start();

return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), timeout);
}
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
validateRunsInMainThread();

checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

if (Objects.equals(getFencingToken(), newJobMasterId)) {
log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

return Acknowledge.get();
}

setNewFencingToken(newJobMasterId);

startJobMasterServices();

log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

resetAndScheduleExecutionGraph();

return Acknowledge.get();
}
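The callback structure and the fencing-token check can be condensed into a small model. This sketch is plain Java with hypothetical names (`Contender`, `SingleNodeElection`, `Runner`); the toy election service has a single candidate and therefore grants leadership immediately, which a real election service would not do.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical model of the leader election callback, not Flink code.
final class LeaderSketch {

    /** Stands in for LeaderContender. */
    interface Contender {
        void grantLeadership(UUID sessionId);
    }

    /** Toy election service: with one candidate it grants leadership right away. */
    static final class SingleNodeElection {
        void start(Contender contender) {
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    /** Starts its work once per leader session, using the session id as fencing token. */
    static final class Runner implements Contender {
        private UUID fencingToken;
        private int starts;

        void start(SingleNodeElection election) {
            election.start(this);
        }

        @Override
        public synchronized void grantLeadership(UUID sessionId) {
            if (Objects.equals(fencingToken, sessionId)) {
                return; // already started for this session, nothing to do
            }
            fencingToken = sessionId;
            starts++; // stands in for starting the job execution
        }

        synchronized UUID fencingToken() {
            return fencingToken;
        }

        synchronized int starts() {
            return starts;
        }
    }

    public static void main(String[] args) {
        Runner runner = new Runner();
        runner.start(new SingleNodeElection());
        runner.grantLeadership(runner.fencingToken()); // duplicate grant is ignored
        System.out.println("starts: " + runner.starts());
    }
}
```

Comparing the incoming session id with the stored token makes a repeated grant for the same session harmless, while a new session id triggers a fresh start.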

The resetAndScheduleExecutionGraph method converts the JobGraph into an ExecutionGraph and schedules it for execution.

private void resetAndScheduleExecutionGraph() throws Exception {
validateRunsInMainThread();
final CompletableFuture<Void> executionGraphAssignedFuture;

if (executionGraph.getState() == JobStatus.CREATED) {
executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
} else {
suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);
executionGraphAssignedFuture = executionGraph.getTerminationFuture().handleAsync(
(JobStatus ignored, Throwable throwable) -> {
assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
return null;
},
getMainThreadExecutor());
}

executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
}

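The createAndRestoreExecutionGraph method creates a new ExecutionGraph from the JobGraph and restores its state from a stored checkpoint. ExecutionGraphBuilder.buildGraph describes the creation process. All vertices of the JobGraph are added, in topological order, to an existing or newly created ExecutionGraph: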

private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup);

final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

if (checkpointCoordinator != null) {
// check whether we find a valid checkpoint
if (!checkpointCoordinator.restoreLatestCheckpointedState(
newExecutionGraph.getAllVertices(),
false,
false)) {
// check whether we can restore from a savepoint
// no checkpoint was found, so try to restore the ExecutionGraph state from a savepoint
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
}
}
return newExecutionGraph;
}
private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws JobExecutionException, JobException {
return ExecutionGraphBuilder.buildGraph(
null,
jobGraph,
jobMasterConfiguration.getConfiguration(),
scheduledExecutorService,
scheduledExecutorService,
slotPool.getSlotProvider(),
userCodeLoader,
highAvailabilityServices.getCheckpointRecoveryFactory(),
rpcTimeout,
restartStrategy,
currentJobManagerJobMetricGroup,
blobServer,
jobMasterConfiguration.getSlotRequestTimeout(),
log);
}

The actual attachment is defined in the ExecutionGraph.attachJobGraph method.

public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {
LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
"vertices and {} intermediate results.",
topologicallySorted.size(), tasks.size(), intermediateResults.size());
final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologicallySorted.size());
final long createTimestamp = System.currentTimeMillis();
for (JobVertex jobVertex : topologicallySorted) {
if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
this.isStoppable = false;
}
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv = new ExecutionJobVertex(
this,
jobVertex,
1,
rpcTimeout,
globalModVersion,
createTimestamp);
ejv.connectToPredecessors(this.intermediateResults);

ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
if (previousTask != null) {
throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
jobVertex.getID(), ejv, previousTask));
}
for (IntermediateResult res : ejv.getProducedDataSets()) {
IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
if (previousDataSet != null) {
throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
res.getId(), res, previousDataSet));
}
}
this.verticesInCreationOrder.add(ejv);
this.numVerticesTotal += ejv.getParallelism();
newExecJobVertices.add(ejv);
}
terminationFuture = new CompletableFuture<>();
failoverStrategy.notifyNewVertices(newExecJobVertices);
}
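attachJobGraph relies on its input being topologically sorted, so that every vertex can connect to predecessors that were attached before it. As an illustration of what such an ordering means, here is a sketch of one standard way to compute it (Kahn's algorithm). This is an assumption for illustration only; Flink's own sorting code may work differently, and `TopoSketch`, `sort`, and `demoGraph` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of topological ordering, not Flink code.
final class TopoSketch {

    /** Sorts vertices so that each one appears after all of its upstream vertices. */
    static List<String> sort(Map<String, List<String>> downstream) {
        // count incoming edges per vertex
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String vertex : downstream.keySet()) {
            inDegree.putIfAbsent(vertex, 0);
        }
        for (List<String> targets : downstream.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        // vertices without inputs (the sources) are ready immediately
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.remove();
            order.add(vertex);
            for (String target : downstream.getOrDefault(vertex, Collections.<String>emptyList())) {
                // a target becomes ready once all of its inputs have been emitted
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("graph contains a cycle");
        }
        return order;
    }

    /** A small diamond: source feeds map and filter, both feed sink. */
    static Map<String, List<String>> demoGraph() {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("sink", Collections.<String>emptyList());
        graph.put("map", Arrays.asList("sink"));
        graph.put("filter", Arrays.asList("sink"));
        graph.put("source", Arrays.asList("map", "filter"));
        return graph;
    }

    public static void main(String[] args) {
        System.out.println(sort(demoGraph()));
    }
}
```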

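With the ExecutionGraph created, scheduleExecutionGraph can be called to schedule it for execution. Flink supports two schedule modes: LAZY_FROM_SOURCES initializes an operator only once its input data is ready, while EAGER loads all vertices of the graph right at the start, in topological order.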

private void scheduleExecutionGraph() {
checkState(jobStatusListener == null);
// register self as job status change listener
jobStatusListener = new JobManagerJobStatusListener();
executionGraph.registerJobStatusListener(jobStatusListener);
try {
executionGraph.scheduleForExecution();
}
catch (Throwable t) {
executionGraph.failGlobal(t);
}
}
public void scheduleForExecution() throws JobException {

final long currentGlobalModVersion = globalModVersion;

if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

final CompletableFuture<Void> newSchedulingFuture;

switch (scheduleMode) {

case LAZY_FROM_SOURCES:
// initialize an operator only once its input data is ready
newSchedulingFuture = scheduleLazy(slotProvider);
break;

case EAGER:
// initialize all operators right at the start
newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
break;

default:
throw new JobException("Schedule mode is invalid.");
}

if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
schedulingFuture = newSchedulingFuture;

newSchedulingFuture.whenCompleteAsync(
(Void ignored, Throwable throwable) -> {
if (throwable != null && !(throwable instanceof CancellationException)) {
// only fail if the scheduling future was not canceled
failGlobal(ExceptionUtils.stripCompletionException(throwable));
}
},
futureExecutor);
} else {
newSchedulingFuture.cancel(false);
}
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
}
}
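scheduleForExecution only proceeds if the job state can be moved from CREATED to RUNNING, which guarantees that a job is scheduled at most once even if several callers race. A minimal sketch of such a guarded transition, assuming a compare-and-set on an atomic reference (the names `StateSketch` and `Status` are hypothetical, and the actual ExecutionGraph does more work during a transition):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of a guarded state transition, not Flink code.
final class StateSketch {

    enum Status { CREATED, RUNNING, FAILED }

    private final AtomicReference<Status> state = new AtomicReference<>(Status.CREATED);

    /** Succeeds only if the current state is still the expected one. */
    boolean transitionState(Status expected, Status next) {
        return state.compareAndSet(expected, next);
    }

    /** Moves CREATED -> RUNNING exactly once; any later call is rejected. */
    void scheduleForExecution() {
        if (!transitionState(Status.CREATED, Status.RUNNING)) {
            throw new IllegalStateException("Job may only be scheduled from state " + Status.CREATED);
        }
        // scheduling work would start here
    }

    Status state() {
        return state.get();
    }

    public static void main(String[] args) {
        StateSketch graph = new StateSketch();
        graph.scheduleForExecution();
        System.out.println(graph.state());
    }
}
```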

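In EAGER mode, resources are allocated (asynchronously) for each ExecutionJobVertex in topological order. Once allocation completes, it yields a collection of Execution objects, each representing one execution of a task, and Execution.deploy is called on each in turn to deploy it to its allocated resource.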

private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
checkState(state == JobStatus.RUNNING, "job is not running currently");

// Important: reserve all the space we need up front.
// that way we do not have any operation that can fail between allocating the slots
// and adding them to the list. If we had a failure in between there, that would
// cause the slots to get lost
final boolean queued = allowQueuedScheduling;

// collecting all the slots may resize and fail in that operation without slots getting lost
final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

final Set<AllocationID> allPreviousAllocationIds =
Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());

// allocate the slots (obtain all their futures
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
// these calls are not blocking, they only return futures
Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
slotProvider,
queued,
LocationPreferenceConstraint.ALL,
allPreviousAllocationIds,
timeout);

allAllocationFutures.addAll(allocationFutures);
}

// this future is complete once all slot futures are complete.
// the future fails once one slot future fails.
final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture
.thenAccept(
(Collection<Execution> executionsToDeploy) -> {
for (Execution execution : executionsToDeploy) {
try {
execution.deploy();
} catch (Throwable t) {
throw new CompletionException(
new FlinkException(
String.format("Could not deploy execution %s.", execution),
t));
}
}
})
// Generate a more specific failure message for the eager scheduling
.exceptionally(
(Throwable throwable) -> {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
final Throwable resultThrowable;

if (strippedThrowable instanceof TimeoutException) {
int numTotal = allAllocationsFuture.getNumFuturesTotal();
int numComplete = allAllocationsFuture.getNumFuturesCompleted();
String message = "Could not allocate all requires slots within timeout of " +
timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

resultThrowable = new NoResourceAvailableException(message);
} else {
resultThrowable = strippedThrowable;
}

throw new CompletionException(resultThrowable);
});

return currentSchedulingFuture;
}
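The core of scheduleEager is "wait for all slot futures, then deploy everything". The sketch below rebuilds that shape with the JDK alone. It is an approximation: `combineAll` here is a hypothetical helper built on CompletableFuture.allOf, which waits for every future before reporting a failure, so its failure timing can differ from Flink's own FutureUtils.combineAll; slots are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of eager scheduling, not Flink code.
final class EagerSketch {

    /** Completes with all results once every future is done; fails if any of them failed. */
    static <T> CompletableFuture<List<T>> combineAll(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        return all.thenApply(ignored -> {
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                results.add(future.join()); // does not block: every future is already complete
            }
            return results;
        });
    }

    /** Helper that builds an already-failed future. */
    static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /** Deploys to every slot, but only after all slot requests have succeeded. */
    static List<String> scheduleEager(List<CompletableFuture<String>> slotFutures) {
        List<String> deployed = new ArrayList<>();
        combineAll(slotFutures)
                .thenAccept(slots -> {
                    for (String slot : slots) {
                        deployed.add("deployed to " + slot);
                    }
                })
                .join(); // throws CompletionException if any slot request failed
        return deployed;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> slots = new ArrayList<>();
        slots.add(CompletableFuture.completedFuture("slot-1"));
        slots.add(CompletableFuture.completedFuture("slot-2"));
        System.out.println(scheduleEager(slots));
    }
}
```

Deploying only after all allocations succeed means a single missing slot leaves nothing half-deployed.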

Good, we are now close to the end of the whole flow! The Execution class represents one concrete execution. Let's see how it is deployed.

public void deploy() throws JobException {
final LogicalSlot slot = assignedResource;

checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

// Check if the TaskManager died in the meantime
// This only speeds up the response to TaskManagers failing concurrently to deployments.
// The more general check is the rpcTimeout of the deployment call
if (!slot.isAlive()) {
throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
}

// make sure exactly one deployment call happens from the correct state
// note: the transition from CREATED to DEPLOYING is for testing purposes only
ExecutionState previous = this.state;
if (previous == SCHEDULED || previous == CREATED) {
if (!transitionState(previous, DEPLOYING)) {
// race condition, someone else beat us to the deploying call.
// this should actually not happen and indicates a race somewhere else
throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
}
}
else {
// vertex may have been cancelled, or it was already scheduled
throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
}

if (this != slot.getPayload()) {
throw new IllegalStateException(
String.format("The execution %s has not been assigned to the assigned slot.", this));
}

try {

// race double check, did we fail/cancel and do we need to release the slot?
if (this.state != DEPLOYING) {
slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
return;
}

if (LOG.isInfoEnabled()) {
LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
attemptNumber, getAssignedResourceLocation()));
}

final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
taskRestore,
attemptNumber);

// null taskRestore to let it be GC'ed
taskRestore = null;

final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

submitResultFuture.whenCompleteAsync(
(ack, failure) -> {
// only respond to the failure case
if (failure != null) {
if (failure instanceof TimeoutException) {
String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

markFailed(new Exception(
"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
} else {
markFailed(failure);
}
}
},
executor);
}
catch (Throwable t) {
markFailed(t);
ExceptionUtils.rethrow(t);
}
}
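
The guard at the top of deploy() ensures that exactly one caller moves the execution into DEPLOYING. The excerpt does not show transitionState itself, so the sketch below assumes it is an atomic compare-and-set on the state field. The class `DeployStateSketch` and its reduced `State` enum are hypothetical and exist only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class DeployStateSketch {

    /** Reduced, hypothetical set of states; the real ExecutionState has more. */
    enum State { CREATED, SCHEDULED, DEPLOYING, CANCELED }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    /** Atomically moves from expected to target; false means the state changed in between. */
    boolean transitionState(State expected, State target) {
        return state.compareAndSet(expected, target);
    }

    /** Mirrors the guard in deploy(): only CREATED or SCHEDULED may move to DEPLOYING. */
    void deploy() {
        State previous = state.get();
        if (previous == State.SCHEDULED || previous == State.CREATED) {
            if (!transitionState(previous, State.DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: concurrent deployment call race.");
            }
        } else {
            throw new IllegalStateException("Must be CREATED or SCHEDULED to deploy, found " + previous);
        }
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        DeployStateSketch execution = new DeployStateSketch();
        execution.deploy();
        System.out.println(execution.current());
        try {
            execution.deploy(); // a second call must be rejected
        } catch (IllegalStateException e) {
            System.out.println("second call rejected: " + e.getMessage());
        }
    }
}
```

Reading the state first and then doing a compare-and-set against that exact value is what turns a concurrent change between the two steps into a detectable failure rather than a silent overwrite.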

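The TaskManagerGateway interface defines the methods used to communicate with a TaskManager. It has two concrete implementations, one based on the Actor model and one based on RPC. The RPC-based implementation holds a TaskExecutorGateway, whose implementing class TaskExecutor does the actual work of submitting the task. Its submitTask method is shown below; note that it takes a JobMasterId in addition to the descriptor and timeout passed in deploy().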

@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,
Time timeout) {

try {
final JobID jobId = tdd.getJobId();
final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);

if (jobManagerConnection == null) {
final String message = "Could not submit task because there is no JobManager " +
"associated for the job " + jobId + '.';

log.debug(message);
throw new TaskSubmissionException(message);
}

if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
final String message = "Rejecting the task submission because the job manager leader id " +
jobMasterId + " does not match the expected job manager leader id " +
jobManagerConnection.getJobMasterId() + '.';

log.debug(message);
throw new TaskSubmissionException(message);
}

if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
final String message = "No task slot allocated for job ID " + jobId +
" and allocation ID " + tdd.getAllocationId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}

// re-integrate offloaded data:
try {
// load the data this task needs from the blob (file) service
tdd.loadBigData(blobCacheService.getPermanentBlobService());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
}

// deserialize the pre-serialized information
// deserialize the data needed to initialize the task
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
}

if (!jobId.equals(jobInformation.getJobId())) {
throw new TaskSubmissionException(
"Inconsistent job ID information inside TaskDeploymentDescriptor (" +
tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
}

TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
jobInformation.getJobId(),
jobInformation.getJobName(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskInformation.getTaskName(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber());

InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
jobManagerConnection.getJobManagerGateway(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getTimeout());

TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();

LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();

final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
jobId,
tdd.getAllocationId(),
taskInformation.getJobVertexId(),
tdd.getSubtaskIndex());

final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

final TaskStateManager taskStateManager = new TaskStateManagerImpl(
jobId,
tdd.getExecutionAttemptId(),
localStateStore,
taskRestore,
checkpointResponder);

Task task = new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
tdd.getTargetSlotNumber(),
taskExecutorServices.getMemoryManager(),
taskExecutorServices.getIOManager(),
taskExecutorServices.getNetworkEnvironment(),
taskExecutorServices.getBroadcastVariableManager(),
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
blobCacheService,
libraryCache,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
getRpcService().getExecutor());

log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

boolean taskAdded;

try {
taskAdded = taskSlotTable.addTask(task);
} catch (SlotNotFoundException | SlotNotActiveException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}

if (taskAdded) {
task.startTaskThread();

return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager already contains a task for id " +
task.getExecutionId() + '.';

log.debug(message);
throw new TaskSubmissionException(message);
}
} catch (TaskSubmissionException e) {
return FutureUtils.completedExceptionally(e);
}
}
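
The overall shape of submitTask is: validate, register the task, start its thread, and report any validation failure as a failed future rather than a thrown exception. The sketch below reproduces only that shape with JDK types. `SubmitTaskSketch`, `SubmissionException`, and the use of a plain map in place of the task slot table are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class SubmitTaskSketch {

    /** Hypothetical stand-in for TaskSubmissionException. */
    static class SubmissionException extends Exception {
        SubmissionException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the task slot table: at most one task per execution id. */
    private final Map<String, Thread> tasks = new ConcurrentHashMap<>();

    CompletableFuture<String> submit(String executionId, Runnable work) {
        try {
            if (executionId == null || executionId.isEmpty()) {
                throw new SubmissionException("Missing execution id.");
            }

            Thread thread = new Thread(work, "task-" + executionId);

            // Register first; start the thread only if this id was not present yet.
            boolean added = tasks.putIfAbsent(executionId, thread) == null;
            if (!added) {
                throw new SubmissionException("Already contains a task for id " + executionId + '.');
            }

            thread.start();
            return CompletableFuture.completedFuture("ACK");
        } catch (SubmissionException e) {
            // Failures travel back to the caller inside the future.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        SubmitTaskSketch executor = new SubmitTaskSketch();
        System.out.println(executor.submit("attempt-1", () -> { }).isCompletedExceptionally());
        System.out.println(executor.submit("attempt-1", () -> { }).isCompletedExceptionally());
    }
}
```

Returning a failed future keeps the error on the same asynchronous path as a success, which is what lets the whenCompleteAsync callback in deploy() handle both cases.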

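At this point, the startTaskThread method actually starts the thread that runs the task. For reasons of space, the details of the Task class and its subclasses are left to the next article.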

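At this point we can see that a job's DAG goes through roughly the following three stages:

  1. StreamGraph
    The computation topology closest to the logic expressed in the code. It is built by adding StreamTransformation instances to the StreamExecutionEnvironment in the order the user code executes.
  2. JobGraph
    Generated from the StreamGraph. Nodes that can be chained are merged, the edges between nodes are set, slot sharing and co-location of related nodes are arranged, the files the job needs are uploaded, the checkpoint configuration is set, and so on. It is roughly a job graph that has been partially initialized and optimized.
  3. ExecutionGraph
    Converted from the JobGraph. It contains everything needed to actually execute the job and is the execution graph closest to the underlying implementation.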
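
To make the three stages concrete, here is a toy model of a linear pipeline. It is not Flink's algorithm: the class `GraphStagesSketch`, the `chainableToPrevious` flag, and the rule "chain when the flag is set and parallelism matches" are simplifying assumptions, and Flink's real chaining conditions are stricter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GraphStagesSketch {

    /** Stage 1 (StreamGraph-like): one node per operator in the user program. */
    static class StreamNode {
        final String name;
        final int parallelism;
        final boolean chainableToPrevious;

        StreamNode(String name, int parallelism, boolean chainableToPrevious) {
            this.name = name;
            this.parallelism = parallelism;
            this.chainableToPrevious = chainableToPrevious;
        }
    }

    /** Stage 2 (JobGraph-like): merge adjacent chainable nodes that share the same parallelism. */
    static List<List<StreamNode>> toJobVertices(List<StreamNode> nodes) {
        List<List<StreamNode>> vertices = new ArrayList<>();
        for (StreamNode node : nodes) {
            List<StreamNode> last = vertices.isEmpty() ? null : vertices.get(vertices.size() - 1);
            if (last != null && node.chainableToPrevious && last.get(0).parallelism == node.parallelism) {
                last.add(node);
            } else {
                List<StreamNode> chain = new ArrayList<>();
                chain.add(node);
                vertices.add(chain);
            }
        }
        return vertices;
    }

    /** Stage 3 (ExecutionGraph-like): expand each vertex into one entry per parallel subtask. */
    static List<String> toSubtasks(List<List<StreamNode>> jobVertices) {
        List<String> subtasks = new ArrayList<>();
        for (List<StreamNode> chain : jobVertices) {
            StringBuilder name = new StringBuilder();
            for (StreamNode node : chain) {
                if (name.length() > 0) {
                    name.append(" -> ");
                }
                name.append(node.name);
            }
            int parallelism = chain.get(0).parallelism;
            for (int i = 1; i <= parallelism; i++) {
                subtasks.add(name + " (" + i + "/" + parallelism + ")");
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<StreamNode> streamGraph = Arrays.asList(
            new StreamNode("source", 1, false),
            new StreamNode("map", 2, true),
            new StreamNode("filter", 2, true),
            new StreamNode("sink", 1, true));
        for (String subtask : toSubtasks(toJobVertices(streamGraph))) {
            System.out.println(subtask);
        }
    }
}
```

In this toy pipeline the four operators collapse into three vertices, because only map and filter share a parallelism, and those three vertices expand into four parallel subtasks.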

