package no.fintlabs.kafka.requestreply;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;

/* loaded from: input_file:no/fintlabs/kafka/requestreply/RequestProducer.class */
/**
 * Sends request records through a {@link ReplyingKafkaTemplate} and hands the reply back to the
 * caller, either synchronously ({@link #requestAndReceive}) or through callbacks
 * ({@link #requestWithAsyncReplyConsumer}).
 *
 * @param <V> value type of the outgoing request record
 * @param <R> value type of the reply record
 */
public class RequestProducer<V, R> {
    private static final Logger log = LoggerFactory.getLogger(RequestProducer.class);
    private final ReplyingKafkaTemplate<String, V, R> replyingKafkaTemplate;

    /**
     * Creates a producer that delegates all sending and reply correlation to the given template.
     *
     * <p>NOTE(review): decompiler output indicates this constructor was originally
     * package-private; it is kept public here so existing callers keep compiling.
     *
     * @param replyingKafkaTemplate template used for every request sent by this instance
     */
    public RequestProducer(ReplyingKafkaTemplate<String, V, R> replyingKafkaTemplate) {
        this.replyingKafkaTemplate = replyingKafkaTemplate;
    }

    /**
     * Sends the request and blocks until the reply future completes.
     *
     * @param requestProducerRecord the request to send
     * @return the reply record, or {@link Optional#empty()} if sending or waiting for the reply
     *         failed or the calling thread was interrupted (the failure is logged)
     */
    public Optional<ConsumerRecord<String, R>> requestAndReceive(RequestProducerRecord<V> requestProducerRecord) {
        try {
            ConsumerRecord<String, R> consumerRecord = request(requestProducerRecord).get();
            log.info("Reply: {}", consumerRecord);
            return Optional.of(consumerRecord);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logRequestFailure(e);
            return Optional.empty();
        } catch (ExecutionException e) {
            logRequestFailure(e);
            return Optional.empty();
        }
    }

    /**
     * Sends the request and registers callbacks on the reply future instead of blocking on it.
     * The call still blocks until the send itself has completed (see {@link #request}).
     *
     * @param requestProducerRecord the request to send
     * @param consumer invoked with the reply record when the reply future succeeds
     * @param consumer2 invoked with the failure when the send fails, the calling thread is
     *                  interrupted while waiting for the send, or the reply future fails
     */
    public void requestWithAsyncReplyConsumer(RequestProducerRecord<V> requestProducerRecord, Consumer<ConsumerRecord<String, R>> consumer, Consumer<Throwable> consumer2) {
        try {
            request(requestProducerRecord).addCallback(consumerRecord -> {
                log.info("Reply: {}", consumerRecord);
                consumer.accept(consumerRecord);
            }, th -> handleAsyncFailure(consumer2, th));
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure to the caller's handler.
            Thread.currentThread().interrupt();
            handleAsyncFailure(consumer2, e);
        } catch (ExecutionException e) {
            handleAsyncFailure(consumer2, e);
        }
    }

    /**
     * Sends the record and waits for the send (not the reply) to complete.
     *
     * @return the future that will complete with the reply record
     * @throws ExecutionException if the send future completed exceptionally
     * @throws InterruptedException if the calling thread was interrupted while waiting for the send
     */
    private RequestReplyFuture<String, V, R> request(RequestProducerRecord<V> requestProducerRecord) throws ExecutionException, InterruptedException {
        RequestReplyFuture<String, V, R> replyFuture = this.replyingKafkaTemplate.sendAndReceive(toProducerRecord(requestProducerRecord));
        // Block explicitly on the send result so a failed send surfaces here, independent of the log level.
        SendResult<String, V> sendResult = replyFuture.getSendFuture().get();
        log.debug("Sent ok: {}", sendResult.getRecordMetadata());
        return replyFuture;
    }

    /**
     * Builds the Kafka producer record for a request: topic name, value and headers are taken from
     * the request; partition, timestamp and key are left {@code null}.
     */
    private ProducerRecord<String, V> toProducerRecord(RequestProducerRecord<V> requestProducerRecord) {
        return new ProducerRecord<>(
                requestProducerRecord.getTopicNameParameters().toTopicName(),
                (Integer) null,
                (Long) null,
                (String) null,
                requestProducerRecord.getValue(),
                requestProducerRecord.getHeaders()
        );
    }

    /** Logs the failure and forwards it to the caller-supplied failure handler. */
    private void handleAsyncFailure(Consumer<Throwable> consumer, Throwable th) {
        logRequestFailure(th);
        consumer.accept(th);
    }

    /** Logs a request failure with the same message text as before, plus the stack trace. */
    private void logRequestFailure(Throwable th) {
        // First argument fills the placeholder; the trailing Throwable is picked up by SLF4J as the exception.
        log.error("Encountered error during request: {}", th, th);
    }
}
