Future 를 개선한 CompletableFuture

자바 8 버전

자바 5 버전에서 등장한 Future 는 혁신적인 기능들을 갖고 왔지만, 그럼에도 불구하고 실용적인 부분에서 개선해야 할 여지가 보였다.

자바 8 에서는 위 Future 가 갖고 있는 단점을 보완한 새로운 동시성 모델이 등장하였다.

CompletableFuture


Future 의 단점을 보완한 버전으로 Future 가 갖고 있는 속성을 그대로 구현하고 있으면서 추가적으로 CompletionStage 라는 인터페이스를 통해 비동기 연산의 결과를 연결할 수 있도록 구성 되어있다.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
}
  • Future: 비동기 작업의 미래에 완료될 결과를 나타내는 인터페이스

  • CompletionStage: 비동기 작업의 결과를 연결할 수 있도록 구성하는 인터페이스

Future vs CompletableFuture

구분
Future(Java5)
CompletableFuture(Java8)

동작 방식

Blocking(get 호출 시 대기)

Non-blocking(Callback 등록 가능)

조합/연결

어려움 (get으로 꺼내서 다시 넣어야 함)

thenApply(), thenCompose() 등으로 파이프라인 구성 가능

예외 처리

try-catch 강제

exceptionally(), handle()을 통한 예외 처리

수동 완료

불가능

complete() 메서드로 강제 완료 가능

비동기 작업 실행


반환 값이 없는 비동기 실행 메서드 - runAsync()

CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    System.out.println("Hello Run Async " + Thread.currentThread().getName());
});

future1.get();

반환 값이 존재하는 비동기 실행 메서드 - supplyAsync()

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello Supply Async " + Thread.currentThread().getName());
    return "Hello";
});
String futureResult = future2.get();
System.out.println("futureResult = " + futureResult);

실행 결과

Hello Run Async ForkJoinPool.commonPool-worker-1
Hello Supply Async ForkJoinPool.commonPool-worker-1
futureResult = Hello
  • 반환 값 없는 경우 : runAsync()

  • 반환 값 있는 경우 : supplyAsync()

스레드 풀은 기본적으로 ForkJoinPoolcommonPool() 에서 얻어서 스레드를 획득함, 스레드 풀을 직접 제어하고 싶다면 메서드 시그니처에 Executor 를 추가하면 해당 스레드 풀에서 스레드를 획득한다.

사용자 정의 스레드 풀 사용 예시
ExecutorService executorService = Executors.newFixedThreadPool(5);

CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    System.out.println("Hello Run Async " + Thread.currentThread().getName());
},  executorService);

future1.get();

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello Supply Async " + Thread.currentThread().getName());
    return "Hello";
}, executorService);
String futureResult = future2.get();
System.out.println("futureResult = " + futureResult);
Hello Run Async pool-1-thread-1
Hello Supply Async pool-1-thread-2
futureResult = Hello

CompletableFuture 는 왜 ForkJoinPool 을 사용할까?

결과 값을 반환하는 메서드의 차이(get vs join)


get()

CompletableFuture 객체가 get() 메서드를 호출하면, 스레드의 작업 결과 값이 반환되기 까지 기다린다. 이 상황에서 스레드는 Blocking 되어 다른 스레드가 점유할 수 없으며, 만약 실행 도중 예외가 발생하면 CheckedException 예외를 발생 시킨다.

get() 메서드는 Future 인터페이스와 호환 되는 다른 클래스 모두 구현하여 사용중이고, CompletableFuture 가 등장할 당시 하위 버전에서도 동일한 기능을 제공하기 위해 해당 메서드를 그대로 구현하여 남겨두었다.

join()

CompletableFuture 객체가 join() 메서드를 호출하면, get() 메서드와 실행 방식은 동일하게 스레드가 작업의 결과를 반환 받기 위해 블락킹 상태에 있지만, 실행 도중 예외가 발생할 경우 UncheckedException 이 발생한다.

join() 메서드는 CompletableFuture 클래스에만 존재하며, 만약 다른 Future 인터페이스를 사용한다면 get() 메서드를 통해 호환을 보장해야한다. 하지만, CompletableFuture 만 사용한다면 join() 메서드를 사용하는 것을 권장한다.

get()

Checked Exception

(ExecutionException, InterruptedException)

인터럽트 발생 시 즉시 예외(InterruptedException)를 던짐. 반드시 try-catch 필요

일반적인 스레드 제어나 Future 인터페이스 호환성이 필요할 때

join()

Unchecked Exception

(CompletionException)

인터럽트 발생 시 Checked 예외를 던지지 않고 CompletionException으로 감쌈

Stream API나 람다식 내부에서 사용하여 코드를 간결하게 만들 때

작업 콜백


아래 메서드는 모두 이전 단계의 결과 값을 함수형 인터페이스 인수로 사용한다.

  • thenApply: 이전 결과 값을 활용하여 새로운 연산을 통해 값을 반환한다.

  • thenAccept: 이전 결과 값을 활용하여 새로운 연산을 하되 값을 반환하지 못한다.

  • thenRun: 이전 결과 값을 인수로 활용하지 않고, 새로운 작업을 실행 한다.

/**
 * 인자로 받은 Function을 사용하여 다음 연산 처리
 * Function의 반환 값을 가지고 있는 CompletableFuture<U> 반환
 */
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

예시

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello Supply Async " + Thread.currentThread().getName());
    return "Hello";
}).thenApply(
    result -> result + " thenApply - " + Thread.currentThread().getName()
);
String futureResult = future2.get();
System.out.println("futureResult = " + futureResult);
futureResult = Hello thenApply - main
메서드
사용 목적
예시

thenApply

이전 결과를 활용하여 반환 타입이 있는 로직을 호출할 때

이전 결과 값을 활용하여 새로운 객체를 만들 때

thenAccept

이전 결과를 활용하여 반환 타입이 없는 로직을 호출할 때

이전 작업의 결과 값과 함께 로그를 남길 때

thenRun

이전 결과의 반환 값을 사용하지 않고 다른 작업을 수행해야 할 때

비동기 작업 블럭에서 공유되는 자원을 활용하여 thenRun 메서드로 로그를 남길 때

작업 콜백의 비동기 메서드와 동기 메서드의 차이

thenApplyAsync, theAcceptAsync, thenRunAsync 와 같이 작업 콜백 메서드를 비동기로 지원하는 메서드도 같이 제공한다.

이 때 동기 메서드와 비동기 메서드의 차이는 후속 작업을 어떤 스레드에서 실행할 것인가에서 나뉜다.

  • 동기:이전 작업을 수행한 스레드를 그대로 재사용하려고 시도 -> 단, 너무 빨리 끝날 경우 호출한 스레드에서 실행

  • 비동기: 무조건 스레드 풀에 새로운 작업을 제출하여 실행함

비동기 작업 조합


작업간의 의존 관계가 존재할 때 thenCompose()

thenCompose 는 두 개의 Future를 순차적으로 연결하여, 이전 작업의 결과를 다음 작업 안에서 사용할 수 있다.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    return "Hello";
});

CompletableFuture<String> result = hello.thenCompose(x -> CompletableFuture.supplyAsync(() -> {
    return x + " " + Thread.currentThread().getName() + "- thenCompose";
}));
System.out.println(result.get());
Hello ForkJoinPool.commonPool-worker-1- thenCompose

두 작업의 처리 결과를 결합 하고 싶은 경우 thenCombine()

thenCombine 은 두 개의 독립적인 Future를 처리하고 두 결과를 결합하여 추가적인 작업을 수행할 수 있습니다.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    return "Hello " + Thread.currentThread().getName();
});

CompletableFuture<String> thread = CompletableFuture.supplyAsync(() -> {
    return "Thread " + Thread.currentThread().getName();
});

CompletableFuture<String> result = hello.thenCombine(thread, (h, t) -> h + "/" + t);
System.out.println(result.get());
Hello ForkJoinPool.commonPool-worker-1/Thread ForkJoinPool.commonPool-worker-1

예외 처리


handle()

작업 결과와 예외를 매개변수로 받아 에러 상황과 정상 작업 종료 상황을 모두 처리할 수 있다.

private static CompletableFuture<String> sample(boolean isThrow) {
    return CompletableFuture.supplyAsync(() -> {
        if (isThrow) {
            throw new IllegalArgumentException("멀티 스레드 동작 과정에서 에러 발생");
        }

        return "Thread sample method " + Thread.currentThread().getName();
    });
}
CompletableFuture<String> future = sample(false).handle(
        (result, error) -> error == null ? result : error.getMessage());

System.out.println("future = " + future.get());
future = Thread sample method ForkJoinPool.commonPool-worker-1

exceptionally()

작업의 예외만 매개변수로 받아 예외 상황을 처리할 수 있다.

CompletableFuture<String> future = sample(false).exceptionally(
        Throwable::getMessage);

System.out.println("future = " + future.get());
future = Thread sample method ForkJoinPool.commonPool-worker-1

병렬 처리


모든 스레드의 작업을 병렬로 실행 시킬 때 allOf()

allOf 메서드는 인자를 var-arg 로 받아 병렬로 실행 하여 작업이 모두 완료될 때 까지 대기한 뒤 모든 작업이 끝난 경우 CompletableFuture<Void> 타입을 반환한다.

var-arg 타입으로 인자를 받기 때문에 컬렉션을 사용할 수 없으며 이를 위해 Future 객체를 손수 인자로 넘기거나, 컬렉션을 사용하기 위해 우회 하는 코드를 작성해야 한다.

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

병렬로 수행하는 작업 중 가장 빨리 끝나는 작업을 얻고 싶을 때 anyOf()

anyOf() 메서드는 allOf() 와 동일하게 메서드 시그니처를 구성하고 있으며, 컬렉션을 사용할 수 없고 우회하는 방식 또한 같다.

allOf() 는 병렬로 모든 작업을 수행 시킨 뒤 추후 콜백을 통해 값을 획득할 수 있었지만, anyOf() 는 그와 달리 가장 빨리 끝나는 작업의 값을 반환한다.

private static CompletableFuture<String> sample(boolean isThrow) {
    return CompletableFuture.supplyAsync(() -> {
        if (isThrow) {
            throw new IllegalArgumentException("멀티 스레드 동작 과정에서 에러 발생");
        }

        return "Thread sample method " + Thread.currentThread().getName();
    });
}


CompletableFuture<String> future1 = sample(true).exceptionally(
        Throwable::getMessage);

CompletableFuture<String> future2 = sample(false).exceptionally(
        Throwable::getMessage);


List<CompletableFuture<String>> futures = List.of(future1, future2);

CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future2, future1);
System.out.println(objectCompletableFuture.get());
Thread sample method ForkJoinPool.commonPool-worker-1
참고 자료

Last updated