package no.fintlabs.adapter.events;

import java.util.concurrent.Flow;
import no.fint.model.resource.FintLinks;
import no.fintlabs.adapter.AdapterProperties;
import no.fintlabs.adapter.events.EventPublisher;
import no.fintlabs.adapter.models.AdapterCapability;
import no.fintlabs.adapter.models.ResponseFintEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

/* loaded from: input_file:no/fintlabs/adapter/events/EventSubscriber.class */
public abstract class EventSubscriber<T extends FintLinks, P extends EventPublisher<T>> implements Flow.Subscriber<ResponseFintEvent<T>> {
    private static final Logger log = LoggerFactory.getLogger(EventSubscriber.class);
    private final WebClient webClient;
    protected final AdapterProperties adapterProperties;
    private final String capabilityKey;

    protected EventSubscriber(WebClient webClient, AdapterProperties adapterProperties, P p, String str) {
        this.webClient = webClient;
        this.adapterProperties = adapterProperties;
        this.capabilityKey = str;
        p.subscribe(this);
    }

    public void onSync(ResponseFintEvent<T> responseFintEvent) {
        log.info("Posting response to event {}", responseFintEvent.getCorrId());
        this.webClient.post().uri("/provider/event", new Object[0]).body(Mono.just(responseFintEvent), ResponseFintEvent.class).retrieve().toBodilessEntity().doOnError(th -> {
            log.error("Posting response to event failed", th);
        }).doOnNext(responseEntity -> {
            responsePostingEvent(responseEntity, responseFintEvent);
        });
    }

    protected abstract void responsePostingEvent(ResponseEntity<Void> responseEntity, ResponseFintEvent<T> responseFintEvent);

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        log.info("Subscribing to resources for endpoint {}", getCapability().getEntityUri());
        subscription.request(Long.MAX_VALUE);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(ResponseFintEvent<T> responseFintEvent) {
        onSync(responseFintEvent);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        log.error(th.getMessage(), th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        log.info("Subscriber for {} is closed", getCapability().getEntityUri());
    }

    protected AdapterCapability getCapability() {
        return this.adapterProperties.getCapabilityByResource(this.capabilityKey);
    }
}
