package no.fintlabs.adapter.datasync;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import no.fint.model.resource.FintLinks;
import no.fintlabs.adapter.config.AdapterProperties;
import no.fintlabs.adapter.datasync.ResourcePublisher;
import no.fintlabs.adapter.models.AdapterCapability;
import no.fintlabs.adapter.models.FullSyncPage;
import no.fintlabs.adapter.models.SyncPage;
import no.fintlabs.adapter.models.SyncPageEntry;
import no.fintlabs.adapter.models.SyncPageMetadata;
import no.fintlabs.adapter.models.SyncType;
import no.fintlabs.adapter.validator.ValidatorService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:no/fintlabs/adapter/datasync/ResourceSubscriber.class */
/**
 * Base class for subscribers that receive {@link SyncData} from a {@link ResourcePublisher},
 * split the contained resources into pages of at most {@code fint.adapter.page-size} entries
 * and send every page to {@code /provider} + the entity URI of {@link #getCapability()}.
 *
 * <p>Subclasses supply the capability being served and the mapping from a resource to a
 * {@link SyncPageEntry}.
 *
 * @param <T> the resource type being synchronised
 * @param <P> the publisher type this subscriber attaches itself to
 */
public abstract class ResourceSubscriber<T extends FintLinks, P extends ResourcePublisher<T, ResourceRepository<T>>> implements Flow.Subscriber<SyncData<T>> {
    private static final Logger log = LoggerFactory.getLogger(ResourceSubscriber.class);

    /** Maximum number of resources per page; injected by Spring, defaults to 100. */
    @Value("${fint.adapter.page-size:100}")
    private int pageSize;
    private final WebClient webClient;
    private final ValidatorService<T> validatorService;
    protected final AdapterProperties adapterProperties;

    /**
     * Stores the collaborators and subscribes this instance to the given publisher.
     *
     * <p>NOTE(review): {@code this} is handed to the publisher before subclass constructors
     * have run. {@link #onSubscribe} calls {@link #getCapability()}, so if the publisher
     * invokes {@code onSubscribe} synchronously, {@code getCapability()} must not rely on
     * state initialised in a subclass constructor.
     *
     * @param webClient         client used to send pages to the provider
     * @param adapterProperties adapter configuration (org id, adapter id, debug flag)
     * @param p                 publisher to subscribe to
     * @param validatorService  validator applied to the generated pages when debug is enabled
     */
    protected ResourceSubscriber(WebClient webClient, AdapterProperties adapterProperties, P p, ValidatorService<T> validatorService) {
        this.webClient = webClient;
        this.adapterProperties = adapterProperties;
        this.validatorService = validatorService;
        p.subscribe(this);
    }

    /**
     * Pages the resources of {@code syncData}, sends all pages and blocks until the last
     * response has arrived. The total duration is logged once every page has completed.
     *
     * <p>Any failure while sending a page is rethrown to the caller by {@code blockLast()}.
     *
     * @param syncData the resources to send together with their sync type
     * @throws IllegalStateException if the configured page size is not positive
     */
    public void onSync(SyncData<T> syncData) {
        List<T> resources = syncData.getResources();
        log.info("Syncing {} items to endpoint {}", resources.size(), getCapability().getEntityUri());
        Instant start = Instant.now();
        Flux.fromIterable(getPages(resources, syncData.getSyncType()))
                .flatMap(this::sendPages)
                .doOnComplete(() -> logDuration(resources.size(), start))
                .blockLast();
    }

    /**
     * Logs how long it took to sync {@code resourceCount} resources, formatted as
     * {@code HH:mm:ss}.
     *
     * @param resourceCount number of resources that were synced
     * @param start         instant at which the sync started
     */
    private void logDuration(int resourceCount, Instant start) {
        Duration elapsed = Duration.between(start, Instant.now());
        // toHours() rather than toHoursPart(): the latter wraps at 24 and would drop whole days.
        log.info("Syncing {} resources in {} pages took {}:{}:{} to complete",
                resourceCount,
                pageCount(resourceCount),
                "%02d".formatted(elapsed.toHours()),
                "%02d".formatted(elapsed.toMinutesPart()),
                "%02d".formatted(elapsed.toSecondsPart()));
    }

    /**
     * Returns the number of pages needed for {@code resourceCount} resources, i.e.
     * {@code resourceCount / pageSize} rounded up. Requires a positive page size.
     */
    private int pageCount(int resourceCount) {
        return (resourceCount + this.pageSize - 1) / this.pageSize;
    }

    /**
     * Fails fast with a descriptive message when the page size is zero or negative, instead
     * of letting the paging arithmetic fail with a division by zero or an index error.
     */
    private void requirePositivePageSize() {
        if (this.pageSize <= 0) {
            throw new IllegalStateException("fint.adapter.page-size must be greater than 0, but was " + this.pageSize);
        }
    }

    /**
     * Returns the capability this subscriber serves; its entity URI determines the endpoint
     * pages are sent to.
     */
    protected abstract AdapterCapability getCapability();

    /**
     * Sends a single page to {@code /provider} + the capability's entity URI, using the HTTP
     * method of the page's sync type, and logs the response status.
     *
     * @param syncPage the page to send
     * @return a {@link Mono} that emits the body-less response entity
     */
    protected Mono<?> sendPages(SyncPage<T> syncPage) {
        return this.webClient
                .method(syncPage.getSyncType().getHttpMethod())
                .uri("/provider" + getCapability().getEntityUri())
                .body(Mono.just(syncPage), SyncPage.class)
                .retrieve()
                .toBodilessEntity()
                .doOnNext(response -> log.info("Page {} returned {}. ({})",
                        syncPage.getMetadata().getPage(),
                        syncPage.getMetadata().getCorrId(),
                        response.getStatusCode()));
    }

    /**
     * Splits {@code list} into consecutive pages of at most the configured page size. All
     * pages share one freshly generated correlation id. An empty list yields no pages.
     *
     * <p>When the adapter runs in debug mode the generated pages are passed to the
     * {@link ValidatorService} before being returned.
     *
     * @param list     the resources to page
     * @param syncType the sync type stamped on every page
     * @return the pages in resource order
     * @throws IllegalStateException if the configured page size is not positive
     */
    public List<SyncPage<T>> getPages(List<T> list, SyncType syncType) {
        requirePositivePageSize();
        String corrId = UUID.randomUUID().toString();
        int totalSize = list.size();
        int totalPages = pageCount(totalSize);
        List<SyncPage<T>> pages = new ArrayList<>(totalPages);
        for (int offset = 0; offset < totalSize; offset += this.pageSize) {
            pages.add(createSyncPage(corrId, list, syncType, totalSize, totalPages, offset));
        }
        if (this.adapterProperties.isDebug()) {
            this.validatorService.validate(pages, totalSize);
        }
        return pages;
    }

    /**
     * Builds the page that starts at index {@code offset} of {@code resources}.
     *
     * @param corrId     correlation id shared by all pages of this sync
     * @param resources  the complete resource list
     * @param syncType   the sync type stamped on the page
     * @param totalSize  total number of resources in the sync
     * @param totalPages total number of pages in the sync
     * @param offset     index of the first resource of this page
     */
    private SyncPage<T> createSyncPage(String corrId, List<T> resources, SyncType syncType, int totalSize, int totalPages, int offset) {
        List<SyncPageEntry<T>> entries = getSyncPageEntries(resources, offset);
        return FullSyncPage.<T>builder()
                .metadata(getSyncPageMetadata(corrId, totalSize, totalPages, offset, entries))
                .resources(entries)
                .syncType(syncType)
                .build();
    }

    /**
     * Builds the metadata for one page. The page number is 1-based and derived from
     * {@code offset}; {@code pageSize} in the metadata is the actual number of entries on
     * this page, which may be smaller than the configured page size for the last page.
     */
    private SyncPageMetadata getSyncPageMetadata(String corrId, int totalSize, int totalPages, int offset, List<SyncPageEntry<T>> entries) {
        return SyncPageMetadata.builder()
                .orgId(this.adapterProperties.getOrgId())
                .adapterId(this.adapterProperties.getId())
                .corrId(corrId)
                .totalPages(totalPages)
                .totalSize(totalSize)
                .pageSize(entries.size())
                .page((offset / this.pageSize) + 1)
                .uriRef(getCapability().getEntityUri())
                .time(System.currentTimeMillis())
                .build();
    }

    /**
     * Maps the resources in {@code [offset, offset + pageSize)}, clamped to the end of the
     * list, to page entries via {@link #createSyncPageEntry}.
     */
    @NotNull
    private List<SyncPageEntry<T>> getSyncPageEntries(List<T> resources, int offset) {
        int end = Math.min(offset + this.pageSize, resources.size());
        return resources.subList(offset, end).stream()
                .map(this::createSyncPageEntry)
                .collect(Collectors.toList());
    }

    /**
     * Wraps a single resource in a {@link SyncPageEntry}.
     *
     * @param t the resource to wrap
     */
    protected abstract SyncPageEntry<T> createSyncPageEntry(T t);

    /**
     * Logs the subscription and requests an unbounded number of items.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        log.info("Subscribing to resources for endpoint {}", getCapability().getEntityUri());
        subscription.request(Long.MAX_VALUE);
    }

    /**
     * Runs {@link #onSync} for the received data.
     *
     * <p>Failures are logged and not propagated: {@link Flow.Subscriber} methods are required
     * to return normally, and a publisher may cancel the subscription of a subscriber that
     * throws, which would stop all later syncs after a single failed one.
     *
     * @throws NullPointerException if {@code syncData} is {@code null}
     */
    @Override
    public void onNext(SyncData<T> syncData) {
        if (syncData == null) {
            throw new NullPointerException("syncData");
        }
        try {
            onSync(syncData);
        } catch (RuntimeException e) {
            log.error("Sync failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Logs the error signalled by the publisher.
     */
    @Override
    public void onError(Throwable th) {
        log.error(th.getMessage(), th);
    }

    /**
     * Logs that the publisher has completed.
     */
    @Override
    public void onComplete() {
        log.info("Subscriber for {} is closed", getCapability().getEntityUri());
    }
}
