Xử lý Parallel Kafka hàng loạt với Kotlin Coroutines trong Spring Boot

1124
29-06-2026
Xử lý Parallel Kafka hàng loạt với Kotlin Coroutines trong Spring Boot

Cùng tìm hiểu cách Kotlin Coroutines cải thiện khả năng xử lý hàng loạt của Spring Boot Kafka với khả năng thực thi song song, điều tiết tài nguyên và các thao tác database nhanh hơn.

Quản lý lưu lượng message lớn trong kiến trúc phân tán là rất quan trọng. Sử dụng hiệu quả các database và CPU resource cũng vô cùnkotlineg cần thiết. Có những cấu trúc cho phép chúng ta nhận message hàng loạt. Cấu trúc "BatchMessageListener" mặc định của Spring Kafka đáp ứng nhu cầu này. Tuy nhiên, việc xử lý các message này thường lần lượt gặp phải bottleneck.

Bài này sẽ thảo luận chi tiết về cấu trúc và cách sử dụng Kotlin Coroutines. Chúng ta sẽ xem cách tối đa hóa hiệu suất xử lý Kafka message bằng cách sử dụng các nguyên tắc Structured Concurrency và kỹ thuật Resource Throttling.

Nút thắt kiến trúc: Sequential I/O Blocking

Trên Kafka listener hiện tại:

Việc thực hiện các truy vấn cơ sở dữ liệu hoặc gọi dịch vụ bên ngoài cho mỗi message sẽ làm tăng thời gian xử lý tổng thể. Nếu tốc độ xử lý của một message chậm hơn tốc độ nhận message và thời gian max-poll-interval-ms bị vượt quá, consumer sẽ bị loại khỏi consumer group.

Quá trình rebalancing được kích hoạt và các partition của consumer đó được phân phối lại cho các consumer khác trong group.

@KafkaListener(topics = ["usage-pool-topic"])

fun usagePoolListener(records: List>) {

records.forEach { record ->

processRecord(record) // Network

latency + DB I/O blocking

}

}

Giải pháp

1. Batch-Fetch và cấu trúc In-Memory Map

Trước khi bắt đầu xử lý đồng thời (concurrent), dữ liệu được lấy tập trung từ tất cả các entity cần thiết. Nhiều truy vấn riêng lẻ được chuyển thành một truy vấn batch trước khi quá trình xử lý diễn ra, giúp giải quyết vấn đề N+1 query ngay tại tầng ứng dụng.

Toàn bộ dữ liệu được cache một lần rồi mới phân tách thành các tác vụ chạy concurrent, từ đó giảm đáng kể sự phụ thuộc vào database. Sau đó, dữ liệu được chuyển thành cấu trúc Map bằng hàm associateBy, cho phép truy cập theo key với thời gian truy xuất nhanh và an toàn. Nhờ vậy, các tác vụ concurrent có thể đọc dữ liệu trực tiếp từ memory thay vì liên tục truy vấn database.

val messages = records.map { objectMapper.readValue(it.value(), UsagePoolRecord::class.java) }

val usagePoolEntities = usagePoolRepository

.findByIds(messages.map { it.usagePoolId.toBigInteger() })

.associateBy { it.usagePoolId }

val lockEntities = lockRepository

.findByUserIds(messages.map { it.userId })

.associateBy { it.userId }

2. Structured Concurrency

Quản lý Memory bằng cách chia thành các khối (chunking)

Cấu trúc khối (chunk) phục vụ hai mục đích:

Ngăn chặn việc tạo ra các coroutine đồng thời. Điều này giúp tránh việc sử dụng memory khi không cần thiết.

Mỗi chunk ghi dữ liệu vào database sau khi tất cả các coroutine đã hoàn thành hoạt động của mình. Không gặp phải việc tiêu thụ tài nguyên kết nối (database pool) không cần thiết.

messages.chunked(150).forEach { chunk ->

// Each chunk of 150 records is processed concurrently

}

Phân lập tài nguyên với limitedParallelism

Tại sao lại cần limitedParallelism? Ví dụ, nếu database connection pool có X kết nối, việc giữ giới hạn parallelism dưới X sẽ ngăn ngừa lỗi "Connection Timeout".

messages.chunked(150).forEach { chunk ->

val deferredResults = chunk.map { record ->

CoroutineScope(Dispatchers.IO.limitedParallelism(15)).async {

try {

processRecord(record, usagePoolEntities, lockEntities)

} catch (e: Exception) {

log.error("Operation error: ${record.key()}", e)

buildErrorRecord(record, e)

}

}

}

val results = deferredResults.awaitAll() // Structural waiting

collectAndAggregate(results)

}

Lệnh Dispatchers.IO.limitedParallelism(X) giới hạn số lượng coroutine đồng thời xuống còn X, ngăn để DB connection pool không bị cạn kiệt.

Mỗi coroutine trả về một kết quả với lệnh async.

Lệnh awaitAll() chờ tất cả các coroutine trong chunk hoàn thành trước khi chuyển sang bước tiếp theo.

runBlocking

Hàm này chặn các caller cho đến khi tất cả các thao tác concurrent hoàn thành. Đây là cách tiếp cận đúng ở đây vì:

Nó đảm bảo rằng Kafka consumer vẫn bị chặn để duy trì cấu trúc cam kết offset của nó cho đến khi tất cả các record trong batch được xử lý.

Chúng ta vẫn được hưởng lợi từ parallelism của các thao tác concurrent trong block runBlocking.

3. Cấu trúc lưu kết quả an toàn cho đa luồng (Thread-Safe Result Structure)

Sau khi awaitAll() hoàn tất, kết quả từ các coroutine được thu thập vào các cấu trúc dữ liệu thread-safe, sau đó mới thực hiện một lần ghi dữ liệu theo batch. Việc sử dụng MutableList để lưu kết quả từ nhiều coroutine chạy song song có thể gây ra race condition và dẫn đến mất dữ liệu.

Trong trường hợp này, nên ưu tiên các cấu trúc dữ liệu lock-free như ConcurrentLinkedQueue. Thay vì sử dụng cơ chế đồng bộ (synchronized), ConcurrentLinkedQueue hoạt động dựa trên thuật toán CAS (Compare-And-Swap), giúp đảm bảo an toàn khi nhiều luồng cùng ghi dữ liệu và mang lại hiệu suất cao hơn trong các tác vụ có tần suất ghi lớn.

data class AggregatedRecords(

val processedSave: ConcurrentLinkedQueue = ConcurrentLinkedQueue(),

val toDelete: ConcurrentLinkedQueue = ConcurrentLinkedQueue(),

val retryQueue: ConcurrentLinkedQueue = ConcurrentLinkedQueue()

)

Giá trị trả về của DataIntegrityViolationException rất quan trọng. Khi hai phiên bản consumer xử lý cùng một record, một trong số chúng vi phạm ràng buộc duy nhất (Unique Constraint). Thay vì đánh fail toàn bộ batchi, việc xóa từng record một sẽ được thực hiện.

AggregatedRecords.processedSave

.chunked(150)

.forEach { batch ->

try {

processedRepository.saveAll(batch)

} catch (e: DataIntegrityViolationException) {

batch.forEach { record ->

try { processedRepository.save(record) }

catch (e: DataIntegrityViolationException) {}

}

}

}

4. Khả năng chịu lỗi trong các thao tác ghi

Các thao tác ghi hàng loạt (saveAll) đạt hiệu suất tốt. Tuy nhiên, lỗi "Unique Constraint" trong một single record có thể khiến toàn bộ batch bị lỗi.

Cấu trúc sau đây rất quan trọng để đáp ứng các yêu cầu Optimistic Locking hoặc Idempotency.

aggregatedRecords.processedSave.chunked(150).forEach { batch ->

try {

processedRepository.saveAll(batch)

} catch (e: DataIntegrityViolationException) {

// Fallback: Try one by one if batch fails

batch.forEach { record ->

try {

processedRepository.save(record)

} catch (innerException: DataIntegrityViolationException) {

log.warn("Duplicate record skipped: ${record.id}")

}

}

}

}

5. Data Flow Diagram

Ingress: Batch message từ Kafka được tiếp nhận thông qua runBlocking.

Preparation: Tất cả context data cần thiết được truy xuất hàng loạt từ DB.

Execution: Các coroutine được bắt đầu bất đồng bộ theo từng chunk.

Synchronization: Hệ thống sử dụng awaitAll() để chờ tất cả coroutine hoàn thành trước khi tiếp tục.

Egress: Kết quả thu thập được được lưu vĩnh viễn với saveAll.

Xử lý Parallel Kafka hàng loạt với Kotlin Coroutines trong Spring Boot - Ảnh 8.

Phân tích hiệu suất và kết quả

Xử lý Parallel Kafka hàng loạt với Kotlin Coroutines trong Spring Boot - Ảnh 9.

Kết luận

Việc xử lý Kafka message trong Spring Boot bằng Kotlin Coroutines không chỉ tăng tốc độ mà còn cải thiện khả năng đọc hiểu code và giúp quản lý tài nguyên với khả năng dự đoán tốt hơn.

Việc sử dụng runBlocking cho phép chúng ta xây dựng cầu nối giữa luồng Kafka Consumer hoạt động theo cơ chế blocking và mô hình coroutine bất đồng bộ, giúp tận dụng lợi ích của Coroutines mà vẫn đảm bảo cơ chế quản lý offset của Kafka hoạt động ổn định.

SHARE