/*
 * Decompiled with CFR 0.152.
 */
package com.arms.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@RefreshScope
@EnableKafka
public class KafkaConfig {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value(value="${spring.kafka.bootstrap-servers:Kafka00Service:9094,Kafka01Service:9094,Kafka02Service:9094}")
    private String bootstrapServers;
    @Value(value="${spring.kafka.consumer.group-id:requirement-consumer-group}")
    private String groupId;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", this.bootstrapServers);
        return new KafkaAdmin(configs);
    }

    @Bean
    public AdminClient kafkaAdminClient(KafkaAdmin kafkaAdmin) {
        return AdminClient.create((Map)kafkaAdmin.getConfigurationProperties());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", this.bootstrapServers);
        configs.put("key.serializer", StringSerializer.class);
        configs.put("value.serializer", StringSerializer.class);
        return new DefaultKafkaProducerFactory(configs);
    }

    @Bean
    public KafkaTemplate<String, String> reqAddKafkaTemplate() {
        return new KafkaTemplate(this.producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", this.bootstrapServers);
        configs.put("group.id", this.groupId);
        configs.put("key.deserializer", StringDeserializer.class);
        configs.put("value.deserializer", StringDeserializer.class);
        configs.put("auto.offset.reset", "earliest");
        configs.put("enable.auto.commit", false);
        configs.put("max.poll.records", 1);
        configs.put("session.timeout.ms", 300000);
        configs.put("heartbeat.interval.ms", 10000);
        configs.put("max.poll.interval.ms", 300000);
        return new DefaultKafkaConsumerFactory(configs);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> reqAddKafkaListenerContainerFactory() {
        this.logger.info("========================================");
        this.logger.info("Creating reqAddKafkaListenerContainerFactory Bean");
        this.logger.info("Bootstrap Servers: {}", (Object)this.bootstrapServers);
        this.logger.info("Consumer Group ID: {}", (Object)this.groupId);
        this.logger.info("========================================");
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.consumerFactory());
        factory.setConcurrency(Integer.valueOf(1));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            this.logger.error("=== Kafka Message Processing Failed ===");
            this.logger.error("Topic: {}", (Object)record.topic());
            this.logger.error("Key: {}", record.key());
            this.logger.error("Value: {}", record.value());
            this.logger.error("Partition: {}, Offset: {}", (Object)record.partition(), (Object)record.offset());
            this.logger.error("Exception: {}", (Object)exception.getMessage(), exception);
        }, (BackOff)new FixedBackOff(1000L, 3L));
        factory.setErrorHandler((ErrorHandler)errorHandler);
        this.logger.info("reqAddKafkaListenerContainerFactory Bean created successfully");
        return factory;
    }
}

