package no.fintlabs.kafka.common;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import no.fintlabs.kafka.common.topic.TopicNameParameters;
import no.fintlabs.kafka.common.topic.TopicNamePatternParameters;
import no.fintlabs.kafka.requestreply.ReplyProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.support.JavaUtils;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:no/fintlabs/kafka/common/ListenerContainerFactoryService.class */
public class ListenerContainerFactoryService {
    private final FintConsumerFactoryService fintConsumerFactoryService;

    public ListenerContainerFactoryService(FintConsumerFactoryService fintConsumerFactoryService) {
        this.fintConsumerFactoryService = fintConsumerFactoryService;
    }

    public <T> ConcurrentKafkaListenerContainerFactory<String, T> createEmptyListenerFactory(Class<T> cls, CommonErrorHandler commonErrorHandler) {
        ConsumerFactory<String, T> createFactory = this.fintConsumerFactoryService.createFactory(cls, ListenerConfiguration.empty());
        ConcurrentKafkaListenerContainerFactory<String, T> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.setConsumerFactory(createFactory);
        concurrentKafkaListenerContainerFactory.setCommonErrorHandler(commonErrorHandler);
        return concurrentKafkaListenerContainerFactory;
    }

    public <VALUE, REPLY_VALUE, TOPIC_NAME_PARAMETERS extends TopicNameParameters, TOPIC_NAME_PATTERN_PARAMETERS extends TopicNamePatternParameters> ListenerContainerFactory<VALUE, TOPIC_NAME_PARAMETERS, TOPIC_NAME_PATTERN_PARAMETERS> createReplyingListenerFactory(Function<TOPIC_NAME_PARAMETERS, String> function, Function<TOPIC_NAME_PATTERN_PARAMETERS, Pattern> function2, Class<VALUE> cls, KafkaTemplate<String, REPLY_VALUE> kafkaTemplate, Function<ConsumerRecord<String, VALUE>, ReplyProducerRecord<REPLY_VALUE>> function3, ListenerConfiguration listenerConfiguration) {
        return createRecordListenerContainerFactory(function, function2, cls, consumerRecord -> {
            ReplyProducerRecord replyProducerRecord = (ReplyProducerRecord) function3.apply(consumerRecord);
            ProducerRecord producerRecord = new ProducerRecord(new String(consumerRecord.headers().lastHeader("kafka_replyTopic").value(), StandardCharsets.UTF_8), (Integer) null, (Long) null, (Object) null, replyProducerRecord.getValue(), replyProducerRecord.getHeaders());
            producerRecord.headers().add("kafka_correlationId", ((Header) consumerRecord.headers().headers("kafka_correlationId").iterator().next()).value());
            kafkaTemplate.send(producerRecord);
        }, listenerConfiguration);
    }

    public <VALUE, TOPIC_NAME_PARAMETERS extends TopicNameParameters, TOPIC_NAME_PATTERN_PARAMETERS extends TopicNamePatternParameters> ListenerContainerFactory<VALUE, TOPIC_NAME_PARAMETERS, TOPIC_NAME_PATTERN_PARAMETERS> createRecordListenerContainerFactory(Function<TOPIC_NAME_PARAMETERS, String> function, Function<TOPIC_NAME_PATTERN_PARAMETERS, Pattern> function2, Class<VALUE> cls, Consumer<ConsumerRecord<String, VALUE>> consumer, ListenerConfiguration listenerConfiguration) {
        return new ListenerContainerFactory<>(createRecordKafkaListenerContainerFactory(cls, consumer, listenerConfiguration, concurrentMessageListenerContainer -> {
        }), function, function2);
    }

    public <VALUE, TOPIC_NAME_PARAMETERS extends TopicNameParameters, TOPIC_NAME_PATTERN_PARAMETERS extends TopicNamePatternParameters> ListenerContainerFactory<VALUE, TOPIC_NAME_PARAMETERS, TOPIC_NAME_PATTERN_PARAMETERS> createBatchListenerContainerFactory(Function<TOPIC_NAME_PARAMETERS, String> function, Function<TOPIC_NAME_PATTERN_PARAMETERS, Pattern> function2, Class<VALUE> cls, Consumer<List<ConsumerRecord<String, VALUE>>> consumer, ListenerConfiguration listenerConfiguration) {
        return new ListenerContainerFactory<>(createBatchKafkaListenerContainerFactory(cls, consumer, listenerConfiguration, concurrentMessageListenerContainer -> {
        }), function, function2);
    }

    public <VALUE> ConcurrentKafkaListenerContainerFactory<String, VALUE> createRecordKafkaListenerContainerFactory(Class<VALUE> cls, Consumer<ConsumerRecord<String, VALUE>> consumer, ListenerConfiguration listenerConfiguration, Consumer<ConcurrentMessageListenerContainer<String, VALUE>> consumer2) {
        return createKafkaListenerContainerFactory(cls, new OffsetSeekingMessageListener(consumer, listenerConfiguration.isSeekingOffsetResetOnAssignment()), listenerConfiguration, consumer2);
    }

    public <VALUE> ConcurrentKafkaListenerContainerFactory<String, VALUE> createBatchKafkaListenerContainerFactory(Class<VALUE> cls, Consumer<List<ConsumerRecord<String, VALUE>>> consumer, ListenerConfiguration listenerConfiguration, Consumer<ConcurrentMessageListenerContainer<String, VALUE>> consumer2) {
        return createKafkaListenerContainerFactory(cls, new OffsetSeekingBatchMessageListener(consumer, listenerConfiguration.isSeekingOffsetResetOnAssignment()), listenerConfiguration, consumer2);
    }

    public <VALUE, LISTENER extends AbstractConsumerSeekAware & GenericMessageListener<?>> ConcurrentKafkaListenerContainerFactory<String, VALUE> createKafkaListenerContainerFactory(Class<VALUE> cls, LISTENER listener, ListenerConfiguration listenerConfiguration, Consumer<ConcurrentMessageListenerContainer<String, VALUE>> consumer) {
        ConsumerFactory createFactory = this.fintConsumerFactoryService.createFactory(cls, listenerConfiguration);
        ConcurrentKafkaListenerContainerFactory<String, VALUE> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.setConsumerFactory(createFactory);
        JavaUtils javaUtils = JavaUtils.INSTANCE;
        DefaultErrorHandler errorHandler = listenerConfiguration.getErrorHandler();
        Objects.requireNonNull(concurrentKafkaListenerContainerFactory);
        javaUtils.acceptIfNotNull(errorHandler, (v1) -> {
            r2.setCommonErrorHandler(v1);
        });
        concurrentKafkaListenerContainerFactory.setContainerCustomizer(concurrentMessageListenerContainer -> {
            JavaUtils javaUtils2 = JavaUtils.INSTANCE;
            ContainerProperties.AckMode ackMode = listenerConfiguration.getAckMode();
            ContainerProperties containerProperties = concurrentMessageListenerContainer.getContainerProperties();
            Objects.requireNonNull(containerProperties);
            javaUtils2.acceptIfNotNull(ackMode, containerProperties::setAckMode);
            JavaUtils.INSTANCE.acceptIfNotNull(listenerConfiguration.getMaxPollRecords(), num -> {
                concurrentMessageListenerContainer.getContainerProperties().getKafkaConsumerProperties().setProperty("max.poll.records", String.valueOf(num));
            });
            JavaUtils.INSTANCE.acceptIfNotNull(listenerConfiguration.getMaxPollIntervalMs(), num2 -> {
                concurrentMessageListenerContainer.getContainerProperties().getKafkaConsumerProperties().setProperty("max.poll.interval.ms", String.valueOf(num2));
            });
            JavaUtils.INSTANCE.acceptIfNotNull(listenerConfiguration.getOffsetSeekingTrigger(), offsetSeekingTrigger -> {
                offsetSeekingTrigger.addOffsetResettingMessageListener(listener);
            });
            consumer.accept(concurrentMessageListenerContainer);
            concurrentMessageListenerContainer.setupMessageListener(listener);
            concurrentMessageListenerContainer.start();
        });
        return concurrentKafkaListenerContainerFactory;
    }
}
