2026/4/18 8:01:48
网站建设
项目流程
南昌网站改版,南山区住房和建设局官方网站,金融机构网站建设费用,网站快慢由什么决定前文中#xff0c;我们已经了解了 Flink 的三种执行图是怎么生成的。今天继续看一下 Flink 集群是如何启动的。
启动脚本
集群启动脚本的位置在#xff1a;
flink-dist/src/main/flink-bin/bin/start-cluster.sh脚本会负责启动 JobManager 和 TaskManager#xff0c;我们主要…前文中我们已经了解了 Flink 的三种执行图是怎么生成的。今天继续看一下 Flink 集群是如何启动的。启动脚本集群启动脚本的位置在flink-dist/src/main/flink-bin/bin/start-cluster.sh脚本会负责启动 JobManager 和 TaskManager我们主要关注 standalone 启动模式具体的流程见下图。从图中可以看出 JobManager 是通过 jobmanager.sh 文件启动的TaskManager 是通过taskmanager.sh 启动的两者都调用了 flink-daemon.sh通过传递不同的参数最终运行不同的 Java 类。case$DAEMONin(taskexecutor)CLASS_TO_RUNorg.apache.flink.runtime.taskexecutor.TaskManagerRunner;;(zookeeper)CLASS_TO_RUNorg.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer;;(historyserver)CLASS_TO_RUNorg.apache.flink.runtime.webmonitor.history.HistoryServer;;(standalonesession)CLASS_TO_RUNorg.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;;(standalonejob)CLASS_TO_RUNorg.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;;(sql-gateway)CLASS_TO_RUNorg.apache.flink.table.gateway.SqlGatewaySQL_GATEWAY_CLASSPATHfindSqlGatewayJar:findFlinkPythonJar;;(*)echoUnknown daemon ${DAEMON}.$USAGE.exit1;;esacJobManager 启动流程在 StandaloneSessionClusterEntrypoint 的 main 方法中主要就是加载各种配置和环境变量然后调用 ClusterEntrypoint.runClusterEntrypoint 来启动集群。跟着调用链一直找到 ClusterEntrypoint.runCluster 方法这里会启动 ResourceManager、DispatcherRunner 等组件。privatevoidrunCluster(Configurationconfiguration,PluginManagerpluginManager)throwsException{synchronized(lock){// 初始化各种服务initializeServices(configuration,pluginManager);// 创建 DispatcherResourceManagerComponentFactory// 包含了三个核心组件的 Factory// DispatcherRunnerFactory、ResourceManagerFactory、RestEndpointFactoryfinalDispatcherResourceManagerComponentFactorydispatcherResourceManagerComponentFactorycreateDispatcherResourceManagerComponentFactory(configuration);// 启动 ResourceManager、DispatcherRunner、WebMonitorEndpointclusterComponentdispatcherResourceManagerComponentFactory.create(configuration,resourceId.unwrap(),ioExecutor,commonRpcService,haServices,blobServer,heartbeatServices,delegationTokenManager,metricRegistry,executionGraphInfoStore,newRpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),failureEnrichers,this);// 关闭服务clusterComponent.getShutDownFuture().whenComplete((ApplicationStatusapplicationStatus,Throwablethrowable)-{if(throwable!null){shutDownAsync(ApplicationStatus.UNKNOWN,ShutdownBehaviour.GRACEFUL_SHUTDOWN,ExceptionUtils.stringifyException(throwable),false);}else{// This is the general shutdown path. If a separate more// specific shutdown was// already triggered, this will do nothingshutDownAsync(applicationStatus,ShutdownBehaviour.GRACEFUL_SHUTDOWN,null,true);}});}}下面来详细看一下这几个方法 initializeServices 就是负责初始化各种服务有几个比较重要的可以着重关注下// 初始化并启动一个通用的 RPC ServicecommonRpcServiceRpcUtils.createRemoteRpcService(...);// 创建一个 IO 线程池线程数量位 CPU 核数 * 4ioExecutorExecutors.newFixedThreadPool(...);// 创建 HA 服务组件根据配置初始化 Standalone、ZK、K8S 三种haServicescreateHaServices(configuration,ioExecutor,rpcSystem);// 创建并启动 blobServer,blobServer 可以理解为是 Flink 内部的blobServerBlobUtils.createBlobServer(...);blobServer.start();// 创建心跳服务heartbeatServicescreateHeartbeatServices(configuration);// 创建一个监控服务processMetricGroupMetricUtils.instantiateProcessMetricGroup(...);createDispatcherResourceManagerComponentFactory 这个方法就是创建了三个工厂类不需要过多介绍。我们重点关注 dispatcherResourceManagerComponentFactory.create 方法即 ResourceManager、DispatcherRunner、WebMonitorEndpoint 是如何启动的。WebMonitorEndpointWebMonitorEndpoint 的启动流程图如下图中细箭头代表同一个方法中顺序调用粗箭头代表进入上一个方法内部的调用。WebMonitorEndpoint 创建和启动步骤如下通过工厂创建出了 WebMonitorEndpoint这里就是比较常规的初始化操作。调用 WebMonitorEndpoint 的 start 方法开始启动start 方法内部先是创建了一个 Router 并调用 initializeHandlers 创建了一大堆 handler是真的一大堆这个方法有接近一千行都是在创建 handler创建完成之后对 handler 进行排序和去重再把它们都注册到 Router 中。这里排序是为了确保路由匹配的正确性排序规则是先静态路径/jobs/overview后动态路径/jobs/:jobid假如我们没有排序先注册了 /jobs/:jobid 后注册 /jobs/overview 这时当我们请求 /jobs/overview 时就会被错误的路由到 /jobs/:jobid 上去。是调用 startInternal 方法在 startInternal 方法内部只有 leader 选举和启动缓存清理任务两个步骤。ResourceManagerResourceManager 创建和启动步骤如下调用 ResourceManagerServiceImpl.create 方法创建 ResourceManagerService这里只是创建 ResourceManager 服务实际创建 ResourceManager 在后面的步骤中。调用 resourceManagerService.start 方法启动服务这里就是启动选主服务standalne 模式直接调用 grantLeadership 成为 leader。成为 leader 后就会调用 startNewLeaderResourceManager 方法这个方法中会调用 resourceManagerFactory.createResourceManager 正式创建 resourceManager。创建完成后就会调用 resourceManager.start 来启动它。启动后会回调 ResourceManager.onStart 方法。这里调用 startHeartbeatServices 启动了两个心跳服务一个是 ResourceManager 和 TaskManager 之间的心跳一个是 ResourceManager 和 JobManager 之间的心跳然后会启动 SlotManager。SlotManager 可以被当作 Flink 集群的资源调度中心。它会负责管理集群中的所有 Slot 资源也需要响应 JobManager 的资源请求。DispatcherRunner先创建工厂创建完成后调用 DefaultDispatcherRunner.create 创建出 DispatcherRunner接着是调用 start 启动选主流程。选主完成后就调用 startNewDispatcherLeaderProcess 启动新的流程。启动新的流程需要先关闭旧流程然后创建新的 dispatcherLeaderProcess并调用 start 启动。启动时会回调 onStart 方法。回调方法中先启动 executionPlanStore它主要是用于持久化 JobGraph。然后恢复执行计划重建状态如果是从失败中恢复实例化 Dispatcher完成作业启动。TaskManager 启动流程TaskManager 是 Flink 的执行节点其最小执行单元是 slot。TaskManager 启动流程也主要是和资源管理相关包括 slot 列表的管理和与 ResourceManager 的通信。TaskManager 启动流程大体分为以下几部分构建并启动 TaskManagerRunner蓝色部分启动 TaskExecutor红色部分完成与 ResourceManager 的连接橙色部分启动 TaskManagerRunner在 TaskManagerRunner 的 start 方法中有两个步骤第一步是调用 startTaskManagerRunnerServices 创建和启动了很多服务这一点和 JobManager 的启动流程比较像。这些服务包括了高可用服务、心跳服务、监控指标服务等这里也创建了 taskExecutorService它的启动在第二步。第二步是调用 taskExecutorService.start 方法启动 TaskExecutorService它内部主要负责启动 TaskExecutor。启动 TaskExecutorTaskExecutor 是 TaskManager 内部的一个核心组件负责帮助 TaskManager 完成 task 的部署和执行等核心操作。在上一步调用 taskExecutor 的 start 方法后会回调 onStart 方法这里主要是三个步骤连接 ResourceManager 以及注册监听启动 taskSlotTable连接 JobMaster 以及注册监听第一步我们在下面详细解释。第二步启动的 TaskSlotTable 是 TaskManager 中负责资源的核心组件它维护了一个 Slot 列表管理每个 Slot 的状态负责 Slot 的分配和释放。第三步主要是和 JobMaster 建立连接并保持心跳同时也会接收 Slot 申请的请求。连接 ResourceManagerTaskExecutor 注册完监听之后会收到 ResourceManagerLeaderListener.notifyLeaderAddress 方法回调。回调方法中会创建一个 TaskExecutorToResourceManagerConnection 实例并启动它。这个类是用来将 TaskExecutor 注册到 ResourceManager注册成功会回调 onRegistrationSuccess 方法。回调成功的方法中TaskManager 会调用 resourceManagerGateway.sendSlotReport 将 Slot 的状态进行上报。总结本文介绍了 Flink 集群在 Standalone 模式下的启动过程其中 JobManager 重点介绍了 WebMonitorEndpoint、ResourceManager 和 DispatcherRunner 这三个组件的启动过程。TaskManager 主要介绍了启动 TaskExecutor 和连接 ResourceManager 的过程。