# ReqAdd Kafka Integration Guide ## πŸ“‹ κ°œμš” Middle Proxyμ—μ„œ `/auth-user/api/arms/reqAdd` μš”μ²­μ„ Kafka둜 λ°œν–‰ν•˜μ—¬ Backend-Coreμ—μ„œ 비동기 μ²˜λ¦¬ν•˜λŠ” ꡬ쑰 ## πŸ”„ λ©”μ‹œμ§€ ν”Œλ‘œμš° ``` Frontend β”‚ β”‚ HTTP POST /auth-user/api/arms/reqAdd/T_ARMS_REQADD_123/addNode.do β”‚ Body: { "c_title": "μƒˆ μš”κ΅¬μ‚¬ν•­", "ref": 1, ... } β–Ό Middle-Proxy (Producer) β”‚ ReqAddProxyController β”‚ kafkaTemplate.send("REQADD", "T_ARMS_REQADD_123", message) β–Ό Kafka Cluster β”‚ Topic: REQADD β”‚ Partitions: 3 β”‚ Key: changeReqTableName (같은 ν…Œμ΄λΈ”μ€ 같은 νŒŒν‹°μ…˜) β–Ό Backend-Core (Consumer) β”‚ @KafkaListener(topics = "REQADD") β”‚ switch (operation) β”‚ case ADD_NODE β†’ reqAddService.addNode() β”‚ case UPDATE_NODE β†’ reqAddService.updateNode() β”‚ case REMOVE_NODE β†’ reqAddService.removeNode() β”‚ case MOVE_NODE β†’ reqAddService.moveNode() β–Ό Database ``` --- ## πŸ“¦ μƒμ„±λœ 파일 ### 1. ReqAddKafkaMessage.java ``` 경둜: src/main/java/com/arms/api/kafka/reqadd/model/ReqAddKafkaMessage.java μ—­ν• : Kafka λ©”μ‹œμ§€ 포맷 μ •μ˜ ``` **ν•„λ“œ:** - `operation`: ADD_NODE, UPDATE_NODE, REMOVE_NODE, MOVE_NODE - `changeReqTableName`: ν…Œμ΄λΈ”λͺ… (νŒŒν‹°μ…˜ ν‚€λ‘œ μ‚¬μš©) - `method`: HTTP λ©”μ„œλ“œ (참고용) - `payload`: μš”μ²­ λ³Έλ¬Έ (JSON String) - `timestamp`: λ©”μ‹œμ§€ 생성 μ‹œκ°„ - `requestId`: μš”μ²­ 좔적 ID (UUID) ### 2. ReqAddProxyController.java ``` 경둜: src/main/java/com/arms/api/kafka/reqadd/controller/ReqAddProxyController.java μ—­ν• : HTTP μš”μ²­μ„ λ°›μ•„ Kafka둜 λ°œν–‰ ``` **μ—”λ“œν¬μΈνŠΈ:** - `POST /auth-user/api/arms/reqAdd/{changeReqTableName}/addNode.do` - `POST /auth-user/api/arms/reqAdd/{changeReqTableName}/updateNode.do` - `DELETE /auth-user/api/arms/reqAdd/{changeReqTableName}/removeNode.do` - `POST /auth-user/api/arms/reqAdd/{changeReqTableName}/moveNode.do` ### 3. AppConfig.java (μˆ˜μ •) ``` 경둜: src/main/java/com/arms/config/AppConfig.java μΆ”κ°€: ObjectMapper Bean ``` --- ## πŸ§ͺ ν…ŒμŠ€νŠΈ 방법 ### 1. λ…Έλ“œ μΆ”κ°€ (addNode) ```bash curl -X POST "http://localhost:13131/auth-user/api/arms/reqAdd/T_ARMS_REQADD_123/addNode.do" \ -H "Content-Type: application/json" \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "ref": 1, "c_position": 0, "c_title": "μƒˆλ‘œμš΄ μš”κ΅¬μ‚¬ν•­", "c_type": "default" }' ``` **μ˜ˆμƒ 응닡 (202 ACCEPTED):** ```json { "status": "ACCEPTED", "message": "Request queued for processing", "requestId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", "operation": "ADD_NODE", "table": "T_ARMS_REQADD_123", "timestamp": 1700000000000 } ``` ### 2. λ…Έλ“œ μˆ˜μ • (updateNode) ```bash curl -X POST "http://localhost:13131/auth-user/api/arms/reqAdd/T_ARMS_REQADD_123/updateNode.do" \ -H "Content-Type: application/json" \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "c_id": 123, "c_title": "μˆ˜μ •λœ 제λͺ©" }' ``` ### 3. λ…Έλ“œ μ‚­μ œ (removeNode) ```bash curl -X DELETE "http://localhost:13131/auth-user/api/arms/reqAdd/T_ARMS_REQADD_123/removeNode.do" \ -H "Content-Type: application/json" \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "c_id": 123 }' ``` ### 4. λ…Έλ“œ 이동 (moveNode) ```bash curl -X POST "http://localhost:13131/auth-user/api/arms/reqAdd/T_ARMS_REQADD_123/moveNode.do" \ -H "Content-Type: application/json" \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "c_id": 123, "ref": 2, "c_position": 1, "copy": 0 }' ``` --- ## πŸ”§ Backend-Core Consumer κ΅¬ν˜„ μ˜ˆμ‹œ Backend-Coreμ—μ„œ λ‹€μŒκ³Ό 같이 Consumerλ₯Ό κ΅¬ν˜„ν•΄μ•Ό ν•©λ‹ˆλ‹€: ```java @Service @Slf4j public class ReqAddKafkaConsumer { @Autowired private ReqAddService reqAddService; @Autowired private ObjectMapper objectMapper; @KafkaListener( topics = "REQADD", groupId = "backend-core-reqadd-consumer", containerFactory = "kafkaListenerContainerFactory" ) public void consume(String message) { try { // JSON 역직렬화 ReqAddKafkaMessage kafkaMessage = objectMapper.readValue(message, ReqAddKafkaMessage.class); log.info("πŸ“₯ Received Kafka Message | Operation: {}, Table: {}, RequestId: {}", kafkaMessage.getOperation(), kafkaMessage.getChangeReqTableName(), kafkaMessage.getRequestId()); // Payload 역직렬화 String payload = kafkaMessage.getPayload(); // Operation에 따라 λΆ„κΈ° switch (kafkaMessage.getOperation()) { case "ADD_NODE": reqAddService.addNode( kafkaMessage.getChangeReqTableName(), payload ); break; case "UPDATE_NODE": reqAddService.updateNode( kafkaMessage.getChangeReqTableName(), payload ); break; case "REMOVE_NODE": reqAddService.removeNode( kafkaMessage.getChangeReqTableName(), payload ); break; case "MOVE_NODE": reqAddService.moveNode( kafkaMessage.getChangeReqTableName(), payload ); break; default: log.warn("⚠️ Unknown operation: {}", kafkaMessage.getOperation()); } log.info("βœ… Message processed successfully | RequestId: {}", kafkaMessage.getRequestId()); } catch (Exception e) { log.error("❌ Failed to process Kafka message", e); // DLQ둜 μ „μ†‘ν•˜κ±°λ‚˜ 재처리 둜직 μΆ”κ°€ } } } ``` --- ## πŸ“Š Kafka λͺ¨λ‹ˆν„°λ§ ### 1. Topic 확인 ```bash kafka-topics.sh --bootstrap-server localhost:9094 \ --describe --topic REQADD ``` ### 2. Consumer Lag 확인 ```bash kafka-consumer-groups.sh --bootstrap-server localhost:9094 \ --describe --group backend-core-reqadd-consumer ``` ### 3. λ©”μ‹œμ§€ 쑰회 (개발 ν™˜κ²½) ```bash kafka-console-consumer.sh --bootstrap-server localhost:9094 \ --topic REQADD \ --from-beginning \ --property print.key=true \ --property print.timestamp=true ``` --- ## 🚨 μ—λŸ¬ 처리 ### 1. Kafka λ°œν–‰ μ‹€νŒ¨ - **ν˜„μƒ**: 500 Internal Server Error 응닡 - **원인**: Kafka ν΄λŸ¬μŠ€ν„° μ—°κ²° μ‹€νŒ¨ - **ν•΄κ²°**: Kafka μ„œλ²„ μƒνƒœ 확인, `bootstrap-servers` μ„€μ • 확인 ### 2. JSON 직렬화 μ‹€νŒ¨ - **ν˜„μƒ**: 400 Bad Request 응닡 - **원인**: 잘λͺ»λœ JSON 포맷 - **ν•΄κ²°**: μš”μ²­ λ³Έλ¬Έ ν˜•μ‹ 확인 ### 3. Consumer 처리 μ‹€νŒ¨ - **ν˜„μƒ**: λ©”μ‹œμ§€κ°€ Kafkaμ—λŠ” μžˆμœΌλ‚˜ DB 반영 μ•ˆλ¨ - **원인**: Backend-Core Consumer 였λ₯˜ - **ν•΄κ²°**: Backend-Core 둜그 확인 --- ## 🎯 μ£Όμ˜μ‚¬ν•­ ### 1. μˆœμ„œ 보μž₯ - 같은 `changeReqTableName`에 λŒ€ν•œ λ©”μ‹œμ§€λŠ” 같은 νŒŒν‹°μ…˜μœΌλ‘œ 전솑됨 - νŒŒν‹°μ…˜ λ‚΄μ—μ„œλŠ” μˆœμ„œκ°€ 보μž₯됨 ### 2. λ©±λ“±μ„± (Idempotence) - Producerμ—μ„œ `enable.idempotence=true` 섀정됨 - 쀑볡 λ©”μ‹œμ§€ λ°©μ§€ ### 3. μž¬μ‹œλ„ (Retry) - Producer μž¬μ‹œλ„: 3회 - Consumer μž¬μ‹œλ„: DLQ(Dead Letter Queue) κ΅¬ν˜„ ꢌμž₯ ### 4. 응닡 μ‹œκ°„ - ν΄λΌμ΄μ–ΈνŠΈλŠ” μ¦‰μ‹œ 202 ACCEPTED 응닡 λ°›μŒ - μ‹€μ œ μ²˜λ¦¬λŠ” Backend-Coreμ—μ„œ λΉ„λ™κΈ°λ‘œ μˆ˜ν–‰ - 처리 κ²°κ³Ό 확인이 ν•„μš”ν•œ 경우 별도 쑰회 API λ˜λŠ” WebSocket μ‚¬μš© --- ## πŸ“ 둜그 μ˜ˆμ‹œ ### Middle-Proxy (Producer) ``` 2024-11-25 10:30:45.123 INFO [reqadd-proxy] πŸ“€ Kafka Message Published | Topic: REQADD, Operation: ADD_NODE, Table: T_ARMS_REQADD_123, Partition: 1, Offset: 12345, RequestId: a1b2c3d4-e5f6-7890-abcd-ef1234567890 ``` ### Backend-Core (Consumer) ``` 2024-11-25 10:30:45.234 INFO [reqadd-consumer] πŸ“₯ Received Kafka Message | Operation: ADD_NODE, Table: T_ARMS_REQADD_123, RequestId: a1b2c3d4-e5f6-7890-abcd-ef1234567890 2024-11-25 10:30:45.567 INFO [reqadd-consumer] βœ… Message processed successfully | RequestId: a1b2c3d4-e5f6-7890-abcd-ef1234567890 ``` --- ## βœ… 체크리슀트 ### Middle-Proxy (μ™„λ£Œ) - [x] ReqAddKafkaMessage DTO 생성 - [x] ReqAddProxyController 생성 - [x] ObjectMapper Bean μΆ”κ°€ - [x] KafkaConfig 확인 ### Backend-Core (μž‘μ—… ν•„μš”) - [ ] ReqAddKafkaConsumer 생성 - [ ] KafkaConfig Consumer μ„€μ • μΆ”κ°€ - [ ] κΈ°μ‘΄ ReqAddService와 톡합 - [ ] DLQ(Dead Letter Queue) κ΅¬ν˜„ - [ ] μ—λŸ¬ 처리 및 μž¬μ‹œλ„ 둜직 μΆ”κ°€ --- ## πŸ”— κ΄€λ ¨ λ¬Έμ„œ - [Kafka 곡식 λ¬Έμ„œ](https://kafka.apache.org/documentation/) - [Spring Kafka λ¬Έμ„œ](https://spring.io/projects/spring-kafka) - [A-RMS μ•„ν‚€ν…μ²˜ λ¬Έμ„œ](./README.md)