Triển khai Apache Kafka trên Kubernetes (Kafka broker với persistent storage): Hướng dẫn đầy đủ
Chạy Kafka trên Kubernetes có thể cung cấp tính khả dụng cao, khả năng mở rộng và quản lý tài nguyên tốt hơn. Trong bài viết này, hãy cùng Bizfly Cloud triển khai các Kafka broker với persistent storage, cho phép truy cập một broker từ bên ngoài để testing và xác minh nó với một pod client Kafka tạm thời.
Tạo Namespace: Để tổ chức và phân lập tài nguyên, trước tiên chúng ta tạo một namespace cho Kafka. Tại đây chúng ta nhóm các tài nguyên liên quan (service, statefulset, config) cho Kafka một cách logic, ngăn ngừa xung đột về đặt tên và giúp quản lý các object này dễ dàng hơn.
$ kubectl create namespace kafka
namespace/kafka created
$ kubectl get namespaces
NAME STATUS AGE
kafka Active 5s
$ kubectl config set-context --current --namespace=kafka
Context "docker-desktop" modified.
Cấu hình Kafka: Chúng ta lưu trữ cấu hình Kafka trong một ConfigMap. Dưới đây là một cấu hình đơn giản trong kafka-config.yaml
$ kubectl get nodes -o wide
NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
docker-desktop Ready control-plane 68d v1.32.2 192.168.65.9 <none> Docker Desktop 6.10.14-linuxkit docker://28.3.0
Lấy internal-ip của máy đó và cài nó trong key advertised.listeners để cho phép truy cập
từ máy này.
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config
data:
server.properties.template: |
process.roles=broker,controller
node.id=${BROKER_ID}
controller.quorum.voters=0@kafka-0.kafka-headless:9093,1@kafka-1.kafka-headless:9093,2@kafka-2.kafka-headless:9093
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.65.9:30090
inter.broker.listener.name=PLAINTEXT
log.dirs=/var/lib/kafka/data
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
process.roles: Kafka sẽ hoạt động như một broker và cũng là một controller.
advertised.listeners: Các broker Kafka sẽ cho client biết cách để truy cập chúng. Đối với các pod bên trong cluster, chúng ta sử dụng kafka-$(HOSTNAME).kafka:9092 (DNS name). Từ bên ngoài cluster, chúng ta sử dụng external IP:port để kết nối với Kafka cluster.
num.partitions: Số lượng các broker sẽ lưu trữ bản copy của các partition của topic này. Các offset của consumer nội bộ được sao chép trên cả 3 broker.
default.replication.factor: Số lượng các bản sao của transaction log tồn tại. Ở đây, transaction log được sao chép trên cả 3 broker.
min.insync.replicas: Một transaction sẽ chỉ được commit nếu có ít nhất 2 bản sao được đồng bộ.
Bây giờ hãy tạo config map này để lưu trữ cấu hình Kafka trong ConfigMap.
$ kubectl apply -f kafka-config.yaml
configmap/kafka-config created
$ kubectl get configmap --namespace=kafka
NAME DATA AGE
kafka-config 10 9s
Triển khai Kafka StatefulSet: Chúng ta xác định một StatefulSet với 3 Kafka broker trong tệp kafka-statefulset.yaml.
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
serviceName: kafka-headless
replicas: 3
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: apache/kafka:3.7.0
imagePullPolicy: IfNotPresent
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CLUSTER_ID
value: "4L6g3nShT-eMCtK-X86sw"
command:
- sh
- -euc
- |
export BROKER_ID=${HOSTNAME##*-}
export POD_NAME=${HOSTNAME}
sed "s/\${BROKER_ID}/${BROKER_ID}/g; s/\${POD_NAME}/${POD_NAME}/g" \
/opt/kafka/config/server.properties.template > /tmp/server.properties
/opt/kafka/bin/kafka-storage.sh format -t "$CLUSTER_ID" -c /tmp/server.properties || true
exec /opt/kafka/bin/kafka-server-start.sh /tmp/server.properties
ports:
- containerPort: 9092
name: broker
- containerPort: 9093
name: controller
volumeMounts:
- name: kafka-data
mountPath: /var/lib/kafka/data
- name: kafka-config
mountPath: /opt/kafka/config/server.properties.template
subPath: server.properties.template
volumes:
- name: kafka-config
configMap:
name: kafka-config
volumeClaimTemplates:
- metadata:
name: kafka-data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 1Gi
Chúng ta có,
export BROKER_ID=${HOSTNAME##*-} thiết lập ID broker duy nhất từ pod name.
export POD_NAME=${HOSTNAME} lưu tên máy chủ của pod để client truy cập.
sed … thay thế các placeholder trong config template bằng ID broker và pod name thực tế.
kafka-storage.sh format … || true khởi tạo Kafka storage
exec kafka-server-start.sh … khởi động tiến trình Kafka broker.
Bây giờ hãy tạo statefulset này và xác minh các pod.
$ kubectl apply -f kafka-statefulset.yaml
statefulset.apps/kafka created
$ kubectl get statefulset --namespace=kafka
NAME READY AGE
kafka 0/3 10s
$ kubectl get pods --namespace=kafka
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 39m
kafka-1 1/1 Running 0 53m
kafka-2 1/1 Running 0 53m
Vậy là các broker Kafka của chúng ta đang chạy trên cụm Kubernetes.
Tạo Headless service: Chúng ta xác định một headless service cho Kafka StatefulSet để có một DNS name cố định cho các broker Kafka trong tệp kafka-headless-service.yaml
apiVersion: v1
kind: Service
metadata:
name: kafka-headless
labels:
app: kafka
spec:
clusterIP: None
ports:
- name: broker
port: 9092
- name: controller
port: 9093
selector:
app: kafka
Tạo NodePort: Chúng ta xác định một nodeport service để truy cập từ bên ngoài cho việc thực hiện test.
apiVersion: v1
kind: Service
metadata:
name: kafka-0-external
spec:
type: NodePort
ports:
- port: 9092
targetPort: 9092
nodePort: 30090
selector:
statefulset.kubernetes.io/pod-name: kafka-0
Test: Chạy một pod ngắn hạn để test Kafka từ bên trong cụm. Từ shell, chúng ta tạo một topic, liệt kê các topic, gửi một message và nhận message đó từ Kafka.
$ kubectl run kafka-client --rm -it --image=apache/kafka:3.7.0 -- bash
kafka-client-test:/$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-0:9092 --create --topic test-topic --partitions 3 --replication-factor 1
Created topic test-topic.
kafka-client-test:/$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-0.kafka-headless:9092 --list
test-topic
kafka-client-test:/$ /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka-0.kafka-headless:9092 --topic test-topic
>hello
kafka-client-test:/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-0.kafka-headless:9092 --topic test-topic --from-beginning
Hello
Và như vậy là chúng ta đã kiểm thử thành công các Kafka broker từ một pod ngắn hạn khác trong cụm Kubernetes.
Kết luận
Với cấu hình này, chúng ta sẽ có cung cấp nền tảng vững chắc để xây dựng các đường dẫn dữ liệu thời gian thực, ứng dụng hướng sự kiện và kiến trúc microservices. Với các tính năng bổ sung như bảo mật (SSL/SASL) và giám sát (Prometheus/JMX), bạn có thể mở rộng cụm này cho các triển khai cấp độ sản xuất.



















