package no.fintlabs.kafka.requestreply;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.SuccessCallback;

/**
 * Sends JSON-serialized requests to a Kafka request topic through a
 * {@link ReplyingKafkaTemplate} and hands the deserialized reply back to the caller,
 * either through callbacks ({@link #requestWithAsyncReplyConsumer}) or as a blocking
 * call ({@link #requestAndReceive(RequestTopicNameParameters, Object, Class, Collection)}).
 */
@Service
public class FintKafkaRequestService {
    private static final Logger log = LoggerFactory.getLogger(FintKafkaRequestService.class);
    private final RequestTopicService requestTopicService;
    private final ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
    private final ObjectMapper objectMapper;

    /**
     * A reply record reduced to its headers and its deserialized value.
     *
     * @param <V> type the reply payload was deserialized to
     */
    public static class Reply<V> {
        Headers headers;
        V value;

        /**
         * @param headers headers of the reply record
         * @param v       deserialized reply payload
         */
        public Reply(Headers headers, V v) {
            this.headers = headers;
            this.value = v;
        }

        /**
         * @return the headers this reply was created with
         */
        public Headers getHeaders() {
            return this.headers;
        }

        /**
         * @return the deserialized payload this reply was created with
         */
        public V getValue() {
            return this.value;
        }
    }

    /**
     * Creates the service and obtains its replying template from the given factory.
     *
     * @param requestTopicService              resolves the request topic for a set of topic name parameters
     * @param replyTopicService                currently unused by this class
     * @param fintKafkaReplyingTemplateFactory factory used to create the replying template
     * @param objectMapper                     mapper used to serialize requests and deserialize replies
     */
    public FintKafkaRequestService(RequestTopicService requestTopicService, ReplyTopicService replyTopicService, FintKafkaReplyingTemplateFactory fintKafkaReplyingTemplateFactory, ObjectMapper objectMapper) {
        this.requestTopicService = requestTopicService;
        // NOTE(review): "TODO" is a placeholder argument; presumably this should be derived from
        // replyTopicService (e.g. a reply topic name) -- confirm against the factory's contract.
        this.replyingKafkaTemplate = fintKafkaReplyingTemplateFactory.create("TODO");
        this.objectMapper = objectMapper;
    }

    /**
     * Sends a request and registers callbacks on the reply future instead of waiting for the reply.
     *
     * <p>A {@code null} reply record is ignored. A reply whose value cannot be deserialized to
     * {@code cls} is logged and {@code consumer} is not called. If sending the request fails or the
     * thread is interrupted while waiting for the send result, the error is logged and neither
     * consumer is registered.
     *
     * @param requestTopicNameParameters parameters identifying the request topic
     * @param v                          request payload, serialized to JSON
     * @param cls                        type to deserialize the reply value to
     * @param collection                 headers to attach to the request record
     * @param consumer                   receives the deserialized reply
     * @param consumer2                  registered as the failure callback of the reply future; must not be {@code null}
     * @param <V>                        request payload type
     * @param <R>                        reply payload type
     * @throws JsonProcessingException if the request payload cannot be serialized
     */
    public <V, R> void requestWithAsyncReplyConsumer(RequestTopicNameParameters requestTopicNameParameters, V v, Class<R> cls, Collection<Header> collection, Consumer<Reply<R>> consumer, Consumer<Throwable> consumer2) throws JsonProcessingException {
        try {
            RequestReplyFuture<String, String, String> request = request(createRequestProducerRecord(requestTopicNameParameters, v, collection));
            SuccessCallback<ConsumerRecord<String, String>> successCallback = consumerRecord -> {
                if (consumerRecord == null) {
                    return;
                }
                try {
                    Reply<R> reply = toReply(consumerRecord, cls);
                    log.info("Return value: {}", consumerRecord.value());
                    consumer.accept(reply);
                } catch (JsonProcessingException e) {
                    log.error("Failed to deserialize reply value", e);
                }
            };
            Objects.requireNonNull(consumer2, "consumer2");
            request.addCallback(successCallback, consumer2::accept);
        } catch (InterruptedException | ExecutionException e) {
            handleRequestFailure(e);
        }
    }

    /**
     * Sends a request and blocks until the reply arrives.
     *
     * @param requestTopicNameParameters parameters identifying the request topic
     * @param v                          request payload, serialized to JSON
     * @param cls                        type to deserialize the reply value to
     * @param collection                 headers to attach to the request record
     * @param <V>                        request payload type
     * @param <R>                        reply payload type
     * @return the deserialized reply, or an empty {@code Optional} if the request failed, the thread
     *         was interrupted, or the reply value could not be deserialized (each case is logged)
     * @throws JsonProcessingException if the request payload cannot be serialized
     */
    public <V, R> Optional<Reply<R>> requestAndReceive(RequestTopicNameParameters requestTopicNameParameters, V v, Class<R> cls, Collection<Header> collection) throws JsonProcessingException {
        Optional<ConsumerRecord<String, String>> replyRecord = requestAndReceive(createRequestProducerRecord(requestTopicNameParameters, v, collection));
        return replyRecord.flatMap(consumerRecord -> {
            try {
                return Optional.of(toReply(consumerRecord, cls));
            } catch (JsonProcessingException e) {
                log.error("Failed to deserialize reply value", e);
                return Optional.empty();
            }
        });
    }

    /** Deserializes the record's JSON value to {@code cls} and pairs it with the record's headers. */
    private <R> Reply<R> toReply(ConsumerRecord<String, String> consumerRecord, Class<R> cls) throws JsonProcessingException {
        R value = this.objectMapper.readValue(consumerRecord.value(), cls);
        return new Reply<>(consumerRecord.headers(), value);
    }

    /** Builds a request record for the resolved topic with no partition, no key, and a JSON value. */
    private <V> ProducerRecord<String, String> createRequestProducerRecord(RequestTopicNameParameters requestTopicNameParameters, V v, Collection<Header> collection) throws JsonProcessingException {
        String topicName = this.requestTopicService.getTopic(requestTopicNameParameters).name();
        String jsonValue = this.objectMapper.writeValueAsString(v);
        return new ProducerRecord<>(topicName, (Integer) null, (String) null, jsonValue, collection);
    }

    /** Sends the record and blocks for the reply; returns empty (after logging) on failure. */
    private Optional<ConsumerRecord<String, String>> requestAndReceive(ProducerRecord<String, String> producerRecord) {
        try {
            ConsumerRecord<String, String> consumerRecord = request(producerRecord).get();
            log.info("Return value: {}", consumerRecord.value());
            return Optional.of(consumerRecord);
        } catch (InterruptedException | ExecutionException e) {
            handleRequestFailure(e);
            return Optional.empty();
        }
    }

    /** Sends the record, blocks until the send result is available, and returns the reply future. */
    private RequestReplyFuture<String, String, String> request(ProducerRecord<String, String> producerRecord) throws ExecutionException, InterruptedException {
        RequestReplyFuture<String, String, String> sendAndReceive = this.replyingKafkaTemplate.sendAndReceive(producerRecord);
        SendResult<String, String> sendResult = sendAndReceive.getSendFuture().get();
        log.info("Sent ok: {}", sendResult.getRecordMetadata());
        return sendAndReceive;
    }

    /** Logs a request failure with its stack trace and restores the interrupt flag if interrupted. */
    private static void handleRequestFailure(Exception e) {
        if (e instanceof InterruptedException) {
            // Catching InterruptedException clears the flag; set it again so callers can observe it.
            Thread.currentThread().interrupt();
        }
        log.error("Encountered error during request", e);
    }
}
