일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- LOB
- 운영체제
- Payload
- Operating System
- Lord of BOF
- Pwnable.kr
- System
- 정보처리기사 실기
- Buffer Overflow
- BOF
- wargame
- webhacking.kr
- PWN
- stack overflow
- OS
- 네트워크
- Spring Framework
- webhacking
- Spring
- 정보보안기사 실기
- pwnable
- 정보보안기사
- Spring MVC
- Shell code
- 워게임
- 웹해킹
- SQL
- 해킹
- system hacking
- hacking
- Today
- Total
DongDD's IT
[Spring] Spring webflux ServerSentEvent(SSE) 본문
[Spring] Spring webflux ServerSentEvent(SSE)
Server Sent Event(SSE)
- 일반적인 통신에서는 Client의 하나의 request에 하나의 response가 전달되는 형태로 통신이 이루어진다
- SSE에서 사용하는 event-stream 방식에서는 client가 request를 전송하면 connection이 맺어진 후에 half-duplex로 서버가 지속적으로 데이터를 보내줄 수 있다
Spring SSE
Spring MVC
- Spring MVC에서는 SSEEmitter를 이용하여 event-stream 방식의 SSE를 구현할 수 있다
Spring Webflux
- Spring Webflux는 reactive이기 때문에 Flux자체로 SSE를 구현할 수 있다
- webflux의 기본 단위 중 하나인 Flux를 이용하여 sse를 구현할 수 있다
- FluxProcessor의 구현체들이 SSE를 쉽게 구현할 수 있게 해준다(EmitterProcessor, DirectProcessor)
ServerSentEvent 구현
Flux.interval을 이용한 구현
- event stream을 가장 간단하게 구현할 수 있는 방법이다
- Flux.interval로 data를 내려줄 주기를 정하고 데이터를 뿌려주기만 하면 sse가 구현된다
@Override
public Flux<ServerSentEvent<String>> intervalStream() {
return Flux.interval(Duration.ofSeconds(1))
.map(i -> ServerSentEvent.builder("data " + i).build());
}
- 특정 주기(1초)마다 "data i"의 형태를 내려주는 코드이다
- Flux에서 쉽게 구현할 수 있도록 되어있다
결과
EmitterProcessor/DirectProcessor를 이용한 구현
- FluxProcessor의 구현체인 EmitterProcessor/DirectProcessor를 이용하여 구현할 수 있다
- 구현체로는 UnicastProcessor 등 여러가지가 더 있지만 두개만 사용해보았다.
@Override
public Flux<ServerSentEvent<String>> stream(String name, String processor) {
if (StringUtils.isEmpty(name)) {
return Flux.error(new IllegalStateException("invalid name"));
}
if (EMITTER_PROCESSOR.equals(processor)) {
return emitterProcessor
.publishOn(Schedulers.elastic())
.filter(sseModel -> name.equals(sseModel.getName()))
.map(sseModel -> ServerSentEvent.builder(sseModel.getMessage()).build())
.mergeWith(ping())
.doOnCancel(() -> System.out.println("Client is disconnected"));
} else if (DIRECT_PROCESSOR.equals(processor)) {
return directProcessor
.publishOn(Schedulers.elastic())
.filter(sseModel -> name.equals(sseModel.getName()))
.map(sseModel -> ServerSentEvent.builder(sseModel.getMessage()).build())
.mergeWith(ping())
.doOnCancel(() -> System.out.println("Client is disconnected"));
}
return Flux.error(new IllegalStateException("invalid processor"));
}
@Override
public Mono<Void> produceData(SseModel sseModel) {
directProcessorFluxSink.next(sseModel);
emitterProcessorFluxSink.next(sseModel);
return Mono.empty();
}
- name, message가 들어있는 데이터를 전송받고 데이터가 들어왔을 때, 들어온 name 해당하는 subscribe라면 message를 받게 되는 형태로 구현해보았다
- 각 processor를 bean으로 생성해두었다(github 코드 참조)
- processor로 fluxsink를 만들어주고 들어온 데이터를 fluxsink.next(data)를 이용하여 processor에 데이터를 보내준다
- processor에서는 들어온 data와 client가 요청한 name이 일치하는지 확인하고 일치하면 데이터를 내려주는 형식이다
- client의 disconnect를 감지하기 위해 mergeWith(ping())으로 빈 데이터를 내려주는 flux를 함께 동작하게한다
결과(EmitterProcessor)
결과(DirectProcessor)
SSE를 실제 적용하면서 생겼던 이슈
Client의 disconnect 감지
- 일반적으로 SSE를 생각해보면 data가 존재할 때만 데이터를 내려가게 해야한다고 생각했었다
- 데이터가 없는 경우, 데이터를 내려주지 않게 되면 client가 connection을 끊었을 때 이것을 서버에서 감지하기가 어려웠다
- 또한 data type이 string일 경우, ""와 같이 empty string을 내려준다고 해도 disconnect가 감지되지 않았고 설정한 connection time에 도달할 때까지 connection resource 낭비가 생겼었다
해결책
- 데이터를 Processor를 이용하여 내려주는 방식은 유지하고, Flux.interval로 다른 event stream을 merge하여 주기적으로 빈 데이터(ping)을 보내게 하여 disconnect를 감지
- 앞에서 말했듯이 empty string의 경우 감지하는 것이 불가능했기 때문에 ServerSentEvent로 String을 감싸서 보내주었다
EmitterProcessor의 큐
- FluxProcessor의 구현체인 EmitterProcessor를 처음에 사용했었다
- 원래 구현하려던 기능은 client가 subscribe한 후에 들어오는 데이터만 전송해주려는 기능이다
- EmitterProcessor는 자체에 내부 queue를 가지고 있어 subscriber가 없더라도 해당 데이터를 큐에 쌓아두게 되어있었다(해당 데이터는 subscribe를 통해 소비되거나 큐가 오버플로우 나지 않는 이상 사라지지 않음)
- 위의 상황때문에 client가 subscribe한 후에 이전에 들어왔던 데이터들도 모두 받게 되는, 구현하려던 기능과 다른 동작을 했었다
해결책
- EmitterProcessor가 내려주는 data에 filter를 걸어 조건에 만족하는 데이터만 내려주게 할 수 있었으나 굳이 필요없는 데이터를 한번더 체크해야 하나 싶어 다른 방법을 찾아보았다
- FluxProcessor의 구현체 중 하나인 DirectProcessor는 큐를 사용하지 않아 원하는 기능을 구현할 수 있을거라 생각했고 실제 적용했더니 정상 동작하는 것을 확인할 수 있었다
- DirectProcessor의 단점은 webflux에서 지원해주는 back pressure를 사용할 수 없다는 점이 있었다
-> 추후의 back pressure를 필요한 상황이 되면 첫번째 했던 말처럼 EmitterProcessor를 사용하고 필요없는 데이터를 필터하는 식으로 데이터를 내려주어야 할 것 같다
Gihub Code
https://github.com/DongDDo/spring_exam/tree/develop/sseexample
Reference
https://www.baeldung.com/spring-server-sent-events
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/EmitterProcessor.html
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/DirectProcessor.html
https://projectreactor.io/docs/core/release/reference/#_directprocessor
'프로그래밍 > Spring' 카테고리의 다른 글
[Spring] Spring Kafka - Reactor kafka (0) | 2020.11.26 |
---|---|
[Spring] Spring Data Redis - Cache (0) | 2019.12.21 |
[Spring] Spring Boot (0) | 2019.05.04 |
[Spring] Spring MyBatis (0) | 2019.05.04 |
[Spring] Spring JPA 2 - Spring Data JPA, Spring Data JPA CRUD (0) | 2019.04.28 |