以 Flink 和 Spark 为代表的散布式流批计算框架的高层资源治理平台逐渐从 Hadoop 生态的 YARN 转向 Kubernetes 生态的 k8s 原生 scheduler 以及周边资源调度器,比如 Volcano 和 Yunikorn 等。这篇文章繁难比拟一下两种计算框架在 Native Kubernetes 的允许和成功上的异同,以及关于运行到消费环境咱们还须要做些什么。
1. 什么是 Native
这里的 native 其实就是计算框架间接向 Kubernetes 放开资源。比如很多跑在 YARN 上方的计算框架,须要自己成功一个 AppMaster 来想 YARN 的 ResourceManager 来放开资源。Native K8s 相当于计算框架自己成功一个相似 AppMaster 的角色向 k8s 去放开资源,当然和 AppMaster 还是有差异的 (AppMaster 须要按 YARN 的规范启动成功)。
2. Spark on k8s 经常使用
提交作业
向 k8s 集群提交作业和往 YARN 上方提交很相似,命令如下,关键区别包括:
--master 参数指定 k8s 集群的 ApiServer须要经过参数 spark.kubernetes.container.image 指定在 k8s 运转作业的 image,指定 main jar,须要 driver 进程可访问:假设 driver 运转在 pod 中,jar 包须要蕴含在镜像中;假设 driver 运转在本地,那么 jar 须要在本地。经过 --name 或许 spark.app.name 指定 app 的名字,作业运转起来之后的 driver 命名会以 app 名字为前缀。当然也可以经过参数 spark.kubernetes.driver.pod.name 间接指定 dirver 的名字
提交完该命令之后,spark-submit 会创立一个 driver pod 和一个对应的 servcie,而后由 driver 创立 executor pod 并运转作业。
deploy-mode
和在 YARN 上方经常使用 Spark 一样,在 k8s 上方也允许 cluster 和 client 两种形式:
cluster mode: driver 在 k8s 集群上方以 pod 方式运转。 client mode: driver 运转在提交作业的中央,而后 driver 在 k8s 集群上方创立 executor。为了保障 executor 能够注册到 driver 上方,还须要提交作业的机器可以和 k8s 集群外部的 executor 网络连通(executor 可以访问到 driver,须要注册)。资源清算
这里的资源指的关键是作业的 driver 和 executor pod。spark 经过 k8s 的 onwer reference 机制将作业的各种资源衔接起来,这样当 driver pod 被删除的时刻,关联的 executor pod 也会被连带删除。但是假设没有 driver pod,也就是以 client 形式运转作业的话,如下两种状况触及到资源清算:
作业运转成功,driver 进程分开,executor pod 运转完智能分开driver 进程被杀掉,executor pod 连不上 driver 也会自行分开可以参考:
依赖治理
前面说到 main jar 包须要在 driver 进程可以访问到的中央,假设是 cluster 形式就须要将 main jar 打包到 spark 镜像中。但是在日常开发和调试中,每次从新 build 一个镜像的 effort 真实是太大了。spark 允许提交的时刻经常使用本地的文件,而后经常使用 s3 等作为中转:先上行上去,而后作业运转的时刻再从 s3 上方下载上去。上方是一个实例。
Pod Template
k8s 的 controller (比如 Deployment,Job)创立 Pod 的时刻依据 spec 中的 pod template 来创立。上方是一个 Job 的示例。
由于咱们经过 spark-submit 提交 spark 作业的时刻,最终的 k8s 资源(driver/executor pod)是由 spark 外部逻辑构建进去的。但是有的时刻咱们想要在 driver/executor pod 上做一些额外的上班,比如参与 sidecar 容器做一些日志搜集的上班。这种场景下 PodTemplate 就是一个比拟好的选用,同时 PodTemplate 也将 spark 和底层基础设备(k8s)解耦开。比如 k8s 颁布新版本允许一些新的特性,那么咱们只需修正咱们的 PodTemplate 即可,而不触及到 spark 的外部改变。
RBAC 全称是 Role-based access control,是 k8s 中的一套权限控制机制。深刻来说:
RBAC 中蕴含了一系列的权限设置,比如 create/delete/watch/list pod 等,这些权限汇合的实体叫 Role 或许 ClusterRole同时 RBAC 还蕴含了角色绑定相关(Role Binding),用于将 Role/ClusterRole 赋予一个或许一组用户,比如 Service Account 或许 UserAccount为了将 Spark 作业在 k8s 集群中运转起来,咱们还须要一套 RBAC 资源:
指定 namespace 下的 serviceaccount定义了权限规定的 Role 或许 ClusterRole,咱们可以经常使用经常出现的 ClusterRole "edit"(对简直一切资源具有操作权限,比如 create/delete/watch 等)绑定相关上方命令在 spark namespace 下为 serviceaccount spark 赋予了操作同 namespace 下其余资源的权限,那么只需 spark 的 driver pod 挂载了该 serviceaccount,它就可以创立 executor pod 了。
上方做一个繁难的演示:
经过如下命令提交作业 SparkPiSleep 到 k8s 集群中。
检查 k8s 集群中的资源
其中第一个就是 executor pod,第二个是 driver 的 pod。除此之外还创立了一个 service,可以经过该 service 访问到 driver pod,比如 Spark UI 都可以这样访问到。
上方再看一下 service owner reference,executor pod 也是相似的。
$kubectlgetsvctest12-9fd3c27b576039ae-driver-svc-nspark-oyamlapiVersion:v1kind:Servicemetadata:creationTimestamp:"2021-08-18T03:48:50Z"name:test12-9fd3c27b576039ae-driver-svcnamespace:spark#service的ownerReference指向了driverpod,只需driverpod被删除,该service也会被删除ownerReferences:-apiVersion:v1controller:truekind:Podname:test12-9fd3c27b576039ae-driveruid:56a50a66-68b5-42a0-b2f6-9a9443665d95resourceVersion:"9975441"uid:06c1349f-be52-4133-80d9-07af34419b1f
3. Flink on k8s 经常使用
Flink on k8s native 的成功允许两种形式:
application mode:在远程 k8s 集群中启动一个 flink 集群(jm 和 tm),driver 运转在 jm 中,也就是只允许 detached 形式,不允许 attached 形式。session mode:在远程 k8s 集群启动一个常驻的 flink 集群(只要 jm),而后向上方提交作业,依据实践状况选择启动多少个 tm。在消费上方经常使用普通不太倡导经常使用 session mode,所以上方关键探讨的是 application mode。
Flink 的 native k8s 形式是不须要指定 tm 个数的,jm 会依据用户的代码计算须要多少 tm。
提交作业
上方是一个繁难的提交命令,须要蕴含:
参数 run-application 指定是 application 形式参数 --target 指定运转在 k8s 上参数 kubernetes.container.image 指定作业运转经常使用的 flink 镜像最后须要指定 main jar,门路是镜像中的门路
资源清算
Flink 的 native 形式会先创立一个 JobManager 的 deployment,并将其托管给 k8s。同一个作业一切的相关资源的 owner reference 都指向该 Deployment,也就是说删除了该 deployment,一切相关的资源都会被清算掉。上方依据作业的运转状况探讨一下资源如何清算。
作业运转到终态(SUCCESS,FAILED,CANCELED 等)之后,Flink 会清算掉一切作业JobManager 进程启动失败(pod 中的 jm 容器启动失败),由于控制器是 Deployment,所以会不时重复拉起运转环节中,假设 JobManager 的 pod 被删除,Deployment 会从新拉起运转环节中,假设 JobManager 的 Deployment 被删除,那么关联的一切 k8s 资源都会被删除
Pod Template
Flink native 形式也允许 Pod Template,相似 Spark。
相似 Spark。
依赖文件治理
Flink 临时只允许 main jar 以及依赖文件在镜像中。也就是说用户要提交作业须要自己定制化镜像,体验不是很好。一种 workaroud 的方式是联合 PodTemplate:
假设依赖是本地文件,须要 upload 到一个 remote 存储做中转,比如各大云厂商的对象存储。假设依赖是远端文件,不须要 upload。运转时在 template 中经常使用 initContainer 将用户的 jar 以及依赖文件下载到 Flink 容器中,并加到 classpath 下运转。Flink 的作业 demo 就不在演示了。
4. Spark on Kubernetes 成功
Spark on Kubernetes 的成功比拟繁难:
Spark Client 创立一个 k8s pod 运转 driverdriver 创立 executor pod,而后开局运转作业作业运转完结之后 driver pod 进入到 Completed 形态,executor pod 会被清算掉。作业完结之后经过 driver pod 咱们还是可以检查 driver pod 的。
代码成功
Spark 的 native k8s 实现代码在 resource-managers/kubernetes module 中。咱们可以从 SparkSubmit 的代码开局剖析。咱们关键看一下 deploy-mode 为 cluster 形式的代码逻辑。
首先依据 spark.master 性能中 scheme 来判别是不是 on k8s。咱们上方也看到这特性能的方式为 --master k8s://。假设是 on k8s 的 cluster 形式,则去加载 Class org.apache.spark.deploy.k8s.submit.KubernetesClientApplication,并运转其中的 start 方法。childArgs 方法的**逻辑繁难来说就是依据 spark-submit 提交的参数结构出 driver pod 提交到 k8s 运转。
上方的代码的**就是最后创立 Client 并运转。这个 Client 是 Spark 封装进去的 Client,内置了 k8s client。
.watch(watcher)//Sendthelatestpodstateweknowtothewatchertomakesurewedidn'tmissanythingwatcher.eventReceived(Action.MODIFIED,podWithName.get())//Breakthewhileloopifthepodiscompletedorwedon'twanttowait//依据参数"spark.kubernetes.submission.waitAppCompletion"判别能否须要分开if(watcher.watchOrStop(sId)){watch.close()break}}}}
上方再繁难引见一下 Driver 如何治理 Executor 的流程。当 Spark Driver 运转 main 函数时,会创立一个 SparkSession,SparkSession 中蕴含了 SparkContext,SparkContext 须要创立一个 SchedulerBackend 会治理 Executor 的生命周期。对应到 k8s 上的 SchedulerBackend 其实就是 KubernetesClusterSchedulerBackend,上方关键看一下这个 backend 是如何创立进去的。大胆猜想一下,大略率也是依据 spark.master 的 url 的 scheme "k8s" 创立的。
上方是 SparkContext 创立 SchedulerBackend 的**代码逻辑。
.createTaskScheduler(sc,masterUrl)//上方创立进去的KubernetesClusterManager这里会创立出KubernetesClusterSchedulerBackendvalthrownewSparkException("Externalschedulercannotbeinstantiated",e)}}//方法getClsuterManager会经过ServiceLoader加载一切成功ExternalClusterManager的ClusterManager(KubernetesClusterManager和YarnClusterManager),而后经过masterurl启动filter,选出KubernetesClusterManagerprivatedefgetClusterManager(url:String):Option[ExternalClusterManager]={val
前面就是 KubernetesClusterSchedulerBackend 治理 Executor 的逻辑了。
可以繁难看一下创立 Executor 的代码逻辑。
5. Flink on Kubernetes 成功
Flink 的 Native K8s 成功:
Flink Client 创立 JobManager 的 Deployment,而后将 Deployment 托管给 k8sk8s 的 Deployment Controller 创立 JobManager 的 PodJobManager 内的 ResourceManager 担任先 Kubernetes Scheduler 恳求资源并创立 TaskManager 等相关资源并创立相关的 TaskManager Pod 并开局运转作业当作业运转到终态之后一切相关的 k8s 资源都被清算掉代码(基于分支 release-1.13)成功关键如下:
CliFrontend 作为 Flink Client 的入口依据命令行参数 run-application 判别经过方法 runApplication 去创立 ApplicationClusterKubernetesClusterDescriptor 经过方法 deployApplicationCluster 创立 JobManager 相关的 Deployment 和一些必要的资源JobManager 的成功类 JobMaster 经过 ResourceManager 调用类 KubernetesResourceManagerDriver 中的方法 requestResource 创立 TaskManager 等资源其中 KubernetesClusterDescriptor 成功自 interface ClusterDescriptor ,用来形容对 Flink 集群的操作。依据底层的资源经常使用不同, ClusterDescriptor 有不同的成功,包括 KubernetesClusterDescriptor、YarnClusterDescriptor、StandaloneClusterDescriptor。
上方繁难看一下 KubernetesClusterDescriptor 的**逻辑:创立 Application 集群。
deployApplicationCluster(finalClusterSpecificationclusterSpecification,finalApplicationConfigurationapplicationConfiguration)throwsClusterDeploymentException{//查问flink集群在k8s中能否存在if(client.getRestService(clusterId).isPresent()){thrownewClusterDeploymentException("TheFlinkcluster"+clusterId+"alreadyexists.");}finalKubernetesDeploymentTarget?ClusterEntrypoint.ExecutionMode.DETACHED:ClusterEntrypoint.ExecutionMode.NORMAL;flinkConfig.setString(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE,executionMode.toString());flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS,entryPoint);//Rpc,blob,rest,taskManagerRpcportsneedtobeexposed,soupdatethemtofixedvalues.//将端口指定为固定值,繁难k8s的资源构建。由于pod的隔离性,所以没有端口抵触KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig,BlobServerOptions.PORT,Constants.BLOB_SERVER_PORT);KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig,TaskManagerOptions.RPC_PORT,Constants.TASK_MANAGER_RPC_PORT);KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig,RestOptions.BIND_PORT,Constants.REST_PORT);//HA性能if(HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)){flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID,clusterId);KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig,HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,flinkConfig.get(JobManagerOptions.PORT));}try{finalKubernetesJobManagerParameters.buildKubernetesJobManagerSpecification(podTemplate,kubernetesJobManagerParameters);//**逻辑:在k8s中创立包括JobManagerDeployment在内k8s资源,比如Service和ConfigMapclient.createJobManagerComponent(kubernetesJobManagerSpec);returncreateClusterClientProvider(clusterId);}catch(Exceptione){//...}}}
上方代码中须要说的在构建 JobManager 的时刻补充 PodTemplate。繁难来说 PodTemplate 就是一个 Pod 文件。
第三步的 TaskManager 创立就不再赘述了。
7. 生态
这里生态这个词或许也不太适合,这里关键指的的假设要在消费上方经常使用该性能还有哪些可以做的。上方关键探讨在消费环境上方用来做 trouble-shooting 的两特性能:日志和监控。
日志
日志搜集关于线上系统是十分关键的一环,毫不夸张地说,80% 的缺点都可以经过日志查到要素。但是前面也说过,Flink 作业在作业运转到终态之后会清算掉一切资源,Spark 作业运转完只会保管 Driver Pod 的日志,那么咱们如何搜集到完整的作业日志呢?
有几种打算可供选用:
DaemonSet。每个 k8s 的 node 上方以 DaemonSet 方式部署日志搜集 agent,对 node 上方运转的一切容器日志启动一致搜集,并存储到相似 ElasticSearch 的一致日志搜查平台。SideCar。经常使用 Flink/Spark 提供的 PodTemplate 性能在主容器侧性能一个 SideCar 容器用来启动日志搜集,最后存储到一致的日志服务外面。这两种方式都有一个前提是有其余的日志服务提供存储、甚至搜查的性能,比如 ELK,或许各大云厂商的日志服务。
除此之外还有一种繁难的方式可以思索:应用 log4j 的裁减机制,自定义 log appender,在 appender 中定制化 append 逻辑,将日志间接搜集并存储到 remote storage,比如 hdfs,对象存储等。这种打算须要将自定义的 log appender 的 jar 包放到运转作业的 ClassPath 下,而且这种方式有或许会影响作业干流程的运转效率,对性能比拟敏感的作业并不太倡导经常使用这种方式。
监控
目前 Prometheus 曾经成为 k8s 生态的监控理想规范,上方咱们的探讨也是探讨如何将 Flink/Spark 的作业的目的对接到 Prometheus。上方先看一下 Prometheus 的架构。
其中的**在于 Prometheus Servier 搜集目的的方式是 pull 还是 push:
关于常驻的进程,比如在线服务,普通由 Prometheus Server 被动去进程暴显露来的 api pull 目的。关于会完结的进程目的搜集,比如 batch 作业,普通经常使用进程被动 push 的方式。详细流程是进程将目的 push 到常驻的 PushGateway,而后 Prometheus Server 去 PushGateway pull 目的。上方两种经常使用方式也是 Prometheus 官网倡导的经常使用方式,但是看完形容不难发现其实第一种场景也可以经常使用第二种处置方式。只不过第二种方式由于 PushGateway 是常驻的,对其稳固性要求会比拟高。
Flink 同时提供了 PrometheusReporter (将目的经过 api 泄露,由 Prometheus Server 来被动 pull 数据) 和 PrometheusPushGatewayReporter (将目的被动 push 给 PushGateway,Prometheus Server 不须要感知 Flink 作业)。
这两种方式中 PrometheusPushGatewayReporter 会更繁难一点,但是 PushGateway 或许会成为瓶颈。假设经常使用 PrometheusReporter 的方式,须要引入服务发现机制协助 Prometheus Server 智能发现运转的 Flink 作业的 Endpoint。Prometheus 目前允许的干流的服务发现机制关键有:
基于 Consul。Consul 是基于 etcd 的一套完整的服务注册与发现处置打算,要经常使用这种方式,咱们须要 Flink 对接 Consul。比如咱们在提交作业的时刻,将作业对应的 Service 启动捕捉并写入 Consul。基于文件。文件也就是 Prometheus 的性能文件,外面性能须要拉取 target 的 endpoint。文件这种方式原本是比拟鸡肋的,由于它须要 Prometheus Server 和 Flink 作业同时都可以访问,但是须要文件是 local 的。但是在 k8s 环境中,基于文件反而变的比拟繁难,咱们可以将 ConfigMap 挂载到 Prometheus Server 的 Pod 上方,Flink 作业修正 ConfigMap 就可以了。基于 Kubernetes 的服务发现机制。Kubernetes 的服务发现机制繁难来说就是 label select。可以参考
关于 Prometheus 允许的更多服务发现机制,可以参考:,繁难列举包括:
azureconsuldigitaloceandockerdockerswarmdnsec2eurekafilegcehetznerhttpkubernetes...
以批计算为代表的 Spark 经常使用 PushGateway 的方式来对接 Prometheus 是比拟好的方式,但是 Spark 官网并没有提供对 PushGateway 的允许,只允许了 Prometheus 的 Exporter,须要 Prometheus Server 被动去 pull 数据。
这里介绍经常使用基于 Kubernetes 的服务发现机制。
须要留意的是 Prometheus Server 拉取目的是按固定时时期隔启动拉取的,关于继续时期比拟短的批作业,有或许存在还没有拉取目的,作业就完结的状况。
8. 缺点
虽然 Spark 和 Flink 都成功了 native k8s 的形式,详细成功略有差异。但是在实践经常使用上发现两者的实如今某些场景下还是略有缺点的。
pod 不具有容错性 spark-submit 会先构建一个 k8s 的 driver pod,而后由 driver pod 启动 executor 的 pod。但是在 k8s 环境中并不太倡导间接构建 pod 资源,由于 pod 不具有容错性,pod 所在节点挂了之后 pod 就挂了。相熟 k8s scheduler 的同窗应该知道 pod 有一个字段叫 podName,scheduler 的**是为 pod 填充这个字段,也就是为 pod 选用一个适合的 node。一旦调度成功之后 pod 的该字段就固定上去了。这也是 pod 不具有 node 容错的要素。
Deployment 语义。 Deployment 可以以为是 ReplicaSet 的增强版,而 ReplicaSet 的官网定义如下。
A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
繁难来说,ReplicaSet 的目的是保障几个相反的 Pod 正本可以不连续的运转,说是为了线上服务量身定制的也不为过(线上服务最好是有形态且允许原地重启,比如 WebService)。但是虽然 Flink 以流式作业为主,但是咱们并不能繁难地将流式作业同等于有形态的 WebService。比如 Flink 作业的 Main Jar 假设写的有疑问,会造成 JobManager 的 Pod 不时启动失败,但是由于是 Deployment 语义的疑问会不时被重启。这个或许是 ByDesign 的,但是觉得并不太好。
Batch 作业处置。 由于 Flink 作业运转完一切资源包括 Deployment 都会被清算掉,拿不到最终的作业形态,不知道成功有否(流作业的话中止就可以以为是失败了)。关于这个疑问可以应用 Flink 自身的归档性能,将结果归档到外部的文件系统(兼容 s3 协定,比如阿里云对象存储 oss)中。触及到的性能如下:
s3.access-keys3.secret-keys3.regions3.endpointjobmanager.archive.fs.dir假设不想引入外部系统的话,须要变革 Flink 代码在作业运转成功之后将数据写到 k8s 的 api object 中,比如 ConfigMap 或许 Secret。
作业日志。 Spark 作业运转完结之后 Executor Pod 被清算掉,Driver Pod 被保管,咱们可以经过它检查到 Driver 的日志。Flink 作业完结之后就什么日志都检查不到了。
9. 总结
本文从经常使用方式、源码成功以及在消费系统上方如何补足周边系统地引见了 Spark 和 Flink 在 k8s 生态上的成功、通常以及对比。但是限于篇幅,很多内容来不迭探讨了,比如 shuffle 如何处置。假设你们公司也在做这方面的上班,置信还是有很多参考价值的,也欢迎留言交换。
另外,YARN 的时代曾经过去了, on k8s scheduler 将成为大数据计算以及 AI 框架的标配。但是 k8s scheduler 这种天生为在线服务设计的调度器在吞吐上方有很大的无余,并不是很符合大数据作业。k8s 社区的批调度器 kube-batch,以及基于 kube-batch 衍生进去的 Volcano 调度器,基于 YARN 的调度算法成功的 k8s 生态调度器 Yunikorn 也逐渐在大数据 on k8s 场景下锋芒毕露,不过这些都是后话了,前面有时期再专门写文章启动剖析对比。