프로그래밍/Spring

[Spring] Spring webflux ServerSentEvent(SSE)

DongDD 2019. 12. 8. 13:52

[Spring] Spring webflux ServerSentEvent(SSE)

 

Server Sent Event(SSE)


 

- 일반적인 통신에서는 Client의 하나의 request에 하나의 response가 전달되는 형태로 통신이 이루어진다

- SSE에서 사용하는 event-stream 방식에서는 client가 request를 전송하면 connection이 맺어진 후에 half-duplex로 서버가 지속적으로 데이터를 보내줄 수 있다

 

 

Spring SSE


Spring MVC

- Spring MVC에서는 SSEEmitter를 이용하여 event-stream 방식의 SSE를 구현할 수 있다

 

Spring Webflux

- Spring Webflux는 reactive이기 때문에 Flux자체로 SSE를 구현할 수 있다

- webflux의 기본 단위 중 하나인 Flux를 이용하여 sse를 구현할 수 있다

- FluxProcessor의 구현체들이 SSE를 쉽게 구현할 수 있게 해준다(EmitterProcessor, DirectProcessor)

 

 

ServerSentEvent 구현


 

Flux.interval을 이용한 구현

- event stream을 가장 간단하게 구현할 수 있는 방법이다

- Flux.interval로 data를 내려줄 주기를 정하고 데이터를 뿌려주기만 하면 sse가 구현된다

@Override
public Flux<ServerSentEvent<String>> intervalStream() {
	return Flux.interval(Duration.ofSeconds(1))
		.map(i -> ServerSentEvent.builder("data " + i).build());
}

- 특정 주기(1초)마다 "data i"의 형태를 내려주는 코드이다

- Flux에서 쉽게 구현할 수 있도록 되어있다

결과

 

EmitterProcessor/DirectProcessor를 이용한 구현

- FluxProcessor의 구현체인 EmitterProcessor/DirectProcessor를 이용하여 구현할 수 있다

- 구현체로는 UnicastProcessor 등 여러가지가 더 있지만 두개만 사용해보았다.

@Override
public Flux<ServerSentEvent<String>> stream(String name, String processor) {
    if (StringUtils.isEmpty(name)) {
        return Flux.error(new IllegalStateException("invalid name"));
    }
    if (EMITTER_PROCESSOR.equals(processor)) {
        return emitterProcessor
                .publishOn(Schedulers.elastic())
                .filter(sseModel -> name.equals(sseModel.getName()))
                .map(sseModel -> ServerSentEvent.builder(sseModel.getMessage()).build())
                .mergeWith(ping())
                .doOnCancel(() -> System.out.println("Client is disconnected"));
    } else if (DIRECT_PROCESSOR.equals(processor)) {
        return directProcessor
                .publishOn(Schedulers.elastic())
                .filter(sseModel -> name.equals(sseModel.getName()))
                .map(sseModel -> ServerSentEvent.builder(sseModel.getMessage()).build())
                .mergeWith(ping())
                .doOnCancel(() -> System.out.println("Client is disconnected"));
    }

    return Flux.error(new IllegalStateException("invalid processor"));
}

@Override
public Mono<Void> produceData(SseModel sseModel) {
    directProcessorFluxSink.next(sseModel);
    emitterProcessorFluxSink.next(sseModel);
    return Mono.empty();
}

- name, message가 들어있는 데이터를 전송받고 데이터가 들어왔을 때, 들어온 name 해당하는 subscribe라면 message를 받게 되는 형태로 구현해보았다

- 각 processor를  bean으로 생성해두었다(github 코드 참조)

- processor로 fluxsink를 만들어주고 들어온 데이터를 fluxsink.next(data)를 이용하여 processor에 데이터를 보내준다

- processor에서는 들어온 data와 client가 요청한 name이 일치하는지 확인하고 일치하면 데이터를 내려주는 형식이다

- client의 disconnect를 감지하기 위해 mergeWith(ping())으로 빈 데이터를 내려주는 flux를 함께 동작하게한다

결과(EmitterProcessor)

결과(DirectProcessor)

 

SSE를 실제 적용하면서 생겼던 이슈


 

Client의 disconnect 감지

- 일반적으로 SSE를 생각해보면 data가 존재할 때만 데이터를 내려가게 해야한다고 생각했었다

- 데이터가 없는 경우, 데이터를 내려주지 않게 되면 client가 connection을 끊었을 때 이것을 서버에서 감지하기가 어려웠다

 

- 또한 data type이 string일 경우, ""와 같이 empty string을 내려준다고 해도 disconnect가 감지되지 않았고 설정한 connection time에 도달할 때까지 connection resource 낭비가 생겼었다

 

해결책

- 데이터를 Processor를 이용하여 내려주는 방식은 유지하고, Flux.interval로 다른 event stream을 merge하여 주기적으로 빈 데이터(ping)을 보내게 하여 disconnect를 감지

- 앞에서 말했듯이 empty string의 경우 감지하는 것이 불가능했기 때문에 ServerSentEvent로 String을 감싸서 보내주었다

 

EmitterProcessor의 큐

- FluxProcessor의 구현체인 EmitterProcessor를 처음에 사용했었다

- 원래 구현하려던 기능은 client가 subscribe한 후에 들어오는 데이터만 전송해주려는 기능이다

- EmitterProcessor는 자체에 내부 queue를 가지고 있어 subscriber가 없더라도 해당 데이터를 큐에 쌓아두게 되어있었다(해당 데이터는 subscribe를 통해 소비되거나 큐가 오버플로우 나지 않는 이상 사라지지 않음)

- 위의 상황때문에 client가 subscribe한 후에 이전에 들어왔던 데이터들도 모두 받게 되는, 구현하려던 기능과 다른 동작을 했었다

 

해결책

- EmitterProcessor가 내려주는 data에 filter를 걸어 조건에 만족하는 데이터만 내려주게 할 수 있었으나 굳이 필요없는 데이터를 한번더 체크해야 하나 싶어 다른 방법을 찾아보았다

- FluxProcessor의 구현체 중 하나인 DirectProcessor는 큐를 사용하지 않아 원하는 기능을 구현할 수 있을거라 생각했고 실제 적용했더니 정상 동작하는 것을 확인할 수 있었다

- DirectProcessor의 단점은 webflux에서 지원해주는 back pressure를 사용할 수 없다는 점이 있었다

-> 추후의 back pressure를 필요한 상황이 되면 첫번째 했던 말처럼 EmitterProcessor를 사용하고 필요없는 데이터를 필터하는 식으로 데이터를 내려주어야 할 것 같다

 

 

Gihub Code


https://github.com/DongDDo/spring_exam/tree/develop/sseexample

 

DongDDo/spring_exam

sping_exam. Contribute to DongDDo/spring_exam development by creating an account on GitHub.

github.com

 

 

Reference


https://www.baeldung.com/spring-server-sent-events

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/EmitterProcessor.html

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/DirectProcessor.html

https://projectreactor.io/docs/core/release/reference/#_directprocessor