package no.fintlabs.kafka.consuming;

import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.support.JavaUtils;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:no/fintlabs/kafka/consuming/ListenerContainerFactoryService.class */
public class ListenerContainerFactoryService {
    private final ConsumerFactoryService consumerFactoryService;

    ListenerContainerFactoryService(ConsumerFactoryService consumerFactoryService) {
        this.consumerFactoryService = consumerFactoryService;
    }

    public <VALUE> ConcurrentKafkaListenerContainerFactory<String, VALUE> createRecordKafkaListenerContainerFactory(Class<VALUE> cls, Consumer<ConsumerRecord<String, VALUE>> consumer, ListenerConfiguration listenerConfiguration, Consumer<ConcurrentMessageListenerContainer<String, VALUE>> consumer2) {
        return createKafkaListenerContainerFactory(cls, () -> {
            return new OffsetSeekingRecordConsumer(consumer, listenerConfiguration.isSeekingOffsetResetOnAssignment());
        }, listenerConfiguration, consumer2);
    }

    public <VALUE> ConcurrentKafkaListenerContainerFactory<String, VALUE> createBatchKafkaListenerContainerFactory(Class<VALUE> cls, Consumer<List<ConsumerRecord<String, VALUE>>> consumer, ListenerConfiguration listenerConfiguration, Consumer<ConcurrentMessageListenerContainer<String, VALUE>> consumer2) {
        return createKafkaListenerContainerFactory(cls, () -> {
            return new OffsetSeekingBatchConsumer(consumer, listenerConfiguration.isSeekingOffsetResetOnAssignment());
        }, listenerConfiguration, consumer2);
    }

    private <VALUE, LISTENER extends AbstractConsumerSeekAware & GenericMessageListener<?>> ConcurrentKafkaListenerContainerFactory<String, VALUE> createKafkaListenerContainerFactory(Class<VALUE> cls, Supplier<LISTENER> supplier, ListenerConfiguration listenerConfiguration, Consumer<ConcurrentMessageListenerContainer<String, VALUE>> consumer) {
        ConsumerFactory createFactory = this.consumerFactoryService.createFactory(cls, listenerConfiguration);
        ConcurrentKafkaListenerContainerFactory<String, VALUE> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.setConsumerFactory(createFactory);
        JavaUtils javaUtils = JavaUtils.INSTANCE;
        DefaultErrorHandler errorHandler = listenerConfiguration.getErrorHandler();
        Objects.requireNonNull(concurrentKafkaListenerContainerFactory);
        javaUtils.acceptIfNotNull(errorHandler, (v1) -> {
            r2.setCommonErrorHandler(v1);
        });
        concurrentKafkaListenerContainerFactory.setContainerCustomizer(concurrentMessageListenerContainer -> {
            JavaUtils.INSTANCE.acceptIfNotNull(listenerConfiguration.getMaxPollRecords(), num -> {
                concurrentMessageListenerContainer.getContainerProperties().getKafkaConsumerProperties().setProperty("max.poll.records", String.valueOf(num));
            }).acceptIfNotNull(listenerConfiguration.getMaxPollInterval(), duration -> {
                concurrentMessageListenerContainer.getContainerProperties().getKafkaConsumerProperties().setProperty("max.poll.interval.ms", String.valueOf(duration.toMillis()));
            });
            AbstractConsumerSeekAware abstractConsumerSeekAware = (AbstractConsumerSeekAware) supplier.get();
            JavaUtils.INSTANCE.acceptIfNotNull(listenerConfiguration.getOffsetSeekingTrigger(), offsetSeekingTrigger -> {
                offsetSeekingTrigger.addOffsetResettingMessageListener(abstractConsumerSeekAware);
            });
            concurrentMessageListenerContainer.setupMessageListener(abstractConsumerSeekAware);
            consumer.accept(concurrentMessageListenerContainer);
        });
        return concurrentKafkaListenerContainerFactory;
    }
}
