프로그래밍/Spring

[Spring] Spring Kafka - Reactor kafka

DongDD 2020. 11. 26. 18:52

[Spring] Spring Kafka - Reactor Kafka

 

 

Kafka


- 메시징 시스템(메시징 큐)이다.(e.g RabbitMQ)

- Distributed Event Streaming Platform이다.(https://kafka.apache.org)

- Pub/Sub 구조를 가지고 있다.

    -> Publisher는 특정 토픽에 key/value(data)값을 publish한다.

    -> Subscriber는 특정 토픽을 subscribe하여 데이터를 받는다.

- Kafka는 기본적으로 클러스터 구성으로 동작한다.

 

 

Local에서 Kafka 띄우기


- docker-compose-single-broker.yml 사용할 예정

- https://github.com/wurstmeister/kafka-docker git에서 clone
- docker-compose-single-broker.yml 파일 수정
    -> services.kafka.environment.KAFKA_ADVERTISED_HOST_NAME을 127.0.0.1(localhost)로 변경
    -> services.kafka.environment.KAFKA_CREATE_TOPICS 제거(topic은 수동으로 생성할 예정)

- docker-compose-single-broker.yml 파일

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

- cmd : docker-compose -f docker-compose-single-broker.yml up

 

Kafka Tool


1. Kafka console 사용하기

- https://kafka.apache.org/downloads.html 에서 버전에 맞는 console에서 사용할 kafka 다운로드

1) 토픽 생성
- ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2) console consumer 실행
- ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
3) console producer 실행

- ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

 

2. Kadeck

- kafka 접근/kafka에 저장되는 데이터를 확인할 수 있는 gui툴(free software)

- http://www.getkadeck.com/#/

 

 

Reactor Kafka


- Reactive하게 사용할 수 있는 kafka api를 제공해준다.(http://projectreactor.io/docs/kafka/release/reference/)

- Spring webflux에서 사용할 수 있다.(Mono/Flux)

 

 

Spring Kafka Example


- 지난 번 포스트했던 SSE를 활용하여 kafka에서 컨슈밍한 데이터를 출력해주는 예제

    -> https://dongdd.tistory.com/181

    -> Spring boot 2.4.0으로 올렸더니 DirectProcessor가 deprecated되어 Sink.Many로 변경

 

1. Kafka Configuration

@Configuration
public class KafkaConfig {
    @Value("${kafka.host}")
    private String host;

    @Value("${kafka.topic}")
    private String topic;

    @Value("${kafka.groupId}")
    private String groupId;

    @Bean("kafkaSender")
    public KafkaSender<String, Object> kafkaSender() {
        SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
        senderOptions.scheduler(Schedulers.parallel());
        senderOptions.closeTimeout(Duration.ofSeconds(5));

        return KafkaSender.create(senderOptions);
    }

    @Bean
    public ReceiverOptions<String, Object> receiverOptions() {
        return ReceiverOptions.<String, Object>create(getConsumerProps())
                .subscription(Collections.singleton(topic));
    }

    private Map<String, Object> getProducerProps() {
        return new HashMap<>() {{
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000);
        }};
    }

    private Map<String, Object> getConsumerProps() {
        return new HashMap<>() {{
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        }};
    }
}

- host/topic/groupId를 application.yml 파일에 정의하여 사용한다.

- KafkaSender와 ReceiverOptions를 Bean로 생성

1) Producer 옵션

- BOOTSTRAP_SERVERS_CONFIG : kafka의 host:port

- KEY_SERIALIZER_CLASS_CONFIG : key에 대한 serializer 설정

- VALUE_SERIALIZER_CLASS_CONFIG : value에 대한 serializer 설정

- MAX_BLOCK_MS_CONFIG : KafkaSender.send()에서 block이 일어날 수 있는데 최대 얼마나 block될 수 있는지에 대한 설정

2) Consumer 옵션

- BOOTSTRAP_SERVERS_CONFIG : kafka의 host:port

- KEY_DESERIALIZER_CLASS_CONFIG : key에 대한 deserializer 설정

- VALUE_DESERIALIZER_CLASS_CONFIG : value에 대한 deserializer 설정

- GROUP_ID_CONFIG : group id 설정

 

2. Kafka Consumer

@PostConstruct
public void init() {    // Consumer를 열어놓음
    disposable = KafkaReceiver.create(receiverOptions).receive()
            .doOnNext(processReceivedData())
            .doOnError(e -> {
                System.out.println("Kafka read error");
                init();     // 에러 발생 시, consumer가 종료되고 재시작할 방법이 없기 때문에 error시 재시작
            })
            .subscribe();
}

@PreDestroy
public void destroy() {
    if (disposable != null) {
        disposable.dispose();
    }
    kafkaSender.close();
}

private Consumer<ReceiverRecord<String, Object>> processReceivedData() {
    return r -> {
        System.out.println("Kafka Consuming");
        Object receivedData = r.value();
        if (receivedData != null) {
            sinksMany.emitNext(r.value(), Sinks.EmitFailureHandler.FAIL_FAST);   // data를 consuming할때마다 sink로 전송
        }
        r.receiverOffset().acknowledge();
    };
}

- @PostConstructor를 사용하여 앱이 구동될 때 kafka consumer가 생성되도록 작성

- receiverOptions에 설정했던 topic에 데이터가 존재할 경우 processReceivedData() 호출하고 처리한다.

- processReceivedData()에서는 받은 데이터를 SSE로 내려주기 위해 Sink.Many에 데이터를 전송한다.

- doOnError에서 kafka에서 에러가 발생할 경우, consumer를 재생성/재시작한다.

 

3. Kafka Producer

public Mono<Boolean> send(String topic, String key, Object value) {
    return kafkaSender.createOutbound()
            .send(Mono.just(new ProducerRecord<>(topic, key, value)))  // 해당 topic으로 message 전송
            .then()
            .map(ret -> true)
            .onErrorResume(e -> {
                System.out.println("Kafka send error");
                return Mono.just(false);
            });
}

- 1에서 생성했던 bean을 가져온다.

- createOutbound()로 데이터를 전송할 reactive gateway를 생성한다.

- producing할 topic/key/value값을 mono로 전송한다.

 

4. 동작

 

 

실제 적용하면서 겪었던 이슈


1. Kafka에 에러 발생 후 복구 시, kafka consumer가 재동작하지 않음

- 코드 상에서 @Postconstructor를 이용하여 consumer가 가동되는데, kafka에 문제가 생겨 연결이 끊기는 경우 kafka consumer가 종료되고 자동으로 재연결을 하지 않는다.

 

해결책

- KafkaReceiver 생성 시, 에러 처리로 consumer를 재시작하도록 처리(코드 레벨에서는 재귀처럼 동작할 것 같긴한데, 실제 사용해보았을 때 문제가 발생하지는 않았다.)

 

 

Github Code


https://github.com/DongDDo/spring_exam/tree/feature/ReactorKafka/kafkaexample

 

DongDDo/spring_exam

sping_exam. Contribute to DongDDo/spring_exam development by creating an account on GitHub.

github.com

 

 

 

Reference


https://kafka.apache.org/

https://projectreactor.io/docs/kafka/release/reference/