package no.fintlabs.kafka.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import lombok.NonNull;
import no.fintlabs.kafka.consumer.cache.FintCache;
import no.fintlabs.kafka.consumer.cache.FintCacheManager;
import no.fintlabs.kafka.topic.DomainContext;
import no.fintlabs.kafka.topic.TopicNameService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:no/fintlabs/kafka/consumer/EntityConsumerFactory.class */
public class EntityConsumerFactory {
    private static final Logger log = LoggerFactory.getLogger(EntityConsumerFactory.class);

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;
    private final ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;
    private final ObjectMapper objectMapper;
    private final FintCacheManager fintCacheManager;
    private final TopicNameService topicNameService;

    /* loaded from: input_file:no/fintlabs/kafka/consumer/EntityConsumerFactory$EntityConsumer.class */
    public static abstract class EntityConsumer extends AbstractConsumerSeekAware implements MessageListener<String, String> {
    }

    EntityConsumerFactory(ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory, ObjectMapper objectMapper, TopicNameService topicNameService, FintCacheManager fintCacheManager) {
        this.kafkaListenerContainerFactory = concurrentKafkaListenerContainerFactory;
        this.objectMapper = objectMapper;
        this.fintCacheManager = fintCacheManager;
        this.topicNameService = topicNameService;
    }

    public <R> ConcurrentMessageListenerContainer<String, String> createEntityConsumer(DomainContext domainContext, String str, final Class<R> cls, final Function<R, List<String>> function, final boolean z) {
        String generateEntityTopicName = this.topicNameService.generateEntityTopicName(domainContext, str);
        final FintCache createCache = this.fintCacheManager.createCache(str, String.class, cls);
        ConcurrentMessageListenerContainer<String, String> createContainer = this.kafkaListenerContainerFactory.createContainer(new String[]{generateEntityTopicName});
        createContainer.getContainerProperties().setGroupId(this.consumerGroupId);
        createContainer.getContainerProperties().setMessageListener(new EntityConsumer() { // from class: no.fintlabs.kafka.consumer.EntityConsumerFactory.1
            public void onMessage(ConsumerRecord<String, String> consumerRecord) {
                try {
                    Object readValue = EntityConsumerFactory.this.objectMapper.readValue((String) consumerRecord.value(), cls);
                    List list = (List) function.apply(readValue);
                    FintCache fintCache = createCache;
                    list.forEach(str2 -> {
                        fintCache.put(str2, readValue);
                    });
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }

            public void onPartitionsAssigned(@NonNull Map<TopicPartition, Long> map, @NonNull ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback) {
                if (map == null) {
                    throw new NullPointerException("assignments is marked non-null but is null");
                }
                if (consumerSeekCallback == null) {
                    throw new NullPointerException("callback is marked non-null but is null");
                }
                super.onPartitionsAssigned(map, consumerSeekCallback);
                if (z) {
                    consumerSeekCallback.seekToBeginning(map.keySet());
                }
            }
        });
        createContainer.start();
        return createContainer;
    }
}
