Flink On K8S
本文主要介绍公司 Flink 1.10 接入 K8S 的一些经验,于 2020 年 2 月发布的 flink 1.10 进一步增加对容器的支持,特别是支持原生的 K8S 形态部署,至此 Flink On K8S 可支持三种部署方式:
- Flink session cluster on Kubernetes:job manager 和 task manager 节点以 K8S deployment 的形式长期运行,俗称会话集群,用户可以往会话集群提交多个作业。
- Flink job cluster on Kubernetes: 将作业制作成对应的镜像,每次启动一个作业,均需要创建一个 flink 集群,该集群只运行一个作业,当作业结束后,销毁 flink 集群,这是蘑菇街选择的方式。
- Native Kubernetes Setup:前两种方式需要用户提前指定资源规格,资源申请过少导致任务无法正常运行,反之则存在浪费。该场景支持按需动态的申请资源,优化资源的利用,但目前处于 beta 阶段。
Flink session cluster on Kubernetes
官网文档就 Flink session cluster on Kubernetes 给出了详细的介绍和部署步骤,本文简要介绍下原理。在该模式下,job manager 和 task manager 均以 deployment 方式部署,外部通过 node port 访问 job manager 的 ui。之后集群将长期运行,由 deployment 保证 job 和 task 的副本数,同时建议将 checkpoint 定期保存在 HDFS,从而保证集群的高可靠。由于集群长期运行,task manager 无需频繁扩缩,故适合生命周期的较短的作业。
Flink job cluster on Kubernetes
Job cluster 为每个作业创建一个专属集群和分配独立的资源,首先需要将作业代码制作成镜像,然后基于该镜像启动容器实例,其中 job manager 以 K8S job 形式运行,task manager 以 K8S deployment 形式运行,容器启动后,自动执行对应路径下的业务代码,为了提升可靠性,建议将 checkpoint 定期保存在 HDFS 中。当作业完成后,需要用户手动清理 job 和 deployment,这种方式下适合生命周期较长的作业,公司的 flink 作业运行时间较长,所以选择该方式,整体而言,这种方式比较成熟稳定。由于官网的步骤比较含糊,故下文介绍完整的步骤。
制作镜像
我们基于 flink:1.10-scala_2.11 制作业务镜像,首先将业务代码制作成 jar 包,下载 hadoop jar 包,设置 hdfs 的配置文件 hdfs-site.xml,业务镜像的 Dockerfile 如下。
FROM flink:1.10-scala_2.11
RUN mkdir -p /etc/hadoop/conf/
ADD hdfs-site.xml /etc/hadoop/conf/hdfs-site.xml
ADD flink-shaded-hadoop-2-uber-2.8.3-8.0.jar $FLINK_HOME/lib/hadoop.jar
ADD job.jar $FLINK_HOME/lib/
创建集群
创建集群前需要为 Job manager 创建 service,它有四个端口,其中 rpc 和 blob 端口用于和 task manager 之间的通信,query 和 ui 端口用于对外部提供服务。当前基于 node port 类型的 service 存在两点缺陷:
- job manager 和 task manager 直接的通信需要经过 iptables 或者 ipvs 转发,增加网络开销。事实上,headless 的 service 既能满足服务发现的要求,又能减少网络开销。
- Serverless K8S(如 AWS EKS) 一般不支持 node port 类型的 service,不利于 flink 往云上迁移。
增对上述两个缺点,故建议对 rpc 和 blob 端口采用 headless service,对 ui 和 port 端口采用 node port 类型的 service。如下两个 patch 正在解决此类问题:
另外在容器环境下,jvm 获取的内存信息通常是宿主机的信息,可以通过 taskmanager.memory.process.size 设置成 pod 的规格解决。
本文通过如下 job-cluster-service.yaml 创建 Headless service:
apiVersion: v1
kind: Service
metadata:
name: flink-job-cluster
labels:
app: flink
component: job-cluster
spec:
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
clusterIP: None
selector:
app: flink
component: job-cluster
为 ui 和 query 端口创建 node port service:
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
ports:
- name: rest
port: 8081
nodePort: 30081
- name: query
port: 6125
nodePort: 30025
type: NodePort
selector:
app: flink
component: jobmanager
通过如下 job-cluster.yaml 创建 job manager。
apiVersion: batch/v1
kind: Job
metadata:
name: job-cluster
spec:
template:
metadata:
labels:
app: flink
component: job-cluster
spec:
restartPolicy: OnFailure
containers:
- name: job-cluster
image: {your image}
command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
"-Djobmanager.rpc.address=job-cluster",
"-Dparallelism.default={job parallelism}",
"-Dblob.server.port=6124",
"-Dqueryable-state.server.ports=6125"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 6125
name: query
- containerPort: 8081
name: ui
通过如下 task-manager.yaml 创建 task manager。
apiVersion: apps/v1
kind: Deployment
metadata:
name: task-manager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: task-manager
template:
metadata:
labels:
app: flink
component: task-manager
spec:
containers:
- name: flink-task-manager
image: {your image}
command: ["/opt/flink/bin/taskmanager.sh"]
args: ["start-foreground",
"-Djobmanager.rpc.address=job-cluster"]
Native Kubernetes Setup
Native Kubernetes Setup 是更理想 Flink On K8S 部署方式,支持资源的动态分配,但是处于 beta 阶段,故暂不采纳。官网给出了详细的部署介绍,本文不再累述。其原理简要如下:
- (图 1-2 步骤)采用 K8S deployment 创建 job manager。
- (图 3)用户向 job manager 提交作业。
- (图 4-10)收到作业请求后,job manager 根据当前资源池的情况决定是否需要扩容资源。如果资源充足,将任务下发到 task manager 节点。如果资源不足,需要向 K8S 申请扩容 task manager,最后再将任务下发给给 task manager。