본문 바로가기
WebFlux

1. 리액티브 시스템과 리액티브 프로그래밍 & 2. 리액티브 스트림즈

by 대우니 2024. 3. 31.
728x90
반응형

개요

스프링으로 시작하는 리액티브 프로그래밍 책을 보고 정리하고자 해당 포스팅을 작성하고 있다.목차 별로 중요하다고 생각되는 부분, 개발할 때 기억해 두면 좋을 부분을 추려보고자 한다.

리액티브 시스템이란?

비동기 메시지 기반 통신으로, 클라이언트 요청에 즉각적으로 응답하여 지연 시간을 최소화하는 시스템.

설계원칙

빠른 응답성을 바탕으로 유지보수와 확장이 용이한 시스템

Means

주요 통신 수단으로 무엇을 사용할건지 표현

  • 비동기 메시지 기반의 통신을 통해 구성요소 간의 느슨한 결합, 격리성, 위치 투명성 보장해야 함.
  • 스레드가 요청에 대해 1:1로 매핑되지 않는다.

Form

비동기 메시지 통신 기반 하에 탄력성과 회복성을 가지는 시스템이어야 함.

  • 탄력성: 시스템 자원을 적절히 추가하거나 감소시켜 작업량의 변화에 대응하여 응답성을 일정하게 유지하는 것
  • 회복성: 시스템에 장애가 발생하더라도 응답성을 유지하는 것
  • 느슨하게 결합되어있기 때문에 독립적으로 분리되어 있어, 장애가 발생하더라도 전체 시스템에 장애를 발생시키지 않을 수 있음.

Value

비동기 메시지 통신을 바탕으로, 회복성, 확장성, 탄력성을 확보해 즉각적으로 응답 가능한 시스템을 구축할 수 있음을 의미

 

선언형, 명령형 프로그래밍 차이

명령형 프로그래밍

public void sum(List<Integer> nums) {
	for(int num: nums) {
    	if(num > 6 && num % 2 != 0) {
        	sum += num;
        }
    }
    System.out.println("합계: " + sum);
}

선언형 프로그래밍

public void sum(List<Integer> nums) {
	int sum = nums.stream()
    			  .filter(num -> num > 6 && num % 2 != 0)
        		  .mapToInt(num -> num)
                  .sum();

    System.out.println("합계: " + sum);
}​
  • 선언형은 동작을 구체적으로 명시하지 않고 목표만 선언한다.
  • 코드가 간결해지고 가독성도 좋아진다.
  • 명령형: 내가 직접 식당에 있는 정수기로 걸어가서 차가운 물을 컵에 따르는 것
  • 선언형: 여기 차가운 물 주세요 라고 종업원에게 부탁하는 것

리액티브 프로그래밍 코드 구성

Publisher

입력으로 들어오는 데이터를 제공하는 역할

Subscriber

Publisher가 제공한 데이터를 전달받아서 사용하는 주체

Data Source

Publisher의 입력으로 전달되는 데이터

Operator

publisher와 subscriber 사이의 가공 처리 담당

리액티브 스트림즈란?

데이터 스트림을 Non-Blocking이면서 비동기 적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양

리액티브 스트림즈 구성요소

  1. 먼저 Subscriber는 전달받을 데이터를 구독한다.(Publisher 인터페이스 내 subscribe)
  2. 다음으로 Publisher는 데이터를 방출(emit)할 준비가 되었음을 Subscriber에게 알린다.
    (Subscriber 인터페이스 내 onSubscribe)
  3. Publisher가 데이터를 방출(emit)할 준비가 되었다는 알림을 받은 Subscriber는
    전달받기를 원하는 데이터의 개수를 Publisher에게 요청한다.
    (Subscription 인터페이스 내 request)
  4. 다음으로 Publisher는 Subscriber로부터 요청받은 만큼의 데이터를 방출한다.
    (Subscriber 인터페이스 내 onNext)
  5. 이렇게 Publisher와 Subscriber 간에 데이터 방출, 데이터 수신, 데이터 요청의 과정을 반복하다가
    Publisher가 모든 데이터를 방출하게 되면 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 알린다.
    (Subscriber 인터페이스 내 onComplete)
    만약 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 에러가 발생했음을 Subscriber에게 알린다.
    (Subscriber 인터페이스 내 onError)

onComplete와 onError는 후처리 함수이다.

onComplete는 완료되었을 경우 DB 커넥션 종료 등을 처리하고,

onError는 에러가 발생했을 때 해당 에러를 처리하는 코드를 작성하면 된다.

 

리액티브 스트림즈 관련 용어 정리

Signal

Publisher와 Subscriber간에 주고 받는 상호작용

Subscriber 인터페이스 내에 정의된 메서드들을 실제 호출해서 사용하는 주체는

Publisher이므로 Publisher가 Subscriber에게 보내는 Signal

Demand

Subscriber가 Publisher에게 요청하는 데이터

Emit

Publisher가 데이터를 전달하는 것, onNext Signal을 줄여서 데이터를 emit한다고 표현함.

Upstream/Downstream

데이터 스트림 관점으로 볼 때, operator 체인으로 이루어진 경우 같은 객체를 반환하는 것인데,

앞 순서 체인이 반환하는 데이터 스트림은 upstream, 뒷 순서 체인이 반환하는 데이터 스트림은 downstream이라 부른다.

public void twoTimes() {
    // A기준으로 B downstream
    // B기준으로 A upstream
    // sequence
	Flux.just(1,2,3,4,5,6)
    	.filter(n -> n % 2 == 0) // A
        .map(n -> n * 2)		 // B
        .subscribe(System.out::println);
}

Sequence

operator 체인 형태로 정의된다.

 

리액티브 스트림즈 구현  규칙

인터페이스에 정의된 메소드를 보면서 의문이 들었던 것들만 적어본다.

Publisher 구현을 위한 기본 규칙

  1. Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 작거나 같아야 한다.
  2. 일단 종료 상태 signal(onError, onComplete)을 받으면 더 이상 signal이 발생되지 않아야 한다.
    • 해당 시그널은 publisher와 subscriber 간에 상호작용이 끝났음을 알리는 것이다.
  3. 구독이 취소되면 Subscriber는 결국 signal을 받는 것을 중지해야한다.

Subscriber 구현을 위한 기본 규칙

  1. Subscriber는 Publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n) 을 통해 demand signal을 publisher에게 보내야 한다.
  2. onComplete 및 onError는 signal을 수신한 후 구독이 취소된 것으로 여겨져야 한다.
  3. onSubscribe는 지정된 Subscriber에 대해 최대 한번만 호출되어야 한다.

Subscription 구현을 위한 기본 규칙

  1. 구독은 Subscriber가 onNext 혹은 onSubscribe 내에서 동기적으로 Subscription.request를 호출하도록 허용해야 한다.
    • 아래 코드를 확인해보면, onNext는 request를 호출하고, request는 onNext를 호출하고 있어서 동기적으로 호출되지 않을 경우, 상호 재귀로 인해 스택 오버플로우가 발생할 수 있다.
    • 따라서 이를 피하기 위해 동기적으로 호출되도록 허용해야한다.
public class ForPractice {

    @Test
    void test() {
        Iterable<Integer> iter = List.of(1, 2, 3, 4, 5);
        Publisher pub = new Publisher() {
            @Override
            public void subscribe(Subscriber sub) {
                Iterator<Integer> iterator = iter.iterator();
                // 해당 요청이 오면 어떻게 동작할건지 정의되어 있음 ..
                // 이 동작은 pub가 정의하는게 맞을듯 .. sub가 이 요청을 하면 나는 이렇게 해야지
                // 그리고 그걸 sub에게 건내서 sub가 pub에게 요청을 보내고 싶은 경우에 사용함 
                // pub-sub 사이에 매개체
                Subscription subscription = new Subscription() {

                    @Override
                    public void request(long n) {
                        System.out.println("[request] ");
                        if (iterator.hasNext()) {
                            sub.onNext(iterator.next());
                        } else {
                            sub.onComplete();
                        }

                    }

                    @Override
                    public void cancel() {

                    }
                };
                sub.onSubscribe(subscription);
            }
        };

        Subscriber<Integer> sub = new Subscriber<>() {
            Subscription subscription;
            int REQUEST_NUM = 1;

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("[sub][onSubscribe] ");
                this.subscription = subscription;
                this.subscription.request(REQUEST_NUM); // 요청준비가 되었어?! 그럼 이 만큼 메시지 줘!
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("[sub][onNext] " + i);
                subscription.request(REQUEST_NUM);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {
                System.out.println("[sub][onComplete]");
            }
        };

        pub.subscribe(sub);
    }
}

https://kkang-joo.tistory.com/55

  1.  구독은 무제한 수의 request 호출을 지원해야 하고, 최대 2^63 - 1 개의 Demand를 지원해야 한다.(한번에 가져갈 수 있는 양)
  2. cancel, request 등의 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다.

 

 

 

반응형