← Назад ко всем вопросам

Что такое ‘poison message’ и как с ним работать?

1️⃣ Как кратко ответить

Poison message — это сообщение в системе очередей, которое не может быть обработано из-за ошибок в данных или логике обработки. Для работы с poison message необходимо реализовать механизм их обнаружения и обработки, например, перемещение в отдельную очередь для дальнейшего анализа и исправления.

2️⃣ Подробное объяснение темы

Poison message — это сообщение, которое не может быть обработано системой очередей из-за ошибок, таких как некорректные данные или логические ошибки в коде обработчика. Эти сообщения могут вызывать повторные сбои в обработке, если не предпринять меры для их обработки.

Зачем это нужно

В системах, использующих очереди сообщений, таких как RabbitMQ, Apache Kafka или Amazon SQS, сообщения обрабатываются асинхронно. Если сообщение вызывает ошибку при обработке, оно может быть возвращено в очередь и попытка обработки будет повторена. Однако, если сообщение постоянно вызывает сбои, это может привести к бесконечному циклу повторных попыток, что негативно сказывается на производительности системы.

Как это работает

  1. Обнаружение poison message:

    • Система должна иметь механизм для отслеживания количества попыток обработки каждого сообщения. Если количество попыток превышает заданный порог, сообщение считается poison message.
  2. Обработка poison message:

    • Перемещение в "мертвую" очередь (dead-letter queue, DLQ): Poison message перемещается в отдельную очередь для дальнейшего анализа и исправления.
    • Логирование: Запись информации о poison message в лог для последующего анализа.
    • Уведомление: Отправка уведомления разработчикам или администраторам системы о возникновении poison message.

Пример кода

Рассмотрим пример на Java с использованием Amazon SQS и AWS SDK, где мы обрабатываем сообщения и перемещаем poison message в DLQ.

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
​
public class SQSExample {
​
    private static final String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue";
    private static final String DLQ_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDLQ";
    private static final int MAX_RETRIES = 3;
​
    public static void main(String[] args) {
        AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
​
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(QUEUE_URL)
                .withMaxNumberOfMessages(1)
                .withWaitTimeSeconds(10);
​
        while (true) {
            for (Message message : sqs.receiveMessage(receiveMessageRequest).getMessages()) {
                try {
                    processMessage(message);
                    sqs.deleteMessage(QUEUE_URL, message.getReceiptHandle());
                } catch (Exception e) {
                    handlePoisonMessage(sqs, message);
                }
            }
        }
    }
​
    private static void processMessage(Message message) throws Exception {
        // Логика обработки сообщения
        // Если возникает ошибка, выбрасывается исключение
        if (message.getBody().contains("error")) {
            throw new Exception("Processing error");
        }
    }
​
    private static void handlePoisonMessage(AmazonSQS sqs, Message message) {
        int retryCount = getRetryCount(message);
        if (retryCount >= MAX_RETRIES) {
            // Перемещение в DLQ
            sqs.sendMessage(new SendMessageRequest(DLQ_URL, message.getBody()));
            sqs.deleteMessage(QUEUE_URL, message.getReceiptHandle());
        } else {
            // Увеличение счетчика попыток
            incrementRetryCount(message);
        }
    }
​
    private static int getRetryCount(Message message) {
        // Получение количества попыток обработки из атрибутов сообщения
        String retryCountStr = message.getAttributes().get("ApproximateReceiveCount");
        return Integer.parseInt(retryCountStr);
    }
​
    private static void incrementRetryCount(Message message) {
        // Логика для увеличения счетчика попыток
        // В SQS это делается автоматически, поэтому здесь пусто
    }
}
  • AmazonSQS: Клиент для работы с Amazon SQS.
  • QUEUE_URL и DLQ_URL: URL основной очереди и очереди для poison message.
  • MAX_RETRIES: Максимальное количество попыток обработки сообщения.
  • processMessage: Метод, который обрабатывает сообщение. Если возникает ошибка, выбрасывается исключение.
  • handlePoisonMessage: Метод для обработки poison message. Если количество попыток превышает MAX_RETRIES, сообщение перемещается в DLQ.
  • getRetryCount: Получает количество попыток обработки сообщения.
  • incrementRetryCount: В SQS количество попыток увеличивается автоматически, поэтому метод пуст.

Этот пример демонстрирует, как можно обрабатывать poison message в системе очередей, чтобы избежать бесконечных циклов повторных попыток и обеспечить стабильность системы.

Тема: Очереди и брокеры сообщений
Стадия: Tech

🔒 Подпишись на бусти автора и стань Алигатором, чтобы получить полный доступ к функционалу сайта и отслеживать свой прогресс!

Твои заметки