package no.fintlabs.kafka.producing;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import no.fintlabs.kafka.model.RequestProducerRecord;
import no.fintlabs.kafka.topic.name.TopicNameService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;

/* loaded from: input_file:no/fintlabs/kafka/producing/RequestTemplate.class */
/**
 * Sends request records through a {@link ReplyingKafkaTemplate} and hands the reply back to the
 * caller, either by blocking ({@link #requestAndReceive}) or through callbacks
 * ({@link #requestWithAsyncReplyConsumer}).
 *
 * <p>Both entry points wait for the send future of the request to complete before they deal with
 * the reply, so the calling thread always blocks at least until the send has finished.
 *
 * @param <V> value type of the request record
 * @param <R> value type of the reply record
 */
public class RequestTemplate<V, R> {
    private static final Logger log = LoggerFactory.getLogger(RequestTemplate.class);

    /** Name of the header that is looked up, for logging only, on request and reply records. */
    private static final String CORRELATION_ID_HEADER = "kafka_correlationId";

    private final ReplyingKafkaTemplate<String, V, R> replyingKafkaTemplate;
    private final TopicNameService topicNameService;

    /**
     * @param replyingKafkaTemplate template used to send requests and obtain reply futures
     * @param topicNameService service used to validate and resolve the request topic name
     */
    public RequestTemplate(ReplyingKafkaTemplate<String, V, R> replyingKafkaTemplate, TopicNameService topicNameService) {
        this.replyingKafkaTemplate = replyingKafkaTemplate;
        this.topicNameService = topicNameService;
    }

    /**
     * Sends the request and blocks until the reply future completes.
     *
     * @param requestProducerRecord the request to send
     * @return the reply record, or an empty {@code Optional} if sending or waiting for the reply
     *     failed or the calling thread was interrupted; the failure is logged in both cases
     */
    public Optional<ConsumerRecord<String, R>> requestAndReceive(RequestProducerRecord<V> requestProducerRecord) {
        try {
            ConsumerRecord<String, R> reply = request(requestProducerRecord).get();
            logReply(reply);
            return Optional.of(reply);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logRequestError(e);
            return Optional.empty();
        } catch (ExecutionException e) {
            logRequestError(e);
            return Optional.empty();
        }
    }

    /**
     * Sends the request and registers callbacks that run when the reply future completes.
     *
     * <p>Exactly one of the two callbacks is invoked per call, unless a callback itself throws.
     *
     * @param requestProducerRecord the request to send
     * @param consumer invoked with the reply record when the reply future completes normally
     * @param consumer2 invoked with the failure when sending fails, the calling thread is
     *     interrupted while waiting for the send, or the reply future completes exceptionally
     */
    public void requestWithAsyncReplyConsumer(RequestProducerRecord<V> requestProducerRecord, Consumer<ConsumerRecord<String, R>> consumer, Consumer<Throwable> consumer2) {
        try {
            request(requestProducerRecord).whenComplete((reply, failure) -> {
                if (failure != null) {
                    handleAsyncFailure(consumer2, failure);
                    return;
                }
                logReply(reply);
                consumer.accept(reply);
            });
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            handleAsyncFailure(consumer2, e);
        } catch (ExecutionException e) {
            handleAsyncFailure(consumer2, e);
        }
    }

    /**
     * Sends the request and blocks until its send future completes, then returns the
     * still-pending reply future.
     *
     * @throws ExecutionException if the send future completed exceptionally
     * @throws InterruptedException if the calling thread was interrupted while waiting for the send
     */
    private RequestReplyFuture<String, V, R> request(RequestProducerRecord<V> requestProducerRecord) throws ExecutionException, InterruptedException {
        RequestReplyFuture<String, V, R> replyFuture = this.replyingKafkaTemplate.sendAndReceive(toProducerRecord(requestProducerRecord));
        SendResult<String, V> sendResult = replyFuture.getSendFuture().get();
        logRequestSendResult(sendResult);
        return replyFuture;
    }

    /** Maps the request onto a Kafka producer record for the resolved topic name. */
    private ProducerRecord<String, V> toProducerRecord(RequestProducerRecord<V> requestProducerRecord) {
        String topic = this.topicNameService.validateAndMapToTopicName(requestProducerRecord.getTopicNameParameters());
        return new ProducerRecord<>(
                topic,
                (Integer) null, // no explicit partition
                (Long) null, // no explicit timestamp
                requestProducerRecord.getKey(),
                requestProducerRecord.getValue(),
                requestProducerRecord.getHeaders());
    }

    /** Logs the failure and then forwards it to the caller-supplied failure callback. */
    private void handleAsyncFailure(Consumer<Throwable> consumer, Throwable th) {
        logRequestError(th);
        consumer.accept(th);
    }

    private void logRequestSendResult(SendResult<String, ?> sendResult) {
        log.debug(
                "Sent request on topic={} with offset={} and correlationId={}",
                sendResult.getRecordMetadata().topic(),
                sendResult.getRecordMetadata().offset(),
                correlationIdOf(sendResult.getProducerRecord().headers()));
    }

    private void logReply(ConsumerRecord<String, ?> consumerRecord) {
        log.debug(
                "Received reply on topic={} with offset={} and correlationId={}",
                consumerRecord.topic(),
                consumerRecord.offset(),
                correlationIdOf(consumerRecord.headers()));
    }

    /**
     * Returns the raw value of the last correlation-id header, or {@code null} if the record
     * carries no such header, so that a missing header cannot make a log statement throw.
     */
    private static byte[] correlationIdOf(Headers headers) {
        Header header = headers.lastHeader(CORRELATION_ID_HEADER);
        return header == null ? null : header.value();
    }

    private void logRequestError(Throwable th) {
        log.error("Encountered error during request", th);
    }
}
