Post

MSA 구현하기(3) - Kafka 설정 (feat. Docker)

Skala 과정에서 마이크로 서비스 아키텍처 구조에 대해 새롭게 알게 되었습니다.
배운 것을 실제로 구현해보기 위해 이 여정을 시작하기로 했습니다.
처음 글부터 보러가기

Docker를 사용하여 Kafka 서버를 구성해보도록 하겠습니다!

Kafka 구조 살펴보기

Kafka Kafka는 Producer에서 Topic을 정해 보낸 메시지를 해당 Topic을 구독하고 있는 Consumer가 받는 형태로 동작합니다.

공식 사이트를 참고하여 Broker, Partition의 개념에 대해 설명하겠습니다. Kafka-Broker-Partition 이렇게 Kafka의 Topic을 받는 Partition이 각 Broker에 생기게 됩니다.

Topic을 생성할 때 각 파티션에 생성될 Repication Factor의 개수를 설정해줄 수 있습니다.
Kafka-Topic-Replication 저의 경우는 2개로 설정해주어서 각 파티션 마다 복제본이 2개씩 생긴 것을 볼 수 있습니다. (Kafka UI에서 확인할 수 있습니다.)


Kafka 서버 띄우기

docker-compose.yml 구성

여러 개의 Kafka 서버를 띄우고, 각기 포트도 다르게 설정해주어야 하기 때문에 이럴 때 아주 편리하게 사용할 수 있는 Docker를 사용해보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
version: "3"

networks:
  kafka_network:

volumes:
  Kafka00:
    driver: local
  Kafka01:
    driver: local
  Kafka02:
    driver: local

services:
  Kafka00Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: unless-stopped
    container_name: Kafka00Container
    ports:
      - "10000:9094" # 포트포워딩
    environment:
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://127.0.0.1:10000
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - Kafka00:/bitnami/kafka

  Kafka01Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: unless-stopped
    container_name: Kafka01Container
    ports:
      - "10001:9094"
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://127.0.0.1:10001
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - Kafka01:/bitnami/kafka

  Kafka02Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: unless-stopped
    container_name: Kafka02Container
    ports:
      - "10002:9094"
    environment:
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://127.0.0.1:10002
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - Kafka02:/bitnami/kafka

  KafkaWebUiService:
    image: provectuslabs/kafka-ui:latest
    restart: always
    container_name: KafkaWebUiContainer
    ports:
      - "8085:8080" # 호스트의 8085 포트를 컨테이너의 8080 포트에 바인딩
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
    depends_on:
      - Kafka00Service
      - Kafka01Service
      - Kafka02Service
    networks:
      - kafka_network

저는 user, order, item, Kafka UI 총 3개의 Kafka 브로커와 UI 서버가 필요하기 때문에 4개의 컨테이너를 한꺼번에 생성하는 docker-compose.yml 파일을 설정해주도록 하겠습니다.

나중에 Kafka 관련 설정을 변경하고 싶을 때, 이 파일이 저장되어 있는 경로로 들어가서 docker-compose down 명령어로 동시에 4개의 컨테이너를 다운시키고, docker-compose up -d 다시 모든 컨테이너를 구동시킬 수 있습니다.

Kafka의 구동 상태 및 구조를 파악하기 쉽게 Kafka UI를 이용할 수 있습니다.
저는 로컬에서 8085번 포트에 접근하면 Kafka UI를 확인할 수 있도록 설정해주었습니다.

잘 구동되고 있는지, docker ps 명령어를 이용하여 확인합니다.

Docker-Ps

잘 돌아가고 있습니다 ~.~


Spring에서 Kafka 설정하기

Kafka 공통(?) yaml 파일 구성하기

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
  kafka:
    bootstrap-servers: localhost:10000,localhost:10001,localhost:10002 # 카프카 서버 주소
    # Producer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Consumer
    consumer:
      group-id: error-handler-group
      auto-offset-reset: earliest
    template:
      default-topic: error-messages

Config 파일에서 설정해 줄 것이지만 yml에서 설정하는 방법도 있습니다 -.-


Kafka Producer 설정하기

  • user가 order를 생성하는 경우 -> item 데이터의 변경이 필요
  • user가 order를 취소하는 경우 -> item 데이터의 변경이 필요
  • user가 item을 삭제하는 경우 -> order 데이터의 변경이 필요
    등등..

이런 경우에 데이터가 변경되어야 하기 때문에 Kafka를 이용합니다.
일단 구색을 갖춰놓은 후에 producer와 consumer 역할을 제대로 나누어보도록 하겠습니다,,


KafkaProducerConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

yml 파일에 설정해 둔 bootstrap-servers 변수를 가져와서 bootstrapServers 변수에 저장하고, 생성해 둔 broker를 찾기 위해 URI를 등록합니다.


KafkaProducer.java

이제 실제로 메시지를 보내는 기능이 있는 KafkaProducer.java 를 작성해보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper mapper = new ObjectMapper();
    private final String DB_TOPIC = "db-connection-test";

    public User sendDbUpdateMessage(User user) {
        String jsonInString = Strings.EMPTY;
        try {
            jsonInString = mapper.writeValueAsString(user);
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }

        kafkaTemplate.send(DB_TOPIC, jsonInString);
        log.info("Kafka producer send data from user microservice: {}", user);
        return user;
    }
}

아직 MariaDB에서 FEDERATED 엔진을 통해 테이블 링크를 진행하지 않았다는 가정하에 User 데이터를 보내도록 만들었습니다.
추후에 링크까지 진행하고 나면, User 부분을 Message DTO로 바꾸어서 전달해보겠습니다.


Kafka Consumer 설정하기

이번에는 Item과 Order 서버에 Consumer 설정을 해보겠습니다.


KafkaConsumerConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "itemConsumerGroup"); // 저는 Order 서버의 그룹을 Item 서버의 그룹과 다르게 설정했습니다
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}

Producer와 전반적인 설정은 비슷하지만, factory 관련 함수의 기능이 다릅니다.
(Producer의 코드를 절대 복사하지 마세요,,,,,,,,,,경험담,,입니다,,,)


KafkaConsumer.java

이 클래스는 사실 수신한 메시지를 어떻게 처리할 것인가에 대한 클래스이기 때문에 각자의 프로젝트 요구사항에 맞게 구현이 조금씩 달라야 합니다..!

예시로 user가 order를 생성하면 item 수량이 변경되도록 만든 코드를 보여드리겠습니다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
    private final ItemRepository itemRepository;

    @Transactional
    @KafkaListener(topics = "db-connection-test")
    public void processMessage(String kafkaMessage) {
        log.info("Kafka Message: ======> \n{}", kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<>() {});
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }

        Long targetItemId = Long.parseLong(String.valueOf(Optional.of(map.get("itemId"))
                .orElseThrow(() -> new IllegalStateException("Not found itemId"))));
        Item item = itemRepository.findById(targetItemId)
                .orElseThrow(() -> new IllegalStateException("Item Not Found"));
        Integer soldQuantity = (Integer) Optional.of(map.get("quantity"))
                .orElseThrow(() -> new IllegalStateException("Not found quantity"));

        item.changeQuantity(item.getQuantity() - soldQuantity);
    }
}

try 안 쪽 구문이 중요한 것 같습니다.
json을 매핑하는 구문이기 때문에 저 부분을 공통적으로 사용하게 될 것 같아 추후에는 클래스나 함수로 만들어야겠습니다!

마치며

여기까지도 잘 따라오셨다면 Kafka UI(localhost:8085)에서 다음과 같은 화면을 볼 수 있습니다. Kafka-UI

다음은 MariaDB 설정입니다 •´◡`•



참고 자료

하나, , ,

This post is licensed under CC BY 4.0 by the author.