← Назад ко всем вопросам

Что такое latest смещение консьюмера в Kafka

1️⃣ Как кратко ответить

Latest смещение консьюмера в Kafka — это позиция, с которой консьюмер начинает чтение сообщений из топика, начиная с самого последнего сообщения, которое было записано в момент старта консьюмера. Это позволяет игнорировать все предыдущие сообщения и обрабатывать только новые, поступающие после подключения.

2️⃣ Подробное объяснение темы

В Apache Kafka, когда консьюмер подключается к топику, он должен определить, с какого смещения (offset) начать чтение сообщений. Смещение — это уникальный идентификатор каждого сообщения в разделе (partition) топика, который указывает на его позицию. Kafka предоставляет несколько стратегий для определения начального смещения, и одна из них — это "latest".

Когда консьюмер использует "latest" смещение, он начинает чтение с самого последнего сообщения, которое было записано в топик на момент его подключения. Это означает, что все предыдущие сообщения, которые были записаны до подключения консьюмера, будут проигнорированы. Такая стратегия полезна в ситуациях, когда важны только новые данные, и нет необходимости обрабатывать старые сообщения.

Пример использования "latest" смещения:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
import java.util.Collections;
​
public class LatestOffsetConsumer {
    public static void main(String[] args) {
        // Создаем объект Properties для конфигурации консьюмера
        Properties props = new Properties();
​
        // Указываем адрес Kafka-брокера
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
​
        // Задаем идентификатор группы консьюмера
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
​
        // Указываем, что консьюмер должен начинать чтение с последнего смещения
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
​
        // Указываем десериализаторы для ключей и значений сообщений
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
​
        // Создаем экземпляр KafkaConsumer с заданными свойствами
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
​
        // Подписываемся на топик "my-topic"
        consumer.subscribe(Collections.singletonList("my-topic"));
​
        // Бесконечный цикл для постоянного чтения новых сообщений
        try {
            while (true) {
                // Пытаемся получить новые сообщения из топика
                ConsumerRecords<String, String> records = consumer.poll(100);
​
                // Обрабатываем каждое полученное сообщение
                for (ConsumerRecord<String, String> record : records) {
                    // Выводим в консоль ключ и значение сообщения
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Закрываем консьюмер при завершении работы
            consumer.close();
        }
    }
}
  • Создание объекта Properties: Здесь мы задаем конфигурацию для консьюмера, включая адрес Kafka-брокера и идентификатор группы.
  • AUTO_OFFSET_RESET_CONFIG: Устанавливаем значение "latest", чтобы консьюмер начинал чтение с последнего смещения.
  • Десериализаторы: Указываем, как должны быть десериализованы ключи и значения сообщений.
  • KafkaConsumer: Создаем экземпляр консьюмера с заданными свойствами.
  • Подписка на топик: Консьюмер подписывается на топик "my-topic".
  • Бесконечный цикл: Консьюмер постоянно пытается получить новые сообщения из топика.
  • Обработка сообщений: Для каждого полученного сообщения выводим его смещение, ключ и значение.

Использование "latest" смещения позволяет консьюмеру обрабатывать только новые сообщения, что может быть полезно для приложений, которым не нужно обрабатывать исторические данные.

Тема: Kafka и брокеры
Стадия: Tech

🔒 Подпишись на бусти автора и стань Алигатором, чтобы получить полный доступ к функционалу сайта и отслеживать свой прогресс!

Твои заметки