package no.fintlabs.kafka.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import no.fintlabs.kafka.consumer.cache.FintCache;
import no.fintlabs.kafka.consumer.cache.FintCacheManager;
import no.fintlabs.kafka.topic.DomainContext;
import no.fintlabs.kafka.topic.TopicNameService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;

/**
 * Creates, registers and starts Kafka listener containers that copy the records of an entity
 * topic into a {@link FintCache}.
 *
 * <p>For every consumer created, the record value is deserialized with the injected
 * {@link ObjectMapper} and stored in the cache once per key returned by the caller-supplied
 * key function.
 *
 * <p>The class still extends {@link AbstractConsumerSeekAware} so that existing code depending on
 * that supertype keeps compiling. The actual seeking is done by the message listener, because a
 * listener container only hands seek callbacks to the listener installed on it.
 */
@Service
public class EntityConsumerFactory extends AbstractConsumerSeekAware {

    private static final Logger LOGGER = Logger.getLogger(EntityConsumerFactory.class.getName());

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;
    private final GenericApplicationContext applicationContext;
    private final ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;
    private final ObjectMapper objectMapper;
    private final FintCacheManager fintCacheManager;
    private final TopicNameService topicNameService;

    EntityConsumerFactory(GenericApplicationContext genericApplicationContext, ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory, ObjectMapper objectMapper, TopicNameService topicNameService, FintCacheManager fintCacheManager) {
        this.applicationContext = genericApplicationContext;
        this.kafkaListenerContainerFactory = concurrentKafkaListenerContainerFactory;
        this.objectMapper = objectMapper;
        this.fintCacheManager = fintCacheManager;
        this.topicNameService = topicNameService;
    }

    /**
     * Builds a listener container for an entity topic, registers it as a bean and starts it.
     *
     * <p>The bean name is {@code str} with every {@code '.'} removed, followed by
     * {@code "EntityConsumer"}.
     *
     * @param domainContext context passed to {@link TopicNameService} to derive the topic name
     * @param str           entity name; used for the topic name, the cache alias and the bean name
     * @param cls           type each record value is deserialized into
     * @param function      maps a deserialized value to the cache keys it is stored under
     * @param z             when {@code true}, every partition is rewound to its beginning the
     *                      first time it is assigned to the new container
     * @param <R>           type of the cached values
     */
    public <R> void createEntityConsumer(DomainContext domainContext, String str, Class<R> cls, Function<R, List<String>> function, boolean z) {
        ConcurrentMessageListenerContainer<String, String> container =
                createEntityConsumerContainer(domainContext, str, cls, function, z);

        String beanName = str.replace(".", "") + "EntityConsumer";
        // Register the container built above. Without the supplier Spring would instantiate a
        // second, unconfigured container from the class alone.
        this.applicationContext.registerBean(beanName, ConcurrentMessageListenerContainer.class, () -> container);
        // Force instantiation of the bean definition right away.
        this.applicationContext.getBean(beanName);
        container.start();
    }

    /**
     * Creates (but does not start) a container for the entity topic and installs a listener that
     * writes every record into a freshly created cache.
     */
    private <R> ConcurrentMessageListenerContainer<String, String> createEntityConsumerContainer(
            DomainContext domainContext,
            String resourceName,
            Class<R> resourceClass,
            Function<R, List<String>> keyMapper,
            boolean rewindOnFirstAssignment) {
        String topic = this.topicNameService.generateEntityTopicName(domainContext, resourceName);
        FintCache<String, R> cache = this.fintCacheManager.createCache(resourceName, String.class, resourceClass);

        MessageListener<String, String> listener = new EntityCacheListener<>(
                this.objectMapper, resourceClass, keyMapper, cache, rewindOnFirstAssignment);

        ConcurrentMessageListenerContainer<String, String> container =
                this.kafkaListenerContainerFactory.createContainer(topic);
        container.getContainerProperties().setGroupId(this.consumerGroupId);
        container.getContainerProperties().setMessageListener(listener);
        return container;
    }

    /**
     * Message listener that deserializes each record value and stores it in a cache.
     *
     * <p>It is seek-aware so that the optional rewind happens when partitions are actually
     * assigned; before the container is started there are no assigned partitions to seek on.
     */
    private static final class EntityCacheListener<R> extends AbstractConsumerSeekAware
            implements MessageListener<String, String> {

        private final ObjectMapper objectMapper;
        private final Class<R> resourceClass;
        private final Function<R, List<String>> keyMapper;
        private final FintCache<String, R> cache;
        private final boolean rewindOnFirstAssignment;

        // Partitions already rewound, so a later re-assignment does not replay the topic again.
        // Concurrent because one listener instance may serve several consumer threads.
        private final Set<TopicPartition> rewoundPartitions = ConcurrentHashMap.newKeySet();

        EntityCacheListener(
                ObjectMapper objectMapper,
                Class<R> resourceClass,
                Function<R, List<String>> keyMapper,
                FintCache<String, R> cache,
                boolean rewindOnFirstAssignment) {
            this.objectMapper = objectMapper;
            this.resourceClass = resourceClass;
            this.keyMapper = keyMapper;
            this.cache = cache;
            this.rewindOnFirstAssignment = rewindOnFirstAssignment;
        }

        /**
         * Deserializes the record value and caches it under every key produced by the key mapper.
         * A record whose value cannot be parsed is logged and skipped.
         */
        @Override
        public void onMessage(ConsumerRecord<String, String> consumerRecord) {
            try {
                R resource = this.objectMapper.readValue(consumerRecord.value(), this.resourceClass);
                this.keyMapper.apply(resource).forEach(key -> this.cache.put(key, resource));
            } catch (JsonProcessingException e) {
                // The payload is deliberately left out of the message; it may hold sensitive data.
                LOGGER.log(Level.SEVERE, String.format(
                        "Skipping record %s-%d@%d: value could not be deserialized as %s",
                        consumerRecord.topic(),
                        consumerRecord.partition(),
                        consumerRecord.offset(),
                        this.resourceClass.getName()), e);
            }
        }

        /**
         * Rewinds each newly assigned partition to its beginning, once per partition, when the
         * listener was created with the rewind flag set.
         */
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            super.onPartitionsAssigned(assignments, callback);
            if (!this.rewindOnFirstAssignment) {
                return;
            }
            for (TopicPartition topicPartition : assignments.keySet()) {
                if (this.rewoundPartitions.add(topicPartition)) {
                    callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());
                }
            }
        }
    }
}
