package no.fintlabs.kafka.common.topic;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import no.fintlabs.kafka.CommonConfiguration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:no/fintlabs/kafka/common/topic/TopicService.class */
public class TopicService {
    private static final Logger log = LoggerFactory.getLogger(TopicService.class);
    private final KafkaAdmin kafkaAdmin;
    private final AdminClient kafkaAdminClient;
    private final CommonConfiguration commonConfiguration;

    public TopicService(KafkaAdmin kafkaAdmin, AdminClient adminClient, CommonConfiguration commonConfiguration) {
        this.kafkaAdmin = kafkaAdmin;
        this.kafkaAdminClient = adminClient;
        this.commonConfiguration = commonConfiguration;
    }

    public TopicDescription getTopic(TopicNameParameters topicNameParameters) {
        return getTopic(topicNameParameters.toTopicName());
    }

    public TopicDescription getTopic(String str) {
        return (TopicDescription) this.kafkaAdmin.describeTopics(new String[]{str}).get(str);
    }

    public Map<String, String> getTopicConfig(TopicNameParameters topicNameParameters) throws ExecutionException, InterruptedException {
        return getTopicConfig(topicNameParameters.toTopicName());
    }

    public Map<String, String> getTopicConfig(String str) throws ExecutionException, InterruptedException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        return (Map) ((Config) ((KafkaFuture) this.kafkaAdminClient.describeConfigs(List.of(configResource)).values().get(configResource)).get()).entries().stream().collect(Collectors.toMap((v0) -> {
            return v0.name();
        }, (v0) -> {
            return v0.value();
        }));
    }

    public void createOrModifyTopic(TopicNameParameters topicNameParameters, long j, TopicCleanupPolicyParameters topicCleanupPolicyParameters) {
        createOrModifyTopic(topicNameParameters.toTopicName(), j, topicCleanupPolicyParameters);
    }

    public void createOrModifyTopic(String str, long j, TopicCleanupPolicyParameters topicCleanupPolicyParameters) {
        this.kafkaAdmin.createOrModifyTopics(new NewTopic[]{TopicBuilder.name(str).replicas(this.commonConfiguration.getDefaultReplicas()).partitions(this.commonConfiguration.getDefaultPartitions()).build()});
        updateTopicRetentionTime(str, j);
        updateTopicCleanUpPolicy(str, topicCleanupPolicyParameters);
    }

    private String getCleanupPolicyOrDefault(TopicCleanupPolicyParameters topicCleanupPolicyParameters) {
        StringJoiner stringJoiner = new StringJoiner(", ");
        if (topicCleanupPolicyParameters.compact) {
            stringJoiner.add("compact");
        }
        if (topicCleanupPolicyParameters.delete) {
            stringJoiner.add("delete");
        }
        return stringJoiner.length() > 0 ? stringJoiner.toString() : this.commonConfiguration.getDefaultCleanupPolicy();
    }

    private void updateTopic(String str, String str2, String str3) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry(str2, str3), AlterConfigOp.OpType.SET);
        HashMap hashMap = new HashMap(1);
        hashMap.put(configResource, List.of(alterConfigOp));
        try {
            this.kafkaAdminClient.incrementalAlterConfigs(hashMap).all().get();
        } catch (InterruptedException | ExecutionException e) {
            log.error(e.getMessage());
        }
    }

    private void updateTopicRetentionTime(String str, long j) {
        updateTopic(str, "retention.ms", getRetentionTimeOrDefault(j));
    }

    private void updateTopicCleanUpPolicy(String str, TopicCleanupPolicyParameters topicCleanupPolicyParameters) {
        updateTopic(str, "cleanup.policy", getCleanupPolicyOrDefault(topicCleanupPolicyParameters));
    }

    private String getRetentionTimeOrDefault(long j) {
        return j > 0 ? String.valueOf(j) : String.valueOf(this.commonConfiguration.getDefaultRetentionTimeMs());
    }
}
