본문으로 바로가기

Reactive Streams 개념 정리

category Coding/Java Spring 2023. 8. 23. 19:00
반응형

구성 요소

  • Publisher: 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
  • Subscriber: 구독한 Publisher로부터 통지된 데이터를 전달받아서 처리하는 역할을 한다.
  • Subscription: Publisher에 요청할 데이터의 개수를 지정하고 데이터의 구독을 취소하는 역할을 한다.
  • Processor: Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고 Publisher로서 다른 Subscriber가 구독할 수 있다.

동작 과정

  1. Subscriber는 Publisher를 구독한다. (subscribe)
  2. Publisher는 Subscriber에게 데이터를 통지할 준비가 되었음을 알린다. (onSubscribe)
  3. Subscriber는 Publisher에게 전달받기를 원하는 데이터의 개수를 요청한다. (Subscription.request)
  4. Publisher는 3번에서 요청받은 만큼의 데이터를 생성하고 통지한다. (onNext)
  5. 이렇게 Publisher와 Subscriber간에 데이터 통지, 데이터 수신, 데이터 요청의 과정을 반복하다가 Publisher가 모든 데이터를 통지하게 되면 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 알린다. (onComplete)
  6. 만약 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 에러가 발생했음을 Subscriber에게 알린다. (onError)

위 과정 3번을 보면 Subscriber는 Publisher에게 데이터의 요청 개수를 지정하는데 이는 실제로 Publisher와 Subscriber는 각각 다른 스레드에서 비동기적으로 상호작용하는 경우가 대부분이기 때문이다. 이러한 경우 Publisher가 통지하는 속도가 Subscriber의 처리 속도보다 더 빠르다면 처리를 대기하는 데이터는 쌓이게 되고 결과적으로 시스템 부하가 커지게 된다. 따라서 이러한 문제를 방지하기 위해 Subscription.request를 통해 데이터 개수를 제어하는 것이다.

Publisher

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Publisher는 subscribe 메소드 하나만 구현하면 되는 수준으로 매우 단순하다. 해당 메소드는 파라미터로 전달받은 Subscriber를 등록하는 역할을 한다.

Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public  void onError(Throwable t);
    public void onComplete();
}
  • onSubscribe: 구독 시작 시점에 Publisher에게 요청할 데이터의 개수를 정하거나 구독을 해지하는 역할을 한다.
  • onNext: Publisher가 통지한 데이터를 처리하는 역할을 한다.
  • onError: Publisher가 데이터를 통지하는 과정에서 에러가 발생했을 때 에러를 처리하는 역할을 한다.
  • onComplete: Publisher가 데이터 통지를 완료했음을 알릴 때 호출되는 메소드이다. 데이터 통지 완료 후 특정한 후처리를 하고싶다면 onComplete 메소드에 코드를 작성하면 된다.

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Subscription 인터페이스는 Subscriber가 구독한 데이터의 개수를 요청하거나 또는 데이터 요청의 취소, 즉 구독을 해지하는 역할을 한다.

위에서 설명한 동작 과정을 코드 관점에서 다시 설명하자면 다음과 같다.

  • Publisher가 Subscriber 인터페이스 구현 객체를 subscribe 메소드의 파라미터로 전달한다.
  • Publisher 내부에서는 전달받은 Subscriber 인터페이스 구현 객체의 onSubscribe 메소드를 호출하여 Subscriber의 구독을 의미하는 Subscription 인터페이스 구현 객체를 Subscriber에게 전달한다.
  • 호출된 Subscriber 인터페이스 구현 객체의 onSubscriber 메소드에서 전달받은 Subscription 객체를 통해 전달받을 데이터의 개수를 Publisher에게 요청한다.
  • Publisher는 Subscriber로부터 전달받은 요청 개수 만큼의 데이터를 onNext 메소드를 호출하여 Subscriber에게 전달한다.
  • Publisher는 통지할 데이터가 없을 경우 onComplete 메소드를 호출하여 Subscriber에게 데이터 처리 종료를 알린다.

Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

Processor의 경우 별도로 구현해야 하는 메소드가 없다. 다른 인터페이스와 다른 점은 Subscriber 인터페이스와 Publisher 인터페이스를 상속한다는 것이다. 이는 Processor가 Publisher와 Subscriber의 기능을 모두 가지고 있기 때문이다.

리액티브 스트림즈의 구현 규칙

  1. Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
  2. Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다.
  3. Publisher가 데이터 처리가 실패하면 onError signal을 보내야 한다.
  4. Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다.
  5. Publisher가 Subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다.
  6. 일단 종료 상태 signal(onError, onComplete)을 받으면 더이상 signal이 발생되지 않아야 한다.
  7. 구독이 취소되면 Subscriber는 결국 signal 받는 것을 중지해야 한다.

Reference

스프링으로 시작하는 리액티브 프로그래밍

반응형