package com.arms.config; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; /* Zookeeper Cluster Zookeeper 는 분산 시스템에서 서비스 동기화와 naming registry 를 위해서 사용한다. 주로 하는 일은 Apache Kafka Cluster 의 상태를 추적하고 관리하는 역할을 한다. 이외에도 Zookeeper 는 모든 파티션에서 리더 팔로우 관계를 유지해주는 역할을 하도록 컨트롤러를 선택해주는 역할을 하기도 한다. Kafka Cluster Kafka 클러스터는 하나 이상의 Broker 로 구성된다. 주로 하는 역할은 Broker의 Controller 로서 역할을 한다. Broker Producer 로 부터 들어온 메시지를 저장하고 Consumer 가 이 메시지를 topic 별로 각 partition 에서 offset 을 기준으로 fetch 할 수 있도록 한다. 이중화를 하고 장애에 대응하는 역할도 한다. Producer 데이터 스트림을 생산하는 역할을 producer 가 합니다. 토큰 또는 메시지를 생성하고 이를 Kafka 클러스터의 하나 이상의 topic 에 추가로 append 하기 위해 Apache Kafka Producer 를 사용합니다 Consumer Consumer 는 Consumer Group 에 속해서 topic 을 subscribe 하고 해당 topic 에 있는 partition 에 데이터가 들어있다면 그 데이터를 가지고 오는 역할을 합니다. Topic 과 partition 카프카에서 Producer 가 보내는 메세지는 topic 으로 분류되고, topic 은 여러개의 파티션으로 나눠 질 수 있다. 파티션내의 한 칸은 로그라고 불린다. 데이터는 한 칸의 로그에 순차적으로 append 된다. 메세지의 상대적인 위치를 나타내는게 offset이다. 배열에서의 index를 생각하면 된다. Middle-Proxy KafkaConfig == Backend-Core KafkaConfig (코드가 100% 동일합니다) ``` ### 2. 하지만 역할은 정반대! ``` Middle-Proxy: ✅ Producer 사용 (메시지 발행) ❌ Consumer 미사용 (@KafkaListener 없음) Backend-Core: ❌ Producer 미사용 (설정만 존재) ✅ Consumer 사용 (ReqAddConsumer) ``` ### 3. 실제 메시지 플로우 ``` Frontend │ ▼ HTTP Request Middle-Proxy (Producer) │ kafkaTemplate.send("requirement", key, value) ▼ Kafka Message Kafka Cluster │ Consumer Poll ▼ Backend-Core (Consumer) │ @KafkaListener │ switch (CREATE/UPDATE/DELETE) │ internalService.process() ▼ Database */ @Configuration @RefreshScope public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers:Kafka00Service:9094,Kafka01Service:9094,Kafka02Service:9094}") private String bootstrapServers; @Value("${spring.kafka.topic.reqadd:REQADD}") private String reqAddTopic; @Bean public KafkaAdmin kafkaAdmin() { Map configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return new KafkaAdmin(configs); } @Bean public AdminClient kafkaAdminClient(KafkaAdmin kafkaAdmin) { return AdminClient.create(kafkaAdmin.getConfigurationProperties()); } /** * REQADD 토픽 생성 * Partitions: 3, Replication Factor: 1 */ @Bean public NewTopic reqAddTopic() { return new NewTopic(reqAddTopic, 3, (short) 1); } @Bean public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { return new KafkaTemplate<>(producerFactory); } @Bean public ProducerFactory producerFactory() { Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // ✅ 추가 권장 설정 configs.put(ProducerConfig.ACKS_CONFIG, "all"); // 모든 replica 확인 configs.put(ProducerConfig.RETRIES_CONFIG, 3); // 재시도 3회 configs.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 배치 대기 configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 압축 configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 멱등성 configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); return new DefaultKafkaProducerFactory<>(configs); } }