/*
 * Decompiled with CFR 0.152.
 */
package com.arms.config;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Reports how far a Kafka consumer group is behind the end of a topic.
 *
 * <p>Lag for a partition is the partition's latest (end) offset minus the offset the
 * configured consumer group has committed for it. Partitions without a committed offset
 * are treated as committed at offset 0, and a lag is never reported as negative.
 *
 * <p>The {@link AdminClient} is optional: when none is injected, every query degrades to
 * an "unavailable" result instead of failing.
 */
@Component
public class KafkaLagMonitor {
    /** Value returned by {@link #getConsumerLag(String)} when the lag cannot be determined. */
    private static final long LAG_UNAVAILABLE = -1L;
    /** Total lag below this many messages is classified as {@code LOW}. */
    private static final long LOW_LAG_LIMIT = 100L;
    /** Total lag below this many messages (and at least {@link #LOW_LAG_LIMIT}) is {@code MEDIUM}. */
    private static final long MEDIUM_LAG_LIMIT = 1000L;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired(required = false)
    private AdminClient kafkaAdminClient;

    @Value("${spring.kafka.consumer.group-id:requirement-consumer-group}")
    private String groupId;

    /**
     * Computes the total lag of the configured consumer group over all partitions of a topic.
     *
     * @param topic name of the topic to inspect
     * @return the summed lag in messages; {@code 0} when the group has no committed offsets
     *         for the topic; {@code -1} when no AdminClient is available or the lookup fails
     */
    public long getConsumerLag(String topic) {
        if (kafkaAdminClient == null) {
            logger.warn("AdminClient is not available. Cannot get consumer lag.");
            return LAG_UNAVAILABLE;
        }
        try {
            logger.debug("Getting consumer lag for topic: {} with group: {}", topic, groupId);
            Map<TopicPartition, Long> committedOffsets = getCurrentOffsets(topic);
            if (committedOffsets.isEmpty()) {
                logger.warn("No consumer offsets found for topic: {} in group: {}", topic, groupId);
                return 0L;
            }
            long totalLag = sumLags(computePartitionLags(topic, committedOffsets, getEndOffsets(topic)));
            logger.info("Topic [{}] Total Lag: {} messages", topic, totalLag);
            return totalLag;
        } catch (InterruptedException e) {
            // Restore the flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while getting consumer lag for topic: {}", topic, e);
            return LAG_UNAVAILABLE;
        } catch (Exception e) {
            logger.error("Error getting consumer lag for topic: {}", topic, e);
            return LAG_UNAVAILABLE;
        }
    }

    /**
     * Computes the lag of the configured consumer group for each partition of a topic.
     *
     * @param topic name of the topic to inspect
     * @return a mutable map from partition number to lag in messages; empty when no
     *         AdminClient is available or the lookup fails
     */
    public Map<Integer, Long> getDetailedLag(String topic) {
        if (kafkaAdminClient == null) {
            logger.warn("AdminClient is not available");
            return new HashMap<>();
        }
        try {
            return computePartitionLags(topic, getCurrentOffsets(topic), getEndOffsets(topic));
        } catch (InterruptedException e) {
            // Restore the flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while getting detailed lag for topic: {}", topic, e);
        } catch (Exception e) {
            logger.error("Error getting detailed lag for topic: {}", topic, e);
        }
        return new HashMap<>();
    }

    /**
     * Builds a status report for the configured consumer group on a topic.
     *
     * <p>The report always contains {@code topic}, {@code groupId}, {@code totalLag},
     * {@code partitionCount}, {@code partitionLags}, {@code timestamp} (epoch millis) and
     * {@code lagStatus} ({@code NO_LAG}, {@code LOW}, {@code MEDIUM}, {@code HIGH}, or
     * {@code UNKNOWN} when the lag could not be determined). When the lookup fails an
     * {@code error} entry carrying the exception message is added.
     *
     * <p>Total and per-partition values come from a single offset snapshot. As in
     * {@link #getConsumerLag(String)}, {@code totalLag} is {@code 0} when the group has no
     * committed offsets for the topic, even though {@code partitionLags} is still filled in.
     *
     * @param topic name of the topic to inspect
     * @return a mutable map describing the current lag state
     */
    public Map<String, Object> getConsumerStatus(String topic) {
        Map<String, Object> status = new HashMap<>();
        long totalLag = LAG_UNAVAILABLE;
        Map<Integer, Long> partitionLags = new HashMap<>();

        if (kafkaAdminClient == null) {
            logger.warn("AdminClient is not available. Cannot get consumer status.");
        } else {
            try {
                Map<TopicPartition, Long> committedOffsets = getCurrentOffsets(topic);
                partitionLags = computePartitionLags(topic, committedOffsets, getEndOffsets(topic));
                totalLag = committedOffsets.isEmpty() ? 0L : sumLags(partitionLags);
            } catch (InterruptedException e) {
                // Restore the flag so the calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
                logger.error("Interrupted while getting consumer status for topic: {}", topic, e);
                status.put("error", e.getMessage());
            } catch (Exception e) {
                logger.error("Error getting consumer status for topic: {}", topic, e);
                status.put("error", e.getMessage());
            }
        }

        status.put("topic", topic);
        status.put("groupId", groupId);
        status.put("totalLag", totalLag);
        status.put("partitionCount", partitionLags.size());
        status.put("partitionLags", partitionLags);
        status.put("timestamp", System.currentTimeMillis());
        status.put("lagStatus", classifyLag(totalLag));
        return status;
    }

    /** Fetches the offsets the configured group has committed for the given topic. */
    private Map<TopicPartition, Long> getCurrentOffsets(String topic) throws ExecutionException, InterruptedException {
        ListConsumerGroupOffsetsResult result = kafkaAdminClient.listConsumerGroupOffsets(groupId);
        Map<TopicPartition, OffsetAndMetadata> groupOffsets = result.partitionsToOffsetAndMetadata().get();

        Map<TopicPartition, Long> committedOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : groupOffsets.entrySet()) {
            OffsetAndMetadata committed = entry.getValue();
            // The value is null for a partition the group has no committed offset for.
            if (committed != null && entry.getKey().topic().equals(topic)) {
                committedOffsets.put(entry.getKey(), committed.offset());
            }
        }
        return committedOffsets;
    }

    /** Fetches the latest offset of every partition of the topic with one batched request. */
    private Map<TopicPartition, Long> getEndOffsets(String topic) throws ExecutionException, InterruptedException {
        // NOTE(review): newer kafka-clients releases appear to deprecate all() in favour of
        // allTopicNames(); kept as-is to match the client version this file compiles against.
        Map<String, TopicDescription> descriptions =
                kafkaAdminClient.describeTopics(Collections.singleton(topic)).all().get();
        TopicDescription description = descriptions.get(topic);
        if (description == null) {
            throw new IllegalStateException("No description returned for topic: " + topic);
        }

        Map<TopicPartition, OffsetSpec> request = new HashMap<>();
        for (TopicPartitionInfo partitionInfo : description.partitions()) {
            request.put(new TopicPartition(topic, partitionInfo.partition()), OffsetSpec.latest());
        }

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        if (request.isEmpty()) {
            return endOffsets;
        }
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                kafkaAdminClient.listOffsets(request).all().get();
        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry : latest.entrySet()) {
            endOffsets.put(entry.getKey(), entry.getValue().offset());
        }
        return endOffsets;
    }

    /** Derives the per-partition lag from committed and end offsets, clamping each lag at zero. */
    private Map<Integer, Long> computePartitionLags(String topic,
                                                    Map<TopicPartition, Long> committedOffsets,
                                                    Map<TopicPartition, Long> endOffsets) {
        Map<Integer, Long> partitionLags = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long endOffset = entry.getValue();
            long committedOffset = committedOffsets.getOrDefault(tp, 0L);
            // A committed offset ahead of the end offset must not yield a negative lag,
            // which could otherwise be mistaken for the -1 "unavailable" sentinel once summed.
            long lag = Math.max(0L, endOffset - committedOffset);
            partitionLags.put(tp.partition(), lag);
            logger.debug("[{}] Partition {}: Current={}, End={}, Lag={}",
                    topic, tp.partition(), committedOffset, endOffset, lag);
        }
        return partitionLags;
    }

    /** Adds up all per-partition lags. */
    private static long sumLags(Map<Integer, Long> partitionLags) {
        long total = 0L;
        for (long lag : partitionLags.values()) {
            total += lag;
        }
        return total;
    }

    /** Maps a total lag to its status label; negative totals mean the lag is unknown. */
    private static String classifyLag(long totalLag) {
        if (totalLag < 0L) {
            return "UNKNOWN";
        }
        if (totalLag == 0L) {
            return "NO_LAG";
        }
        if (totalLag < LOW_LAG_LIMIT) {
            return "LOW";
        }
        return totalLag < MEDIUM_LAG_LIMIT ? "MEDIUM" : "HIGH";
    }
}

