本文主要介绍公司 Flink 1.10 接入 K8S 的一些经验,于 2020 年 2 月发布的 flink 1.10 进一步增加对容器的支持,特别是支持原生的 K8S 形态部署,至此 Flink On K8S 可支持三种部署方式:

  • Flink session cluster on Kubernetes:job manager 和 task manager 节点以 K8S deployment 的形式长期运行,俗称会话集群,用户可以往会话集群提交多个作业。
  • Flink job cluster on Kubernetes: 将作业制作成对应的镜像,每次启动一个作业,均需要创建一个 flink 集群,该集群只运行一个作业,当作业结束后,销毁 flink 集群,这是蘑菇街选择的方式。
  • Native Kubernetes Setup:前两种方式需要用户提前指定资源规格,资源申请过少导致任务无法正常运行,反之则存在浪费。该场景支持按需动态的申请资源,优化资源的利用,但目前处于 beta 阶段。

官网文档就 Flink session cluster on Kubernetes 给出了详细的介绍和部署步骤,本文简要介绍下原理。在该模式下,job manager 和 task manager 均以 deployment 方式部署,外部通过 node port 访问 job manager 的 ui。之后集群将长期运行,由 deployment 保证 job 和 task 的副本数,同时建议将 checkpoint 定期保存在 HDFS,从而保证集群的高可靠。由于集群长期运行,task manager 无需频繁扩缩,故适合生命周期的较短的作业。

Job cluster 为每个作业创建一个专属集群和分配独立的资源,首先需要将作业代码制作成镜像,然后基于该镜像启动容器实例,其中 job manager 以 K8S job 形式运行,task manager 以 K8S deployment 形式运行,容器启动后,自动执行对应路径下的业务代码,为了提升可靠性,建议将 checkpoint 定期保存在 HDFS 中。当作业完成后,需要用户手动清理 job 和 deployment,这种方式下适合生命周期较长的作业,公司的 flink 作业运行时间较长,所以选择该方式,整体而言,这种方式比较成熟稳定。由于官网的步骤比较含糊,故下文介绍完整的步骤。

制作镜像

我们基于 flink:1.10-scala_2.11 制作业务镜像,首先将业务代码制作成 jar 包,下载 hadoop jar 包,设置 hdfs 的配置文件 hdfs-site.xml,业务镜像的 Dockerfile 如下。

FROM flink:1.10-scala_2.11

RUN mkdir -p /etc/hadoop/conf/

ADD hdfs-site.xml /etc/hadoop/conf/hdfs-site.xml
ADD flink-shaded-hadoop-2-uber-2.8.3-8.0.jar $FLINK_HOME/lib/hadoop.jar
ADD job.jar $FLINK_HOME/lib/

创建集群

创建集群前需要为 Job manager 创建 service,它有四个端口,其中 rpc 和 blob 端口用于和 task manager 之间的通信,query 和 ui 端口用于对外部提供服务。当前基于 node port 类型的 service 存在两点缺陷:

  • job manager 和 task manager 直接的通信需要经过 iptables 或者 ipvs 转发,增加网络开销。事实上,headless 的 service 既能满足服务发现的要求,又能减少网络开销。
  • Serverless K8S(如 AWS EKS) 一般不支持 node port 类型的 service,不利于 flink 往云上迁移。

增对上述两个缺点,故建议对 rpc 和 blob 端口采用 headless service,对 ui 和 port 端口采用 node port 类型的 service。如下两个 patch 正在解决此类问题:

另外在容器环境下,jvm 获取的内存信息通常是宿主机的信息,可以通过 taskmanager.memory.process.size 设置成 pod 的规格解决。

本文通过如下 job-cluster-service.yaml 创建 Headless service:

apiVersion: v1
kind: Service
metadata:
  name: flink-job-cluster
  labels:
    app: flink
    component: job-cluster
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  clusterIP: None
  selector:
    app: flink
    component: job-cluster

为 ui 和 query 端口创建 node port service:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  ports:
  - name: rest
    port: 8081
    nodePort: 30081
  - name: query
    port: 6125
    nodePort: 30025
  type: NodePort
  selector:
    app: flink
    component: jobmanager

通过如下 job-cluster.yaml 创建 job manager。

apiVersion: batch/v1
kind: Job
metadata:
  name: job-cluster
spec:
  template:
    metadata:
      labels:
        app: flink
        component: job-cluster
    spec:
      restartPolicy: OnFailure
      containers:
      - name: job-cluster
        image: {your image}
        command: ["/opt/flink/bin/standalone-job.sh"]
        args: ["start-foreground",
               "-Djobmanager.rpc.address=job-cluster",
               "-Dparallelism.default={job parallelism}",
               "-Dblob.server.port=6124",
               "-Dqueryable-state.server.ports=6125"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui

通过如下 task-manager.yaml 创建 task manager。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-manager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: task-manager
  template:
    metadata:
      labels:
        app: flink
        component: task-manager
    spec:
      containers:
      - name: flink-task-manager
        image: {your image}
        command: ["/opt/flink/bin/taskmanager.sh"]
        args: ["start-foreground",
               "-Djobmanager.rpc.address=job-cluster"]

Native Kubernetes Setup

Native Kubernetes Setup 是更理想 Flink On K8S 部署方式,支持资源的动态分配,但是处于 beta 阶段,故暂不采纳。官网给出了详细的部署介绍,本文不再累述。其原理简要如下:

native_kubernetes

  1. (图 1-2 步骤)采用 K8S deployment 创建 job manager。
  2. (图 3)用户向 job manager 提交作业。
  3. (图 4-10)收到作业请求后,job manager 根据当前资源池的情况决定是否需要扩容资源。如果资源充足,将任务下发到 task manager 节点。如果资源不足,需要向 K8S 申请扩容 task manager,最后再将任务下发给给 task manager。