Build một hệ thống Kafka-Based FastAPI Microservices với Real-Time Event Streaming
Chúng ta sẽ tìm hiểu cách xây dựng hai microservice — một service publish message và một service consume message một cách bất đồng bộ. Dù bạn đang thiết kế hệ thống cho xử lý dữ liệu thời gian thực, streaming API, hay kiến trúc phân tán, bài viết này sẽ cung cấp một nền tảng thực hành cơ bản. Ý tưởng là bắt đầu với một ví dụ đơn giản để hiểu cách hoạt động, sau đó có thể mở rộng (scale) hệ thống.
Vì sao chọn Kafka và FastAPI?
Kafka là nền tảng tiêu chuẩn trong ngành cho distributed messaging và log streaming. Trong khi đó, FastAPI là một framework web Python hiện đại, nổi bật với hiệu năng cao và hỗ trợ bất đồng bộ (async).
Khi kết hợp Kafka với FastAPI, chúng ta có thể:
Tiếp nhận và xử lý lượng dữ liệu lớn (high-throughput) một cách hiệu quả
Stream message theo thời gian thực
Mở rộng các microservice độc lập với nhau
System Architecture
Chúng ta sẽ xây dựng hai microservice:
Microservice A: Nhận HTTP POST request và publish message lên Kafka.
Microservice B: Lắng nghe các Kafka topic và consume message theo thời gian thực.
Các thành phần này sẽ được orchestrate bằng Docker Compose, giúp việc thiết lập môi trường trở nên dễ dàng, portable và sẵn sàng cho production.
Client --> FastAPI Producer --> Kafka Topic --> FastAPI Consumer
Microservice A: Kafka Producer với FastAPI
Service này cung cấp một endpoint /produce. Nó sử dụng thư viện aiokafka để publish message lên Kafka theo cơ chế bất đồng bộ (asynchronously).
# microservice-a/app/main.py
import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaConnectionError
app = FastAPI()
KAFKA_TOPIC = "test-topic"
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
producer: AIOKafkaProducer | None = None
class Message(BaseModel):
content: str
@app.on_event("startup")
async def startup_event():
global producer
producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
while True:
try:
await producer.start()
print("Kafka producer connected")
break
except KafkaConnectionError:
print("Kafka producer not available. Retrying in 5 seconds...")
await asyncio.sleep(5)
@app.on_event("shutdown")
async def shutdown_event():
global producer
if producer:
await producer.stop()
print("Kafka producer stopped")
@app.post("/produce")
async def produce_message(message: Message):
if producer is None:
return {"error": "Kafka producer is not started"}
await producer.send_and_wait(KAFKA_TOPIC, message.content.encode("utf-8"))
return {"status": "message sent", "content": message.content}
Microservice B: Kafka Consumer với FastAPI
Service này subscribe vào cùng một Kafka topic và lắng nghe các message được gửi tới. Khi có message mới, service sẽ consume và in nội dung message ra.
# microservice-b/app/main.py
from fastapi import FastAPI
import asyncio
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError
app = FastAPI()
KAFKA_TOPIC = "test-topic"
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
async def consume():
print(f"Entering in consume")
consumer = AIOKafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id="my-fastapi-consumer-group",
auto_offset_reset="earliest",
)
while True:
try:
await consumer.start()
print("Kafka consumer connected and listening...")
break
except KafkaConnectionError:
print("Kafka not available. Retrying in 5 seconds...")
await asyncio.sleep(5)
while True:
try:
print("Waiting for messages..")
async for msg in consumer:
print(f"Message received: {msg.value.decode('utf-8')}")
except Exception as e:
print(f"Error in consumer loop: {e}")
await asyncio.sleep(5)
@app.on_event("startup")
async def startup_event():
print("Startup event launched...")
await asyncio.sleep(10)
asyncio.create_task(consume())
@app.get("/")
async def root():
return {"message": "Microservice B is running!"}
Makefile dành cho Local Development
Để đơn giản hóa workflow của bạn, hãy sử dụng Makefile đi kèm:
# Variables
PROJECT_NAME=fastapi-microservices-kafka
COMPOSE=docker-compose
PY=python3
# Start development environment with hot reload
up:
@echo "Starting development environment..."
$(COMPOSE) up --build
# Stop and remove all containers, volumes, and networks
down:
@echo "Stopping and cleaning up containers and volumes..."
$(COMPOSE) down -v
# Stop containers without removing volumes
stop:
$(COMPOSE) down
# Rebuild images without using cache
rebuild:
$(COMPOSE) build --no-cache
# Send a test message to the Kafka producer
test-produce:
curl -X POST http://localhost:8001/produce \
-H "Content-Type: application/json" \
-d '{"content": "Test message from Makefile"}'
# View logs for the consumer microservice
logs-b:
$(COMPOSE) logs -f microservice-b
# View logs for the producer microservice
logs-a:
$(COMPOSE) logs -f microservice-a
# Clean up unused Docker data
clean:
docker system prune -f
# Open a shell in the producer container
sh-a:
$(COMPOSE) exec microservice-a sh
# Open a shell in the consumer container
sh-b:
$(COMPOSE) exec microservice-b sh
# Start production environment
up-prod:
@echo "Starting production environment..."
docker-compose -f docker-compose.yml -f docker-compose.prod.yml up --build -d
Test Kafka Pipeline
Sau khi các dịch vụ đã hoạt động (khởi tạo), hãy test dịch vụ producer bằng lệnh:
curl -X POST http://localhost:8000/produce \
-H "Content-Type: application/json" \
-d '{"content": "Hello Kafka from FastAPI!"}'
Bạn sẽ thấy thông báo được in trong nhật ký của người dùng bằng lệnh:
make logs-b
Real-Time Use Cases
Kiến trúc này có thể làm nền tảng cho nhiều hệ thống thời gian thực, chẳng hạn như:
Theo dõi đội xe theo thời gian thực (real-time fleet tracking)
Pipeline xử lý sự kiện tài chính
Ứng dụng crawler thu thập dữ liệu
Các công cụ tổng hợp và phân tích log
Bằng cách tận dụng độ bền và khả năng xử lý dữ liệu lớn của Kafka cùng với khả năng bất đồng bộ của FastAPI, bạn có thể xây dựng các ứng dụng phân tán hiệu năng cao.
Chúng ta đã xây dựng một hệ thống microservices nhẹ nhưng có khả năng mở rộng, sử dụng FastAPI và Kafka. Mô hình này rất phù hợp cho các developer muốn tích hợp cơ chế messaging thời gian thực vào hệ thống mà vẫn đảm bảo hiệu năng và khả năng bảo trì.
Theo Bizfly Cloud










