Listener Error Handlers

Starting with version 2.0, the @KafkaListener annotation has a new attribute: errorHandler.

You can use the errorHandler to provide the bean name of a KafkaListenerErrorHandler implementation.
This functional interface has one method, as the following listing shows:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

You have access to the spring-messaging Message object produced by the message converter and the exception that was thrown by the listener, which is wrapped in a ListenerExecutionFailedException.
The error handler can throw the original or a new exception, which is thrown to the container.
Anything returned by the error handler is ignored.

Starting with version 2.7, you can set the rawRecordHeader property on the MessagingMessageConverter and BatchMessagingMessageConverter, which causes the raw ConsumerRecord to be added to the converted Message in the KafkaHeaders.RAW_DATA header.
This is useful, for example, if you wish to use a DeadLetterPublishingRecoverer in a listener error handler.
It might be used in a request/reply scenario where you wish to send a failure result to the sender after some number of retries, having captured the failed record in a dead letter topic.

@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}
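
To make the KafkaHeaders.RAW_DATA header available, as assumed by the handler above, the converter has to be configured accordingly. The following is a minimal sketch (the bean name is illustrative); the converter must also be set on the listener container factory (for example, with factory.setMessageConverter(converter)):

@Bean
public MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setRawRecordHeader(true); // add the raw ConsumerRecord under KafkaHeaders.RAW_DATA
    return converter;
}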

The KafkaListenerErrorHandler has a sub-interface (ConsumerAwareListenerErrorHandler) that provides access to the consumer object through the following method:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

If your error handler implements this interface, you can, for example, adjust the offsets accordingly.
For example, to reset the offset to replay the failed message, you could do something like the following:

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}

Similarly, you could do something like the following for a batch listener:

@Bean
public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
    return (m, e, c) -> {
        this.listen10Exception = e;
        MessageHeaders headers = m.getHeaders();
        List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
        List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
        List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
        Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
        for (int i = 0; i < topics.size(); i++) {
            int index = i;
            offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                    (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
        }
        offsetsToReset.forEach((k, v) -> c.seek(k, v));
        return null;
    };
}

This resets each topic/partition in the batch to the lowest offset in the batch.

The preceding two examples are simplistic implementations, and you would probably want more checking in the error handler.

Container Error Handlers

Two error handler interfaces (ErrorHandler and BatchErrorHandler) are provided.
You must configure the appropriate type to match the message listener.

Starting with version 2.5, the default error handlers, when transactions are not being used, are the SeekToCurrentErrorHandler and RecoveringBatchErrorHandler with default configuration.
See Seek To Current Container Error Handlers and Recovering Batch Error Handler.
To restore the previous behavior, use the LoggingErrorHandler and BatchLoggingErrorHandler instead.

When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction.
Error handling for transactional containers is performed by the AfterRollbackProcessor.
If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back.

Starting with version 2.3.2, these interfaces have a default method isAckAfterHandle() which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception.
Starting with version 2.4, this returns true by default.
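
For example, the following minimal sketch (the class name is hypothetical) swallows the error but overrides isAckAfterHandle() so that the failed record's offset is not committed:

public class LogOnlyErrorHandler implements ErrorHandler {

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        // log the failure; returning normally lets the container continue with the next record
    }

    @Override
    public boolean isAckAfterHandle() {
        return false; // do not commit the offset of the failed record
    }

}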

Typically, the error handlers provided by the framework will throw an exception when the error is not "handled" (e.g. after performing a seek operation).
By default, such exceptions are logged by the container at ERROR level.
Starting with version 2.5, all the framework error handlers extend KafkaExceptionLogLevelAware which allows you to control the level at which these exceptions are logged.

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}
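
For example, the following sketch (values are illustrative) logs such exceptions at WARN instead of ERROR:

SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L));
errorHandler.setLogLevel(KafkaException.Level.WARN);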

You can specify a global error handler to be used for all listeners in the container factory.
The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setErrorHandler(myErrorHandler);
    ...
    return factory;
}

Similarly, you can set a global batch error handler:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setBatchErrorHandler(myBatchErrorHandler);
    ...
    return factory;
}

By default, if an annotated listener method throws an exception, it is thrown to the container, and the message is handled according to the container configuration.

If you are using Spring Boot, you simply need to add the error handler as a @Bean and Boot will add it to the auto-configured factory.

Consumer-Aware Container Error Handlers

The container-level error handlers (ErrorHandler and BatchErrorHandler) have sub-interfaces called ConsumerAwareErrorHandler and ConsumerAwareBatchErrorHandler.
The handle method of the ConsumerAwareErrorHandler has the following signature:

void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer);

The handle method of the ConsumerAwareBatchErrorHandler has the following signature:

void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer);

Similar to the @KafkaListener error handlers, you can reset the offsets as needed, based on the data that failed.

Unlike with the listener-level error handlers, however, you should set the ackOnError container property to false (the default) when making adjustments; otherwise, any pending acks are applied after your repositioning.
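
The following minimal sketch (the class name is hypothetical) seeks back to the failed record so that it is re-fetched on the next poll; the SeekToCurrentErrorHandler described in the next section provides a complete implementation of this idea:

public class SeekingErrorHandler implements ConsumerAwareErrorHandler {

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
        // reposition so the failed record is redelivered by the next poll()
        consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
    }

}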

Seek To Current Container Error Handlers

If an ErrorHandler implements RemainingRecordsErrorHandler, the error handler is provided with the failed record and any unprocessed records retrieved by the previous poll().
Those records are not passed to the listener after the handler exits.
The following listing shows the RemainingRecordsErrorHandler interface definition:

@FunctionalInterface
public interface RemainingRecordsErrorHandler extends ConsumerAwareErrorHandler {

    void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer);

}

This interface lets implementations seek all unprocessed topics and partitions so that the current record (and the others remaining) are retrieved by the next poll.
The SeekToCurrentErrorHandler does exactly this.

ackOnError must be false (which is the default).
Otherwise, if the container is stopped after the seek, but before the record is reprocessed, the record will be skipped when the container is restarted.

This is now the default error handler for record listeners.

The container commits any pending offset commits before calling the error handler.

To configure the listener container with this handler, add it to the container factory.

For example, with the @KafkaListener container factory, you can add SeekToCurrentErrorHandler as follows:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

This will retry a delivery up to 2 times (3 delivery attempts) with a back off of 1 second, instead of the default configuration (FixedBackOff(0L, 9)).
Failures are simply logged after retries are exhausted.

As an example: if the poll returns six records (two each from partitions 0, 1, and 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets.
The SeekToCurrentErrorHandler seeks to offset 1 for partition 1 and offset 0 for partition 2.
The next poll() returns the three unprocessed records.

If the AckMode was BATCH, the container commits the offsets for the first two partitions before calling the error handler.

Starting with version 2.2, the SeekToCurrentErrorHandler can now recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the ERROR level).
You can configure the handler with a custom recoverer (BiConsumer) and maximum failures.
Using a FixedBackOff with FixedBackOff.UNLIMITED_ATTEMPTS causes (effectively) infinite retries.
The following example configures recovery after three tries:

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

Starting with version 2.2.4, when the container is configured with AckMode.MANUAL_IMMEDIATE, the error handler can be configured to commit the offset of recovered records; set the commitRecovered property to true.
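
For example (a minimal sketch; the recoverer and back off values are illustrative):

SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
errorHandler.setCommitRecovered(true); // commit the recovered record's offset (with AckMode.MANUAL_IMMEDIATE)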

When using transactions, similar functionality is provided by the DefaultAfterRollbackProcessor.
See After-rollback Processor.

Starting with version 2.3, the SeekToCurrentErrorHandler considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
The exceptions that are considered fatal, by default, are DeserializationException, MessageConversionException, ConversionException, MethodArgumentResolutionException, NoSuchMethodException, and ClassCastException, since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions.
See the Javadocs for SeekToCurrentErrorHandler.setClassifications() for more information, as well as those for the spring-retry BinaryExceptionClassifier.

Here is an example that adds IllegalArgumentException to the not-retryable exceptions:

@Bean
public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
    handler.addNotRetryableException(IllegalArgumentException.class);
    return handler;
}

Starting with version 2.7, the error handler can be configured with one or more RetryListener s, receiving notifications of retry and recovery progress.

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

See the javadocs for more information.
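
For example, a lambda can be registered with setRetryListeners (a minimal sketch; the errorHandler variable and logger are assumed to exist elsewhere in your configuration):

errorHandler.setRetryListeners((record, ex, deliveryAttempt) ->
        logger.warn("Delivery attempt {} failed for {}-{}@{}",
                deliveryAttempt, record.topic(), record.partition(), record.offset()));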

The SeekToCurrentBatchErrorHandler seeks each partition to the first record in each partition in the batch, so the whole batch is replayed.
Also see Committing Offsets for an alternative.
Also see Retrying Batch Error Handler.
This error handler does not support recovery, because the framework cannot know which message in the batch is failing.

After seeking, an exception that wraps the ListenerExecutionFailedException is thrown.
This is to cause the transaction to roll back (if transactions are enabled).

Starting with version 2.3, a BackOff can be provided to the SeekToCurrentErrorHandler and DefaultAfterRollbackProcessor so that the consumer thread can sleep for some configurable time between delivery attempts.
Spring Framework provides two out of the box BackOff s, FixedBackOff and ExponentialBackOff.
The maximum back off time must not exceed the max.poll.interval.ms consumer property, to avoid a rebalance.
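
For example, the following sketch (values are illustrative) uses an exponential back off while capping each delay well below a typical max.poll.interval.ms:

ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0); // 1s, 2s, 4s, ...
backOff.setMaxInterval(10_000L); // cap each delay well below max.poll.interval.ms
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(backOff);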

Previously, the configuration was "maxFailures" (which included the first delivery attempt).
When using a FixedBackOff, its maxAttempts property represents the number of delivery retries (one less than the old maxFailures property).
Also, maxFailures=-1 meant retry indefinitely with the old configuration; with a BackOff, you would set maxAttempts to Long.MAX_VALUE for a FixedBackOff and leave maxElapsedTime at its default in an ExponentialBackOff.

The SeekToCurrentBatchErrorHandler can also be configured with a BackOff to add a delay between delivery attempts.
Generally, you should configure the BackOff to never return STOP.
However, since this error handler has no mechanism to "recover" after retries are exhausted, if the BackOffExecution returns STOP, the previous interval will be used for all subsequent delays.
Again, the maximum delay must be less than the max.poll.interval.ms consumer property.
Also see Retrying Batch Error Handler.

If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler’s resetStateOnRecoveryFailure to false.

Starting with version 2.3.2, after a record has been recovered, its offset will be committed (if one of the container AckMode s is configured).
To revert to the previous behavior, set the error handler’s ackAfterHandle property to false.

Starting with version 2.6, you can now provide the error handler with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> to determine the BackOff to use, based on the failed record and/or the exception:

handler.setBackOffFunction((record, ex) -> { ... });

If the function returns null, the handler’s default BackOff will be used.

Starting with version 2.6.3, set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.

Starting with version 2.7, while waiting for a BackOff interval, the error handler will loop with a short sleep until the desired delay, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop() rather than causing a delay.

Retrying Batch Error Handler

As discussed above, the SeekToCurrentBatchErrorHandler has no mechanism to recover after a certain number of failures.
One reason for this is there is no guarantee that, when a batch is redelivered, the batch has the same number of records and/or the redelivered records are in the same order.
It is impossible, therefore, to maintain retry state for a batch.
The RetryingBatchErrorHandler takes a different approach.
If a batch listener throws an exception, and this error handler is configured, the retries are performed from the in-memory batch of records.
To avoid a rebalance during an extended retry sequence, the error handler pauses the consumer, polls it before sleeping for the back off on each retry, and then calls the listener again.
If/when retries are exhausted, the ConsumerRecordRecoverer is called for each record in the batch.
If the recoverer throws an exception, or the thread is interrupted during its sleep, a SeekToCurrentErrorHandler is invoked so that the batch of records will be redelivered on the next poll.
Before exiting, regardless of the outcome, the consumer is resumed.
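
A minimal configuration sketch follows; it assumes a constructor taking a BackOff and a ConsumerRecordRecoverer, and the template bean is illustrative:

@Bean
public RetryingBatchErrorHandler retryingBatchErrorHandler(KafkaTemplate<String, String> template) {
    return new RetryingBatchErrorHandler(new FixedBackOff(1000L, 2L),
            new DeadLetterPublishingRecoverer(template));
}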

This error handler cannot be used with transactions.

Starting with version 2.7, while waiting for a BackOff interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop() rather than causing a delay.

Recovering Batch Error Handler

This is now the default error handler for batch listeners.
The default configuration retries 9 times (10 delivery attempts) with no back off between deliveries.

This error handler works in conjunction with the listener throwing a BatchListenerFailedException providing the index in the batch where the failure occurred (or the failed record itself).
If the listener throws a different exception, or the index is out of range, the error handler falls back to invoking a SeekToCurrentBatchErrorHandler and the whole batch is retried, with no recovery available.
The sequence of events is:

  • Commit the offsets of the records before the index.

  • If retries are not exhausted, perform seeks so that all the remaining records (including the failed record) will be redelivered.

  • If retries are exhausted, attempt recovery of the failed record (default log only) and perform seeks so that the remaining records (excluding the failed record) will be redelivered.
    The recovered record’s offset is committed.

  • If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted.

The default recoverer logs the failed record after retries are exhausted.
You can use a custom recoverer, or one provided by the framework such as the DeadLetterPublishingRecoverer.

In all cases, a BackOff can be configured to enable a delay between delivery attempts.

@Bean
public RecoveringBatchErrorHandler batchErrorHandler(KafkaTemplate<String, String> template) {
    DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(template);
    return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(5000L, 2L));
}

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<ConsumerRecord<String, String>> records) {
    records.forEach(record -> {
        try {
            process(record);
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", record);
        }
    });
}

For example, say 10 records are in the original batch, no more records are added to the topic during the retries, and the failed record is at index 4 in the list.
After the first delivery fails, the offsets for the first 4 records will be committed; the remaining 6 will be redelivered after 5 seconds.
Most likely (but not necessarily) the failed record will be at index 0 in the redelivery.
If it fails again, it will be retried one more time and, if it again fails, it will be sent to a dead letter topic.

When using a POJO batch listener (e.g. List<Thing>), and you don’t have the full consumer record to add to the exception, you can just add the index of the record that failed:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List things) {
    for (int i = 0; i < records.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}
This error handler cannot be used with transactions.

If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler’s resetStateOnRecoveryFailure to false.

Starting with version 2.6, you can now provide the error handler with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> to determine the BackOff to use, based on the failed record and/or the exception:

handler.setBackOffFunction((record, ex) -> { ... });

If the function returns null, the handler’s default BackOff will be used.

Starting with version 2.6.3, set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.

Starting with version 2.7, while waiting for a BackOff interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop() rather than causing a delay.

Starting with version 2.7, the error handler can be configured with one or more RetryListener s, receiving notifications of retry and recovery progress.

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

See the javadocs for more information.

After-rollback Processor

When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back.
By default, any unprocessed records (including the failed record) are re-fetched on the next poll.
This is achieved by performing seek operations in the DefaultAfterRollbackProcessor.
With a batch listener, the entire batch of records is reprocessed (the container has no knowledge of which record in the batch failed).
To modify this behavior, you can configure the listener container with a custom AfterRollbackProcessor.
For example, with a record-based listener, you might want to keep track of the failed record and give up after some number of attempts, perhaps by publishing it to a dead-letter topic.

Starting with version 2.2, the DefaultAfterRollbackProcessor can now recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the ERROR level).
You can configure the processor with a custom recoverer (BiConsumer) and maximum failures.
Setting the maxFailures property to a negative number causes infinite retries.
The following example configures recovery after three tries:

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor<>((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
In such cases, the application listener must handle a record that keeps failing.

Starting with version 2.2.5, the DefaultAfterRollbackProcessor can be invoked in a new transaction (started after the failed transaction rolls back).
Then, if you are using the DeadLetterPublishingRecoverer to publish a failed record, the processor will send the recovered record’s offset in the original topic/partition to the transaction.
To enable this feature, set the commitRecovered and kafkaTemplate properties on the DefaultAfterRollbackProcessor.
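
The following is a minimal sketch (it assumes standard setter names for these properties and an available KafkaTemplate bean):

@Bean
AfterRollbackProcessor<String, String> afterRollbackProcessor(KafkaTemplate<String, String> template) {
    DefaultAfterRollbackProcessor<String, String> processor =
            new DefaultAfterRollbackProcessor<>(new DeadLetterPublishingRecoverer(template),
                    new FixedBackOff(0L, 2L));
    processor.setCommitRecovered(true); // commit the recovered record's offset in the new transaction
    processor.setKafkaTemplate(template);
    return processor;
}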

If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the processor’s resetStateOnRecoveryFailure property to false.

Starting with version 2.6, you can now provide the processor with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> to determine the BackOff to use, based on the failed record and/or the exception:

handler.setBackOffFunction((record, ex) -> { ... });

If the function returns null, the processor’s default BackOff will be used.

Starting with version 2.6.3, set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.

Starting with version 2.3.1, similar to the SeekToCurrentErrorHandler, the DefaultAfterRollbackProcessor considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
The exceptions that are considered fatal, by default, are DeserializationException, MessageConversionException, ConversionException, MethodArgumentResolutionException, NoSuchMethodException, and ClassCastException, since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions.
See the Javadocs for DefaultAfterRollbackProcessor.setClassifications() for more information, as well as those for the spring-retry BinaryExceptionClassifier.

Here is an example that adds IllegalArgumentException to the not-retryable exceptions:

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}

With current kafka-clients, the container cannot detect whether a ProducerFencedException is caused by a rebalance or if the producer’s transactional.id has been revoked due to a timeout or expiry.
Because, in most cases, it is caused by a rebalance, the container does not call the AfterRollbackProcessor (it is not appropriate to seek partitions that are no longer assigned to us).
If you ensure the timeout is large enough to process each transaction and periodically perform an "empty" transaction (e.g. via a ListenerContainerIdleEvent), you can avoid fencing due to timeout and expiry.
Or, you can set the stopContainerWhenFenced container property to true and the container will stop, avoiding the loss of records.
You can consume a ConsumerStoppedEvent and check the Reason property for FENCED to detect this condition.
Since the event also has a reference to the container, you can restart the container using this event.
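
For example, a hypothetical application event listener (names are illustrative) might detect this condition as follows:

@EventListener
public void consumerStopped(ConsumerStoppedEvent event) {
    if (ConsumerStoppedEvent.Reason.FENCED.equals(event.getReason())) {
        // the event carries a reference to the container; restart it here if appropriate
    }
}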

Starting with version 2.7, while waiting for a BackOff interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop() rather than causing a delay.

Starting with version 2.7, the processor can be configured with one or more RetryListener s, receiving notifications of retry and recovery progress.

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

See the javadocs for more information.

The following applies to record listeners only, not batch listeners.

Starting with version 2.5, when using an ErrorHandler or AfterRollbackProcessor that implements DeliveryAttemptAware, it is possible to enable the addition of the KafkaHeaders.DELIVERY_ATTEMPT header (kafka_deliveryAttempt) to the record.
The value of this header is an incrementing integer starting at 1.
When receiving a raw ConsumerRecord, the integer is in a byte[4].

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

When using @KafkaListener with the DefaultKafkaHeaderMapper or SimpleKafkaHeaderMapper, it can be obtained by adding @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery as a parameter to the listener method.

To enable population of this header, set the container property deliveryAttemptHeader to true.
It is disabled by default to avoid the (small) overhead of looking up the state for each record and adding the header.

The SeekToCurrentErrorHandler and DefaultAfterRollbackProcessor support this feature.
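
For example (the listener id and topic are illustrative; the deliveryAttemptHeader container property must be true):

@KafkaListener(id = "attempts", topics = "someTopic")
public void listen(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
    ...
}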

Publishing Dead-letter Records

As discussed earlier, you can configure the SeekToCurrentErrorHandler and DefaultAfterRollbackProcessor (as well as the RecoveringBatchErrorHandler) with a record recoverer when the maximum number of failures is reached for a record.
The framework provides the DeadLetterPublishingRecoverer, which publishes the failed message to another topic.
The recoverer requires a KafkaTemplate, which is used to send the record.
You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>, which is called to resolve the destination topic and partition.

By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record.
Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic.

If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka.
Starting with version 2.2.4, any ListenerExecutionFailedException (thrown, for example, when an exception is detected in a @KafkaListener method) is enhanced with the groupId property.
This allows the destination resolver to use this, in addition to the information in the ConsumerRecord to select the dead letter topic.

The following example shows how to wire a custom destination resolver:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

The record sent to the dead-letter topic is enhanced with the following headers:

  • KafkaHeaders.DLT_EXCEPTION_FQCN: The Exception class name.

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE: The Exception stack trace.

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE: The Exception message.

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: The Exception class name (key deserialization errors only).

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: The Exception stack trace (key deserialization errors only).

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: The Exception message (key deserialization errors only).

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: The original topic.

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: The original partition.

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: The original offset.

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: The original timestamp.

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: The original timestamp type.

There are two mechanisms to add more headers.

  1. Subclass the recoverer and override createProducerRecord() - call super.createProducerRecord() and add more headers.

  2. Provide a BiFunction to receive the consumer record and exception, returning a Headers object; headers from there will be copied to the final producer record.
    Use setHeadersFunction() to set the BiFunction.

The second is simpler to implement but the first has more information available, including the already assembled standard headers.
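
The following sketch shows the second mechanism (the header name is illustrative, and a KafkaTemplate bean named template is assumed):

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setHeadersFunction((record, ex) -> {
    Headers extra = new RecordHeaders();
    extra.add("failure.cause", ex.getClass().getName().getBytes(StandardCharsets.UTF_8));
    return extra; // copied into the outgoing dead-letter producer record
});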

Starting with version 2.3, when used in conjunction with an ErrorHandlingDeserializer, the publisher will restore the record value(), in the dead-letter producer record, to the original value that failed to be deserialized.
Previously, the value() was null and user code had to decode the DeserializationException from the message headers.
In addition, you can provide multiple KafkaTemplate s to the publisher; this might be needed, for example, if you want to publish the byte[] from a DeserializationException, as well as values using a different serializer from records that were deserialized successfully.
Here is an example of configuring the publisher with KafkaTemplate s that use a String and byte[] serializer:

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {

    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

The publisher uses the map keys to locate a template that is suitable for the value() about to be published.
A LinkedHashMap is recommended so that the keys are examined in order.

When publishing null values, when there are multiple templates, the recoverer will look for a template for the Void class; if none is present, the first template from the values().iterator() will be used.

Since 2.7, you can use the setFailIfSendResultIsError method so that an exception is thrown when message publishing fails.
You can also set a timeout for the verification of the sender success with setWaitForSendResultTimeout.
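
For example (a minimal sketch; the timeout value is illustrative and the timeout setter is assumed to take a Duration):

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setFailIfSendResultIsError(true);
recoverer.setWaitForSendResultTimeout(Duration.ofSeconds(10));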

If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler’s resetStateOnRecoveryFailure property to false.

Starting with version 2.6.3, set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.

The ErrorHandlingDeserializer adds the deserialization exception(s) in the ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER and ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER headers (using Java serialization).
By default, these headers are not retained in the message published to the dead letter topic.
Starting with version 2.7, if both the key and value fail deserialization, the original values of both are populated in the record sent to the DLT.

If incoming records are dependent on each other, but may arrive out of order, it may be useful to republish a failed record to the tail of the original topic (for some number of times), instead of sending it directly to the dead letter topic.
See this Stack Overflow Question for an example.

The following error handler configuration will do exactly that:

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

Starting with version 2.7, the recoverer checks that the partition selected by the destination resolver actually exists.
If the partition is not present, the partition in the ProducerRecord is set to null, allowing the KafkaProducer to select the partition.
You can disable this check by setting the verifyPartition property to false.

Source: https://docs.spring.io/spring-kafka/reference/html/

Ваш адрес email не будет опубликован. Обязательные поля помечены *