Что происходит с необработанными сообщениями в Kafka
1️⃣ Как кратко ответить
Необработанные сообщения в Kafka остаются в топике до тех пор, пока не истечет заданный период хранения (retention period) или не будет достигнут лимит по размеру. После этого они удаляются. Потребители могут обрабатывать сообщения в любое время до их удаления.
2️⃣ Подробное объяснение темы
Apache Kafka — это распределенная платформа потоковой передачи данных, которая позволяет публиковать и подписываться на потоки записей, аналогично системе обмена сообщениями. В Kafka сообщения организованы в топики, и каждый топик разбит на партиции. Понимание того, что происходит с необработанными сообщениями, важно для эффективного использования Kafka.
Хранение сообщений
Когда сообщения публикуются в Kafka, они записываются в определенный топик. Эти сообщения остаются в топике до тех пор, пока не истечет период хранения (retention period) или не будет достигнут лимит по размеру. Период хранения и лимит по размеру настраиваются на уровне топика и могут быть изменены в зависимости от требований приложения.
Период хранения (Retention Period)
Период хранения определяет, как долго сообщения будут храниться в Kafka, независимо от того, были они обработаны потребителями или нет. Например, если период хранения установлен на 7 дней, сообщения будут доступны для потребителей в течение этого времени. После истечения этого периода сообщения будут удалены, чтобы освободить место для новых данных.
Лимит по размеру
Помимо периода хранения, Kafka также может удалять сообщения, если общий размер данных в топике превышает заданный лимит. Это позволяет контролировать использование дискового пространства. Если лимит по размеру достигнут, старые сообщения будут удалены, чтобы освободить место для новых.
Обработка сообщений потребителями
Потребители в Kafka могут читать сообщения из топиков в любое время до их удаления. Kafka сохраняет смещение (offset) для каждого потребителя, что позволяет им продолжать чтение с того места, где они остановились. Это означает, что даже если сообщение было опубликовано, но еще не обработано, потребитель может вернуться и обработать его позже, при условии, что сообщение еще не удалено.
Пример кода
Рассмотрим пример, как потребитель может обрабатывать сообщения из топика:
package main
import (
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
// Создаем новый читатель для топика "example-topic" на локальном хосте
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "example-topic",
Partition: 0,
})
// Закрываем читатель после завершения работы
defer r.Close()
// Бесконечный цикл для чтения сообщений
for {
// Читаем сообщение из топика
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Fatal(err)
}
// Выводим ключ и значение сообщения
fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
}
kafka.NewReader: Создает нового читателя для указанного топика и партиции.r.ReadMessage: Читает следующее сообщение из топика. Если сообщение доступно, возвращает его; если нет, ждет, пока сообщение не появится.m.Offset: Смещение сообщения, которое позволяет отслеживать, какие сообщения уже были обработаны.string(m.Key),string(m.Value): Преобразуют ключ и значение сообщения в строку для вывода.
Заключение
Необработанные сообщения в Kafka остаются доступными для потребителей до тех пор, пока не истечет период хранения или не будет достигнут лимит по размеру. Это позволяет потребителям обрабатывать сообщения в удобное для них время, обеспечивая гибкость и надежность в обработке потоков данных.
🔒 Подпишись на бусти автора и стань Алигатором, чтобы получить полный доступ к функционалу сайта и отслеживать свой прогресс!
Подписаться