外部访问 k8s 中的 kafka 集群
如果直接在云厂商提供的实例上搭建 kafka 集群可以说很简单,一般不会有什么困难。当我们选择把 kafka 部署到 k8s 里,希望利用 k8s 提供的编排能力来帮助我们更方便的管理 kafka 集群。在这种情况下部署会变得复杂起来,主要两个问题
- 有状态的服务部署
- 从 k8s 集群外访问
zookeeper 部署
我们都知道 kafka 依赖 zk, 所以首先需要在 k8s 集群部署 zookeeper。 zookeeper 是有状态的服务,所以选择的方式是 StatefulSet + PVC。
这里我们使用的 zk 镜像是 k8s 官方的 k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10
, 从这里 我们能看到,zk 在启动时候会自动创建配置文件并且根据 pod 的编号动态的把 myid 写入到 zk 的配置文件。
-
StatefulSet 部署 这里部署3副本的 zk 集群
StatefulSet 部署 yaml 示例如下:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
apiVersion: apps/v1 kind: StatefulSet metadata: name: zookeeper spec: selector: matchLabels: app: zookeeper serviceName: zookeeper-hs replicas: 3 updateStrategy: type: RollingUpdate podManagementPolicy: OrderedReady template: metadata: labels: app: zookeeper spec: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: "app" operator: In values: - zookeeper topologyKey: "kubernetes.io/hostname" containers: - name: zookeeper imagePullPolicy: Always image: "k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10" command: - sh - -c - "start-zookeeper \ --servers=3 \ --data_dir=/var/lib/zookeeper/data \ --data_log_dir=/var/lib/zookeeper/data/log \ --conf_dir=/opt/zookeeper/conf \ --client_port=2181 \ --election_port=3888 \ --server_port=2888 \ --tick_time=2000 \ --init_limit=10 \ --sync_limit=5 \ --heap=512M \ --max_client_cnxns=60 \ --snap_retain_count=3 \ --purge_interval=12 \ --max_session_timeout=40000 \ --min_session_timeout=4000 \ --log_level=INFO" ports: - containerPort: 2181 name: client - containerPort: 2888 name: server - containerPort: 3888 name: leader-election volumeMounts: - name: zookeeper-data mountPath: /var/lib/zookeeper securityContext: runAsUser: 1000 fsGroup: 1000 volumeClaimTemplates: - metadata: name: zookeeper-data spec: accessModes: [ "ReadWriteOnce" ] storageClassName: alicloud-disk-efficiency resources: requests: storage: 20Gi
-
创建 headless service 和 service zk 集群节点之间通过 headless service 互通,客户端访问 zk 通过 service headless service 和 service yaml 示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
apiVersion: v1 kind: Service metadata: name: zookeeper-hs labels: app: zookeeper spec: ports: - port: 2888 name: server - port: 3888 name: leader-election clusterIP: None selector: app: zookeeper --- apiVersion: v1 kind: Service metadata: name: zookeeper labels: app: zookeeper spec: ports: - port: 2181 name: zookeeper-client selector: app: zookeeper
kafka 部署
把 kafka 部署到 k8s 后,我们肯定是通过 service 从 k8s 外部访问 kafaka。这里的 service 要么是 NodePort, 要么是 LoadBalancer 类型。我们使用的方式是 LoadBalancer。
我们先看下面这张图,这是 kafka 在集群中的网络拓扑。当我们通过地址 12.345.67:31090 访问到 kafka 后,kafka 返回的访问地址是类似这样的 endpoint kafka-0.kafka-hs.kafka.default.svc.cluster.local:9092
。这是 k8s 集群内部能访问的 headless service endpoint 地址,从集群外部自然使用这个地址是访问不通的。
所以,我们需要解决两个问题:
- kafka 不同的 pod 需要不通的对外能访问的地址
- 不能使用 kafka 默认的
advertised.listeners
解决方案
问题1,我们为每个 pod 创建类型是 LoadBalancer 的 service 并且监听不同的端口。这样通过 LB IP + port 就能找到特定的 kafka broker。
service 示例如下:
|
|
问题2,我们在容器启动的时候,执行脚本动态获取 pod 编号,生成容器需要的环境变量 KAFKA_CFG_ADVERTISED_LISTENERS(对应 kafka broker 的配置 advertised.listener
)
|
|
完整的 kafka StatefulSet 示例如下:
|
|
最终,我们从集群外面就能通过 12.345.67.8:9092,12.345.67.8:9093,12.345.67.8:9094
这样的地址访问到 k8s 中的 kafka 集群。