Build một hệ thống Kafka-Based FastAPI Microservices với Real-Time Event Streaming

813
20-02-2026
Build một hệ thống Kafka-Based FastAPI Microservices với Real-Time Event Streaming

Chúng ta sẽ tìm hiểu cách xây dựng hai microservice — một service publish message và một service consume message một cách bất đồng bộ. Dù bạn đang thiết kế hệ thống cho xử lý dữ liệu thời gian thực, streaming API, hay kiến trúc phân tán, bài viết này sẽ cung cấp một nền tảng thực hành cơ bản. Ý tưởng là bắt đầu với một ví dụ đơn giản để hiểu cách hoạt động, sau đó có thể mở rộng (scale) hệ thống.

Vì sao chọn Kafka và FastAPI?

Kafka là nền tảng tiêu chuẩn trong ngành cho distributed messaging và log streaming. Trong khi đó, FastAPI là một framework web Python hiện đại, nổi bật với hiệu năng cao và hỗ trợ bất đồng bộ (async).

Khi kết hợp Kafka với FastAPI, chúng ta có thể:

Tiếp nhận và xử lý lượng dữ liệu lớn (high-throughput) một cách hiệu quả

Stream message theo thời gian thực

Mở rộng các microservice độc lập với nhau

System Architecture

Chúng ta sẽ xây dựng hai microservice:

Microservice A: Nhận HTTP POST request và publish message lên Kafka.

Microservice B: Lắng nghe các Kafka topic và consume message theo thời gian thực.

Các thành phần này sẽ được orchestrate bằng Docker Compose, giúp việc thiết lập môi trường trở nên dễ dàng, portable và sẵn sàng cho production.

Client --> FastAPI Producer --> Kafka Topic --> FastAPI Consumer

Microservice A: Kafka Producer với FastAPI

Service này cung cấp một endpoint /produce. Nó sử dụng thư viện aiokafka để publish message lên Kafka theo cơ chế bất đồng bộ (asynchronously).

# microservice-a/app/main.py

import asyncio

from fastapi import FastAPI

from pydantic import BaseModel

from aiokafka import AIOKafkaProducer

from aiokafka.errors import KafkaConnectionError

app = FastAPI()

KAFKA_TOPIC = "test-topic"

KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"

producer: AIOKafkaProducer | None = None

class Message(BaseModel):

content: str

@app.on_event("startup")

async def startup_event():

global producer

producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

while True:

try:

await producer.start()

print("Kafka producer connected")

break

except KafkaConnectionError:

print("Kafka producer not available. Retrying in 5 seconds...")

await asyncio.sleep(5)

@app.on_event("shutdown")

async def shutdown_event():

global producer

if producer:

await producer.stop()

print("Kafka producer stopped")

@app.post("/produce")

async def produce_message(message: Message):

if producer is None:

return {"error": "Kafka producer is not started"}

await producer.send_and_wait(KAFKA_TOPIC, message.content.encode("utf-8"))

return {"status": "message sent", "content": message.content}

Microservice B: Kafka Consumer với FastAPI

Service này subscribe vào cùng một Kafka topic và lắng nghe các message được gửi tới. Khi có message mới, service sẽ consume và in nội dung message ra.

# microservice-b/app/main.py

from fastapi import FastAPI

import asyncio

from aiokafka import AIOKafkaConsumer

from aiokafka.errors import KafkaConnectionError

app = FastAPI()

KAFKA_TOPIC = "test-topic"

KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"

async def consume():

print(f"Entering in consume")

consumer = AIOKafkaConsumer(

KAFKA_TOPIC,

bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,

group_id="my-fastapi-consumer-group",

auto_offset_reset="earliest",

)

while True:

try:

await consumer.start()

print("Kafka consumer connected and listening...")

break

except KafkaConnectionError:

print("Kafka not available. Retrying in 5 seconds...")

await asyncio.sleep(5)

while True:

try:

print("Waiting for messages..")

async for msg in consumer:

print(f"Message received: {msg.value.decode('utf-8')}")

except Exception as e:

print(f"Error in consumer loop: {e}")

await asyncio.sleep(5)

@app.on_event("startup")

async def startup_event():

print("Startup event launched...")

await asyncio.sleep(10)

asyncio.create_task(consume())

@app.get("/")

async def root():

return {"message": "Microservice B is running!"}

Makefile dành cho Local Development

Để đơn giản hóa workflow của bạn, hãy sử dụng Makefile đi kèm:

# Variables

PROJECT_NAME=fastapi-microservices-kafka

COMPOSE=docker-compose

PY=python3

# Start development environment with hot reload

up:

@echo "Starting development environment..."

$(COMPOSE) up --build

# Stop and remove all containers, volumes, and networks

down:

@echo "Stopping and cleaning up containers and volumes..."

$(COMPOSE) down -v

# Stop containers without removing volumes

stop:

$(COMPOSE) down

# Rebuild images without using cache

rebuild:

$(COMPOSE) build --no-cache

# Send a test message to the Kafka producer

test-produce:

curl -X POST http://localhost:8001/produce \

-H "Content-Type: application/json" \

-d '{"content": "Test message from Makefile"}'

# View logs for the consumer microservice

logs-b:

$(COMPOSE) logs -f microservice-b

# View logs for the producer microservice

logs-a:

$(COMPOSE) logs -f microservice-a

# Clean up unused Docker data

clean:

docker system prune -f

# Open a shell in the producer container

sh-a:

$(COMPOSE) exec microservice-a sh

# Open a shell in the consumer container

sh-b:

$(COMPOSE) exec microservice-b sh

# Start production environment

up-prod:

@echo "Starting production environment..."

docker-compose -f docker-compose.yml -f docker-compose.prod.yml up --build -d

Test Kafka Pipeline

Sau khi các dịch vụ đã hoạt động (khởi tạo), hãy test dịch vụ producer bằng lệnh:

curl -X POST http://localhost:8000/produce \

-H "Content-Type: application/json" \

-d '{"content": "Hello Kafka from FastAPI!"}'

Bạn sẽ thấy thông báo được in trong nhật ký của người dùng bằng lệnh:

make logs-b

Real-Time Use Cases

Kiến trúc này có thể làm nền tảng cho nhiều hệ thống thời gian thực, chẳng hạn như:

Theo dõi đội xe theo thời gian thực (real-time fleet tracking)

Pipeline xử lý sự kiện tài chính

Ứng dụng crawler thu thập dữ liệu

Các công cụ tổng hợp và phân tích log

Bằng cách tận dụng độ bền và khả năng xử lý dữ liệu lớn của Kafka cùng với khả năng bất đồng bộ của FastAPI, bạn có thể xây dựng các ứng dụng phân tán hiệu năng cao.

Chúng ta đã xây dựng một hệ thống microservices nhẹ nhưng có khả năng mở rộng, sử dụng FastAPI và Kafka. Mô hình này rất phù hợp cho các developer muốn tích hợp cơ chế messaging thời gian thực vào hệ thống mà vẫn đảm bảo hiệu năng và khả năng bảo trì.

Theo Bizfly Cloud

SHARE