티스토리 뷰

현대 웹 서비스: 다양한 소스의 콘텐츠를 조합, 여러 웹 서비스에 접근해야 함

→ 즉 외부 서비스나 데이터베이스 결과를 기다리는 스레드를 블록함으로 연산 자원을 낭비하는 일은 피해야함

동시성을 구현하는 자바 지원의 진화

Runnable, thread → ExecutorService (스레드 실행과 태스크 submit을 분리) → Callable, Future (Runnable, Thread의 변형을 반환) → RecursiveTask (분할/정복 알고리즘의 포크/조인 구현을 지원) → 스트림/람다 지원에 기반한 병렬 프로세싱 → CompletableFuture 지원(분산 비동기 프로그래밍 지원) → Flow 인터페이스 (발행-구독 프로토콜)

  • 가능한한 동시에 실행할 수 있는 독립적인 태스크를 가능하게 만들면서 멀티코어 또는 여러 기기를 통해 제공되는 병렬성을 쉽게 이용

Executor와 스레드 풀

ExecutorService 는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스 제공

ExecutorService newFixedThreadPool(int nThreads)

워커 스레드라 불리는 nThreads를 포함하여

스레드 풀에서 사용하지 않은 스레드로 제출된 태스크를 먼저 온 순서대로 실행

스레드 풀의 장점

  1. 하드웨어에 맞는 수의 태스크를 유지할 수 있음
  2. 동시에 수천 개의 테스크를 스레드 풀에 아무 오버헤드 없이 submit 가능
  3. 큐의 크기 조정, 거부 정책, 태스크 종류에 따른 우선순위 등 설정 가능

스레드 풀 주의 사항

  1. 불필요하게 sleep 되거나 I/O 기다리거나, 네트워크 연결 기다리는 태스크가 있다면 나머지 작업 스레드가 남은 태스크를 모두 처리해야 하므로 작업 효율성이 떨어짐
  2. 프로그램 종료 전에 모든 스레드 풀을 종료해야 함(Thread.setDeaemon 메서드 제공)

엄격한 포크 조인 / 여유로운 포크 조인

엄격한 포크 조인:

  • 태스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나길 기다림
  • 스레드 생성 + join() 이 메서드 호출 내에 추가되는 형태

여유로운 포크 조인:

  • 메서드 호출에 의해 스레드가 생성되고 메서드가 반환된 이후에도 계속 실행
  • 메서드 반환된 이후에도 만들어진 태스크 실행이 계속되는 메서드를 비동기 메서드 라고 함
  • 주의 사항:
    • 메서드를 호출한 코드와 스레드 실행은 동시에 실행되므로, 데이터 경쟁 문제 일어날 수있음
    • 실행 중인 스레드가 종료되지 않은 상황에서 main 메서드가 반환될 경우

동기 API와 비동기 API

int y = f(x);
int z = g(x);
System.out.println(y+z);

f, g가 서로 독립된 함수라면 → f,g를 별도의 CPU 코어로 실행해서 f,g 중 오래 걸리는 작업의 시간으로 줄일 수 있음

즉 별도의 스레드로 f와 g를 실행하자

ExecutorService executorService = Executors.newFixedPool(2);
Future<Integer> y = executorService.submit(() -> f(x));
Future<Integer> z = executorService.submit(() -> g(x));
System.out.println(y.get() + z.get());

위와 같이 구현할 수 있지만, 더럽다! → CompletableFuture 이용하자, Flow 이용하자

대안은 2가지가 있다

Future 형식 API

Future<Integer> y = f(x);
Future<Integer> z = g(x);
System.out.println(y.get() + z.get()); //두 Future가 완료되어 결과가 합쳐지기를 기다림

리액티브 형식 API

콜백 형식의 프로그래밍을 이용

void f(int x, IntConsumer deaelWithResult);

return 문으로 결과를 반환하는 게 아니라 결과가 준비되면 이를 람다로 호출하는 태스크를 만듬

int x = 5;
Result result = new Result();

f(x, (int y) -> {
	result.left = y;
	System.out.println((result.left + result.right)):
};

g(x, (int y) -> {
	result.right = y;
	System.out.println((result.left + result.right)):
};

이 방식을 쓰면 사실 값을 2번 출력할 수도 있고 .. 호출 합계가 정확하게 출력이 되지 않을 수있음

  • 리액티브 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로, Future 을 이용하는 것이 더 적절하다, 나중에 스트림으로 연결
  • Future API는 일회성의 값을 처리하는데 적합

블로킹 동작은 해로운 것으로 간주

스레드 풀에서 잠을 자는 태스크나 블로킹 되는 태스크는 다른 태스크가 시작되지 못하게 막으므로 자원을 소비

해결 방법 →

  • 태스크를 앞과뒤 두 부분으로 나누고 블록되지 않을 때만 자바가 스케줄링 하도록 요청
  • 태스크를 블록하는 것보다는 다른 작업을 태스크로 제출하고 현재 태스크는 종료한다 , I/O 작업의 경우에도 블록하지 않는 ‘읽기 시작’ 메서드를 호출하고 읽기 작업이 끝나면 이를 처리할 다음 태스크를 런타임 라이브러리에 스케줄하도록 요청하고 종료

비동기 API 에서 예외 처리

  • CompletableFuture: exceptionally()
  • Reactive: 예외 발생 시 콜백 생성, onError()

CompletableFuture와 콤비네이터를 이용한 동시성

ExecutorService executorService = Executors.newFixedThreaadPool(10);
int x = 1337;

CompletableFuture<Integer> a = new CompletableFuture<>();
CompletableFuture<Integer> b = new CompletableFuture<>();
//c작업은 a,b 작업이 끝날 때까지 thread에서 실행되지 않는다
CompletableFuture<Integer> c = a.thenCombine(b, (y,z)->(y+z)); 
executorService.submit(()->a.complete(f(x)));
executorService.submit(()->b.complete(g(x)));

System.out.println(c.get());
executorService.shutdown();

thenCombine 사용 시 → f , g가 끝난 다음에야 덧셈 계산이 실행, 병렬 실행의 효율성 높힐 수 있다

리액티브 프로그래밍

Future는 한 번만 실행해 결과를 제공

리액티브 프로그래밍은 여러 Future 같은 객체를 통해 여러 결과를 제공

구독자가 구독할 수 있는 발행자

이 연결을 구독이라고 함

이 연결을 이용해 메시지를 전송

interface Publisher<T> {
	void subscribe(Subscriber<? super T> subscriber);
}

interface Subscriber<T> {
	void onNext(T t);
	void onSubscribe(Subscription subscription); //Publisher <-> Subscriber 사이에 채널이 연결되면 첫 이벤트로 이 메서드가 호출됨
}

interface Subscription{  //이 객체 통해 요청
  void cancel();
  void request(long n);
 }

리액티브 시스템

런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램 가리킴

  • 반응성: 질의의 응답을 지연하지 않고 실시간으로 입력에 반응
  • 회복성: 한 컴포넌트의 실패로 전체 시스템이 실패하지 않음
  • 탄력성: 시스템이 작업 부하에 맞게 적응하며 작업을 효율적으로 처리. 작업자 스레드 적절하게 재배치 등등

리액티브 시스템을 구현하는 방법 중 하나 → 리액티브 프로그래밍

리액티브 프로그래밍 인터페이ㅣ스 설계는 메시지 주도 속성을 반영,

처리할 입력을 기다리고 결과를 다른 컴포넌트로 보내면서 시스템이 반응

비동기 API 구현

CompletableFuture가 기존 Future 에 반해 가질 수 있는 이점

  • Future 집합이 실행하는 모든 태스크의 완료를 기다림
  • Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다(⇒ 여러 태스크가 다양한 방식으로 같은 결과를 구하는 상황)
  • 프로그램적으로 Future을 완료시킴 (비동기 동작에 수동으로 결과 제공)
  • Future의 완료 동작에 반응 (결과를 기다리면서 블록되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future의 결과로 원하는 추가 동작 실행 가능)
public Future<Double> getPriceAsync(String product){
 CompletableFuture<Double> futurePrice = new CompletableFuture<>();
 new Thread( () -> { 
		 try {
			 double price = calculatePrice(product); //calCulatePrice :: 처리 시간이 오래 걸리는 외부 api
			 futurePrice.complete(price);
		 } catch (Exception e) {
			 futurePrice.completeExceptionally(ex); //error 발생 시 발생 후 future 종료
		 }
}).start();
return futurePrice;
}
  • 실제 가격을 계산하는 다른 스레드를 만든 다음에 계산 결과를 기다리지 않고 결과를 포함할 Future 인스턴스를 바로 반환
Future<Double> futurePrice = getPriceAsync("product");
//다른 스레드가 제품 가격을 계산 하는 동안 .. 이 스레드는 다른 작업 수행
doSomethingElse();
try {
	  double price = futurePrice.get();//가격 정보가 있으면 Future에서 가격 정보를 읽고  
    //가격 정보가 없으면 가격 정보를 받을 때까지 block
    System.out.println(price);
 } catch (Exception e) {
  throw new RuntimeException(e);
  }

getPriceAsync 는 즉시 Future 반환, Future의 get 메서드 호출 .. → 결국 결과값을 받을 때까지 block 된다..

→ Future 작업이 끝났을 때만 이를 통지받으면서 람다 표현식이나 메서드 참조로 정의된 콜백 메서드를 실행하는 방법이 있음

다음과 같이 supplyAsync 로 CompleableFuture 만들 수 있음

다른 Executor 를 선택적으로 전달 가능

/**
Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor with the value obtained
     * by calling the given Supplier.
**/
public Future<Double> getPriceAsync(String product){
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
  }

비블록 코드 만들기

public List<String> findPrices(String product){
	 List<CompletableFuture<String>> priceFutures =
		 shops.stream() //CompletableFuture 로 각각의 계산을 비동기적으로 계산
		 .map(shop -> CompletableFuture.supplyAsync(
					 () -> shop.getPrice(product)))
			.collect(Collectors.toList());
		
		return priceFutures.stream()
				.map(CompletableFuture::join)  //모든 비동기 동작이 끝나길 기다림
				.collect(toList());
		}
	

두 개의 스트림 파이프라인으로 처리한 이유:

스트림 연산은 게으른 특성이 있으므로 하나의 파이프라인으로 연산을 처리했다면 모든 가격 정보 요청 동작이 동기적, 순차적으로 이루어지게 된다, 기존 요청 작업이 완료되어야 join이 결과를 반환하면서 다음 상점으로 정보를 요청할 수있기 때문

두 개의 스트림 파이프라인으로 처리하면서 CompletableFuture를 리스트로 모은 다음 다른 작업과는 독립적으로 각자의 작업을 수행함

but.. 더 확장성이 좋은 방법은 없을까

커스텀 Executor 사용하기

스레드 풀 조정 공식

Nthrads = NCPU * UCPU * (1+W/C)

- NCPU 는 코어 수
- UCPU 는 0과 1 사이 값 갖는 CPU 사용 비율
- W/C는 대기 시간과 계산 시간의 비율

스레드 풀은 데몬 스레드 로 만듦 (t.setDaemon(true);)

자바에서 일반 스레드가 실행 중이면 프로그램은 종료되지 않음,

따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는 일반 스레드가 있다면 결국 자원이 고갈되서 문제가 생길 수 있다

데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있음

비동기 작업 파이프라인 만들기

비동기 작업과 동기 작업 조합

public List<String> findPrices(String product){
	List<CompletableFuture<String>> priceFutures =
		shop.stream()
		.map(shop -> CompletableFuture.supplyAsync(()->shop.getPrice(product), executor))
		//thenApply:: CompletableFuture 가 끝날때까지 block하지 않음, CompletableFuture
		//가 끝날 때 람다 표현식을 적용할 수 있음
		.map(future -> future.thenApply(Quote::parse))
		//첫번째 연산의 결과를 인수로 받고 CompletableFuture 로 반환
		.map(future -> future.thenCompose(quote -> 
						CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote, executor)))
		.collect(toList());
		
		
	return priceFutures.stream().map(CompletableFuture::join).collect(toList());

Async 로 끝나지 않는 메서드 : 이전 작업을 수행한 스레드와 같은 스레드에서 작업 실행

Async 로 끝나는 메서드: 다음 작업이 다른 스레드에서 실행되도록 스레드 풀에 작업 제출

그러나 위 코드는 두 번째 CompleableFuture의 결과가 첫 번째 CompletableFuture의 결과에 의존하므로 최종 실행시간에는 영향 미치지 X, 스레드 전환 오버헤드가 적게 발생하면서 효율성이 좋은 thenCompose 활용

thenApply vs thenCompose

public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }
    
 // synchronous mapping function.
 
 CompletableFuture<Integer> future = 
    CompletableFuture.supplyAsync(() -> 1)
                     .thenApply(x -> x+1);
 
 
 // thenCompose is used if you have an asynchronous mapping function 
 // (i.e. one that returns a CompletableFuture). 
 //It will then return a future with the result directly, 
 // rather than a nested future.
 CompletableFuture<Integer> future = 
    CompletableFuture.supplyAsync(() -> 1)
                     .thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));

thenApply: 동기 연산

thenCompose: 비동기 연산

독립적인 두 개의 CompletableFuture 합치기

Future<Double> futurePriceInUSD = 
//1st task 요청
	 CompletableFuture.supplyAsync(()-> shop.getPrice(product))
	 
	 .thenCombine(
		 CompletableFuture.supplyAsync(
		 //환율 요청하는 2nd task 요청
				 () -> exchangeService.getRate(Money.EUR, Money.USD))
					 //exchangeService가 1초 안에 결과 제공하지 않으면 default value 사용
				 .completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
				 //thenCombineAsync 일 경우 별도의 스레드에서 실행 ..
				 (price, rate) -> price * rate 
			));

getPrice : Executor 스레드 1에서 실행

`getRate:`` Executor 스레드 2에서 실행

결과 합쳐서 join …

CompletableFuture의 종료에 대응하는 방법

각상점에서 가격 정보를 제공할 때마다 즉시 보여주는 방법..

전 findPrices 코드는 전체 상점이 가격을 다 리턴해야 보여줌 ..

//각 pipeline 이 one executor thread 에서 실행 ..

public Stream<CompletableFuture<String>> findPricesStream(String product){
	return shops.stream()
		.map(shop -> CompletableFuture.supplyAsync( ()-> shop.getPrice(product), executor))
		.map(future -> future.thenApply(Quote::parse))
		.map(future -> future.thenCompose(quote -> 
					CompletableFuture.supplyAsync( ()-> Discount.applyDiscount(quote), executor)));

thenAccept : Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action.

public CompletionStage<Void> thenAccept(Consumer<? super T> action);

CompletableFuture[] futures = findPricesStream("myphone")
																.map(f -> f.thenAccept(System.out::println))
																.toArray(size -> new CompletableFuture[size]);
																
//모든 completablefuture 가 완료되어야 CompletableFuture<Void> 반환
CompletableFuture.allOf(futures).join();

allOf 모든 CompletableFuture가 완료되어야 CompletableFuture<Void> 가 완료됨

Wrap-up

단일 값 → CompletableFuture

일련의 sequential 한 값 → Flow API

'공부 > Java' 카테고리의 다른 글

Netty 기초  (0) 2025.04.22
카프카 내부 매커니즘 - 1  (1) 2024.11.03
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/07   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
글 보관함