배움 __IL/addtionalBackEnd

StreamAPI 의 실행순서와 병렬처리

Mo_bi!e 2025. 2. 7. 22:31

I. Stream API의 이해

1. 관점 변화

  • 명령형 프로그래밍 (C, JAVA)
    • What을 할 것인지 나타내기 보다 How할 건지를 설명하는 방식이다
  • 선언형 프로그래밍
    • How를 나타내기보다 What을 할건지 설명하는 방식

2. Stream API의 이해

  • JAVA는 OOP이므로 함수형 프로그래밍 불가
    • JDK8 부터 Stream API 로 데이터 추상화하고 처리하는데 자주 사용되는 함수 정리
    • 데이터를 추상화 했다는 것은 데이터의 종류와 관계없이 같은 방식으로 처리
    • 재사용성 높일 수 있음
  • 특징
    1. 원본 데이터 변경 X
      • 원본 데이터 조회 후 Stream을 생성함
    2. Stream 은 일회용
      • 한번 사용 끝나면 Stream을 재생성해야함
      • 닫힌 Stream은 IllegalStateException 발생
    3. 내부 반복으로 작업 처리
      • 간결해 지는 이유임
      • 반복 문법을 메소드 내부에 숨기고 있음
      • // 반복문이 forEach라는 함수 내부에 숨겨져 있다. nameStream.forEach(System.out::println);

II. 실행순서

1.  필요성

Stream API를 정확히 알고 사용하지 못하면 처리 속도의 지연을 야기한다.

그러므로 Stream API코드가 어떻게 동작할 것인지 정획히 이해하는것이 중요하다.

Stream.of("a", "b", "c", "d", "e")
        .filter(s -> {
            System.out.println("filter: " + s);
            return true;
        })
        .forEach(s -> System.out.println("forEach: " + s));

다음 코드가 있는 경우 실행결과는 어떻게 될까?

/*
filter: a
forEach: a
filter: b
forEach: b
filter: c
forEach: c
filter: d
forEach: d
filter: e
forEach: e
*/

 

 

모든 데이터에 대해 filter가 진행되고, forEach가 실행되는 수평적 구조로 순회하는것이 아니고,

각각의 데이터에 대해 filter와 forEach가 먼저 수직적인 구조로 순회하는 것임

이러한 방식을 'Lazy Evaluation' 이라고도 한다.

2.  Lazy Evaluation

stream 은 '필요할 때 만 연산'하는 구조 (Lazy Evaluation : 수직적 실행구조)이다.

 

좀 더 직관적인 이해 방식

Stream.of("a", "b", "c", "d", "e")
        .map(s -> {
            System.out.println("map: " + s);
            return s.toUpperCase();
        })
        .anyMatch(s -> {
            System.out.println("anyMatch: " + s);
            return s.startsWith("A");
        });

위 코드를 실행한다면

map: a
anyMatch: A

모든 데이터를 map()하고나서 anyMatch실행되는 것이 아님

첫 번째 데이터에서 anyMacth() 조건 충족되면 바로 종료 됨

 

예상으로는 위 코드가 map 5번 + anyMatch 1 번 = 총 6번 실행 될 것이다.

하지만 수직적인 구조로 코드를 실행하면 map 1번 + anyMactch 1번 = 총 2번 실행 될 것이다.

 

이러한 방식으로 실행되는 연산의 수를 줄일 수 있다.

 

3.  성능개선

앞서 언급한 Stream API는 Lazy Evaluation(지연 연산) 방식이므로 연산 배치순서에 따라

불필요한 연산의 최소화가 가능하다.

(1) 필터링을 먼저

1)

filter()를 앞에 배치함으로서, map() sorted(), distinct() 등의 연산의 횟수를 줄일 수있다.

Stream.of("apple", "banana", "avocado", "blueberry", "cherry")
        .map(String::toUpperCase) // 모든 데이터를 변환한 후 필터링 → 불필요한 연산 발생
        .filter(s -> s.startsWith("A"))
        .forEach(System.out::println);

 

모두 출력을 하면 map에서 모든 요소들을 대문자로 변환 하면서 불필요한 연산을 발생시킨다.

APPLE (X)
BANANA (X)
AVOCADO (O) → 최종 결과로 남음
BLUEBERRY (X)
CHERRY (X)

 

2) 

Stream.of("apple", "banana", "avocado", "blueberry", "cherry")
        .filter(s -> s.startsWith("a")) // 필요한 데이터만 선별
        .map(String::toUpperCase) // 변환 연산 최소화
        .forEach(System.out::println);
APPLE
AVOCADO

이런경우 차라리 다음과 같이 filter를 먼저 배치하면 연산횟수를 감소 시킬 수있음

 

(2) 대량 연산이 필요한 작업은 나중에

1) sorted()

sorted 는 마지막에 배치하자

전체 데이터를 정렬하므로 비용이 크다.

Stream.of("banana", "apple", "cherry", "avocado", "blueberry")
        .sorted() // 먼저 정렬 → 불필요한 연산 발생
        .filter(s -> s.startsWith("a"))
        .forEach(System.out::println);

 

sorted() 먼저 실행되면서 필요하지 않는 데이터까지 정렬이 됨

그 후 filter()가 실행되어 정렬 연산 낭비

Stream.of("banana", "apple", "cherry", "avocado", "blueberry")
        .filter(s -> s.startsWith("a")) // 필요한 데이터만 먼저 필터링
        .sorted() // 필터링된 데이터만 정렬 → 연산 감소
        .forEach(System.out::println);

 

2) distinct()는 filter() 이후

Stream.of("apple", "banana", "apple", "avocado", "banana")
        .distinct() // 먼저 중복 제거 → 불필요한 연산 발생
        .filter(s -> s.startsWith("a"))
        .forEach(System.out::println);

마찬가지로 불필요한 연산임

Stream.of("apple", "banana", "apple", "avocado", "banana")
        .filter(s -> s.startsWith("a")) // 먼저 필요한 데이터만 필터링
        .distinct() // 중복 제거 대상이 줄어들어 성능 개선
        .forEach(System.out::println);

순서를 조정하여 변경 가능함

(3) 최적의 연산순서

filter() → limit() → map() → distinct() → sorted() → forEach()

 

이 순서대로 하는것을 권장함

 

1) filter() → 불필요한 데이터 제거 (연산량 최소화)
2) limit() → 필요한 개수만 유지 (연산 단축)
3) map() → 데이터 변환 (필요한 데이터만 변환)
4) distinct() → 중복 제거 (최소한의 데이터만 처리)
5) sorted() → 필수 데이터만 정렬 (연산 부담 최소화)
6) forEach() → 최종 실행 (이전까지 모든 연산을 최적화한 상태)

 

 

모범사례

List<User> users = fetchUsersFromDatabase(); // DB에서 유저 목록 가져오기

users.stream()
    .filter(user -> user.getAge() >= 20) // 먼저 필터링하여 불필요한 데이터 제거
    .limit(100) // 필요한 데이터 개수만 유지
    .map(User::getName) // 필터링된 데이터만 변환
    .distinct() // 최소한의 데이터에서 중복 제거
    .sorted() // 최종적으로 정렬
    .forEach(System.out::println);

다음과 같이 한다면 최적화 되게 할 수있음

 

III. 병렬처리

1. 필요성

Stream API 의 장점 중 하나가 '병렬처리'이다. 다만 병렬스트림(parallelStream())을 사용한다고 성능이 무조건 좋아지는 것은 아니다.

병렬처리가 필요한 경우는 '대량의 데이터 처리', 'CPU 멀티코어를 활용하여 처리 속도 향상' 하는 경우이다.

다만 처리량 적거나 순서가 중요한경우, 공유 자원을 수정해야하는 경우에는 권장되지 않는다.

 

2. 기본 개념

List<Integer> numbers = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());

System.out.println("일반 스트림 실행:");
numbers.stream()
        .forEach(num -> System.out.println(Thread.currentThread().getName() + " - " + num));

System.out.println("\n병렬 스트림 실행:");
numbers.parallelStream()
        .forEach(num -> System.out.println(Thread.currentThread().getName() + " - " + num));

위 개념에서 결과는 다음과 같다

일반 스트림 실행:
main - 1
main - 2
main - 3
main - 4
main - 5
main - 6
main - 7
main - 8
main - 9
main - 10

병렬 스트림 실행:
main - 7
main - 6
main - 9
main - 10
ForkJoinPool.commonPool-worker-2 - 8
ForkJoinPool.commonPool-worker-1 - 3
ForkJoinPool.commonPool-worker-3 - 1
ForkJoinPool.commonPool-worker-1 - 4
ForkJoinPool.commonPool-worker-2 - 5
main - 2

- 일반 스트림(Stream())

단일 스레드에서 순차적으로 실행된다.

이 경우 모든 연산은 main 쓰레드에서 실행이 된다.

 

- 병렬 스트림(parallelStream())

내부적으로 여러 개의 쓰레드를 사용하여 작업을 분할 처리한다.

이 경우 순차적이지는 않다. (즉 데이터 순서가 보장되지 않는다.)

여러개의 ForkJoinPool 쓰레드에서 실행이 된다.

 

- 여기서 ForkJoinPool이란

병렬 처리를 효율적으로 수행하기 위한 쓰레드 풀(Tgread Pool)이다

이는 'parallelStream()' 이 내부적으로 사용하는 병렬 실행 엔진이다.

작업을 쪼개고(Fork), 다시 합치는(Join) 방식으로 동작한다.

 

이 값은 다음과 같은 JVM의 매개변수를 통해 별도로 설정해줄 수 있다.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

 

3. 주의 사항

- 순서보장, Thread-Safety 문제, 항상 성능을 향상시키는 것은 아님

(1) 순서보장

1) 문제

List<String> names = List.of("Alice", "Bob", "Charlie", "David");

names.parallelStream()
        .forEach(System.out::println);  // 실행할 때마다 출력 순서가 다름

parallelStream()을 foreach() 출력하면 매번 순서가 다름

David
Alice
Charlie
Bob

매번 다르게 볼 수있음

 

2) 해결방법

names.parallelStream()
        .forEachOrdered(System.out::println);

forEachOrdered 는 

내부적으로 데이터의 순서를 유지하며 처리하도록 추가적인 정렬 연산(별도 버퍼를 활용)을 수행함

(2) Thread-Safety 문제

1) 문제

병렬 스트림 내부에서 공유 자원을 수정하면 Race Condtion 발생 가능

List<Integer> list = new ArrayList<>();

IntStream.range(1, 1000).parallel().forEach(list::add); // 문제 발생 가능!
System.out.println(list.size()); // 예상보다 작은 크기가 나올 수 있음

매번 list.size() 가 다르게 나옴

 

여러 개의 스레드가 동시에 add()를 실행하면서 Race Condition 발생

 

2) 해결

List<Integer> list = IntStream.range(1, 1000)
    .parallel()
    .boxed() //기본형 스트림을 Wrapper class로 변환
    .collect(Collectors.toList()); // 안전한 리스트 생성
System.out.println(list.size());

Thread-Safe 한 데이터 구조 사용이다.

 

주요 방법으로는 공유 자원을 직접 수정하지 않는 것이 원칙이다.

즉 공유자원인 ArrayList를 직접 수정하는것은 Race Condition(데이터 경쟁) 발생 시킬 수 있음

Collectors는 이를 방지하기 위해 내부적으로 안전한 방식으로 요소를 모음

 

(3) 병렬 스트림이 항상 성능을 향상 시키는것이 아니다.

1) 문제

List<String> names = List.of("Alice", "Bob", "Charlie", "David", "Eve");

long start = System.nanoTime();
names.stream().map(String::toUpperCase).forEach(System.out::println);
long end = System.nanoTime();
System.out.println("일반 스트림 실행 시간: " + (end - start) + " ns");

start = System.nanoTime();
names.parallelStream().map(String::toUpperCase).forEach(System.out::println);
end = System.nanoTime();
System.out.println("병렬 스트림 실행 시간: " + (end - start) + " ns");

다음과 같이 데이터 갯수가 5개에 불과하면 병렬 스트림이 오히려 더 느릴 수있음

일반 스트림의 평균 실행시간은 실행 결과 950 내외이지만, 병렬 스트림은 999가 출력됨

 

그 이유는 쓰레드 풀을 생성하는 오버헤드가 존재하기 때문이다.

 

2) 해결방법

대량의 데이터 처리(10만개 이상)할 때만 사용하는 것을 권장함