[Spring] Spring Kafka - Reactor kafka
[Spring] Spring Kafka - Reactor Kafka
Kafka
- 메시징 시스템(메시징 큐)이다.(e.g RabbitMQ)
- Distributed Event Streaming Platform이다.(https://kafka.apache.org)
- Pub/Sub 구조를 가지고 있다.
-> Publisher는 특정 토픽에 key/value(data)값을 publish한다.
-> Subscriber는 특정 토픽을 subscribe하여 데이터를 받는다.
- Kafka는 기본적으로 클러스터 구성으로 동작한다.
Local에서 Kafka 띄우기
- docker-compose-single-broker.yml 사용할 예정
- https://github.com/wurstmeister/kafka-docker git에서 clone
- docker-compose-single-broker.yml 파일 수정
-> services.kafka.environment.KAFKA_ADVERTISED_HOST_NAME을 127.0.0.1(localhost)로 변경
-> services.kafka.environment.KAFKA_CREATE_TOPICS 제거(topic은 수동으로 생성할 예정)
- docker-compose-single-broker.yml 파일
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
build: .
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- cmd : docker-compose -f docker-compose-single-broker.yml up
Kafka Tool
1. Kafka console 사용하기
- https://kafka.apache.org/downloads.html 에서 버전에 맞는 console에서 사용할 kafka 다운로드
1) 토픽 생성
- ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2) console consumer 실행
- ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
3) console producer 실행
- ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
2. Kadeck
- kafka 접근/kafka에 저장되는 데이터를 확인할 수 있는 gui툴(free software)
Reactor Kafka
- Reactive하게 사용할 수 있는 kafka api를 제공해준다.(http://projectreactor.io/docs/kafka/release/reference/)
- Spring webflux에서 사용할 수 있다.(Mono/Flux)
Spring Kafka Example
- 지난 번 포스트했던 SSE를 활용하여 kafka에서 컨슈밍한 데이터를 출력해주는 예제
-> https://dongdd.tistory.com/181
-> Spring boot 2.4.0으로 올렸더니 DirectProcessor가 deprecated되어 Sink.Many로 변경
1. Kafka Configuration
@Configuration
public class KafkaConfig {
@Value("${kafka.host}")
private String host;
@Value("${kafka.topic}")
private String topic;
@Value("${kafka.groupId}")
private String groupId;
@Bean("kafkaSender")
public KafkaSender<String, Object> kafkaSender() {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
senderOptions.scheduler(Schedulers.parallel());
senderOptions.closeTimeout(Duration.ofSeconds(5));
return KafkaSender.create(senderOptions);
}
@Bean
public ReceiverOptions<String, Object> receiverOptions() {
return ReceiverOptions.<String, Object>create(getConsumerProps())
.subscription(Collections.singleton(topic));
}
private Map<String, Object> getProducerProps() {
return new HashMap<>() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000);
}};
}
private Map<String, Object> getConsumerProps() {
return new HashMap<>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}};
}
}
- host/topic/groupId를 application.yml 파일에 정의하여 사용한다.
- KafkaSender와 ReceiverOptions를 Bean로 생성
1) Producer 옵션
- BOOTSTRAP_SERVERS_CONFIG : kafka의 host:port
- KEY_SERIALIZER_CLASS_CONFIG : key에 대한 serializer 설정
- VALUE_SERIALIZER_CLASS_CONFIG : value에 대한 serializer 설정
- MAX_BLOCK_MS_CONFIG : KafkaSender.send()에서 block이 일어날 수 있는데 최대 얼마나 block될 수 있는지에 대한 설정
2) Consumer 옵션
- BOOTSTRAP_SERVERS_CONFIG : kafka의 host:port
- KEY_DESERIALIZER_CLASS_CONFIG : key에 대한 deserializer 설정
- VALUE_DESERIALIZER_CLASS_CONFIG : value에 대한 deserializer 설정
- GROUP_ID_CONFIG : group id 설정
2. Kafka Consumer
@PostConstruct
public void init() { // Consumer를 열어놓음
disposable = KafkaReceiver.create(receiverOptions).receive()
.doOnNext(processReceivedData())
.doOnError(e -> {
System.out.println("Kafka read error");
init(); // 에러 발생 시, consumer가 종료되고 재시작할 방법이 없기 때문에 error시 재시작
})
.subscribe();
}
@PreDestroy
public void destroy() {
if (disposable != null) {
disposable.dispose();
}
kafkaSender.close();
}
private Consumer<ReceiverRecord<String, Object>> processReceivedData() {
return r -> {
System.out.println("Kafka Consuming");
Object receivedData = r.value();
if (receivedData != null) {
sinksMany.emitNext(r.value(), Sinks.EmitFailureHandler.FAIL_FAST); // data를 consuming할때마다 sink로 전송
}
r.receiverOffset().acknowledge();
};
}
- @PostConstructor를 사용하여 앱이 구동될 때 kafka consumer가 생성되도록 작성
- receiverOptions에 설정했던 topic에 데이터가 존재할 경우 processReceivedData() 호출하고 처리한다.
- processReceivedData()에서는 받은 데이터를 SSE로 내려주기 위해 Sink.Many에 데이터를 전송한다.
- doOnError에서 kafka에서 에러가 발생할 경우, consumer를 재생성/재시작한다.
3. Kafka Producer
public Mono<Boolean> send(String topic, String key, Object value) {
return kafkaSender.createOutbound()
.send(Mono.just(new ProducerRecord<>(topic, key, value))) // 해당 topic으로 message 전송
.then()
.map(ret -> true)
.onErrorResume(e -> {
System.out.println("Kafka send error");
return Mono.just(false);
});
}
- 1에서 생성했던 bean을 가져온다.
- createOutbound()로 데이터를 전송할 reactive gateway를 생성한다.
- producing할 topic/key/value값을 mono로 전송한다.
4. 동작
실제 적용하면서 겪었던 이슈
1. Kafka에 에러 발생 후 복구 시, kafka consumer가 재동작하지 않음
- 코드 상에서 @Postconstructor를 이용하여 consumer가 가동되는데, kafka에 문제가 생겨 연결이 끊기는 경우 kafka consumer가 종료되고 자동으로 재연결을 하지 않는다.
해결책
- KafkaReceiver 생성 시, 에러 처리로 consumer를 재시작하도록 처리(코드 레벨에서는 재귀처럼 동작할 것 같긴한데, 실제 사용해보았을 때 문제가 발생하지는 않았다.)
Github Code
https://github.com/DongDDo/spring_exam/tree/feature/ReactorKafka/kafkaexample
Reference
https://projectreactor.io/docs/kafka/release/reference/