JAVA/이것이 자바다

Chapter .16-2 스트림과 병렬처리

개념원리 2021. 8. 2. 20:31

16.6 정렬( sorted() )

스트림은 요소가 최종 처리되기 전에 중간 단계에서 요소를 정렬해서 최종 처리 순서를 변경할 수 있다.

요소를 정렬하는 메소드

객체 요소일 경우에는 클래스가 Comparable을 구현하지 않으면 sorted() 메소드를 호출했을때 ClassCastException이 발생하기 때문에 Comparable을 구현한 요소에서만 sorted() 메소드를 호출해야 한다.

 

다음은 점수를 기준으로 Student 요소를 오름차순으로 정렬하기 위해 Comparable을 구현했다. 

public class Student implements Comparable<Student> {
	private String name;
	private int score;
	
	public Student(String name, int score) {
		this.name = name;
		this.score = score;
	}

	public String getName() { return name; }
	public int getScore() { return score; }

	@Override
	public int compareTo(Student o) {
		return Integer.compare(score, o.score);
	}
}

 

객체 요소가 Comparable을 구현한 상태에서 기본 비교(Comparable) 방법으로 정렬하고 싶다면 다음 세 가지 방법 중 하나를 선택해서 sorted()를 호출하면 된다.

sorted();
sorted( (a,b) -> a.compareTo(b) );
sorted( Compartator.naturalOrder() );

만약 객체 요소가 Comparable을 구현하고 있지만, 기본 비교 방법과 정반대 방법으로 정렬하고 싶다면 다음과 같이 sorted()를 호출하면 된다.

sorted( (a,b) -> b.comparaTo(a) );
sorted( Comparator.reverseOrder() );

객체 요소가 Comparable를 구현하지 않았다면 Comparator를 매개값으로 갖는 sorted() 메소드를 사용하면 된다.

Comparator는 함수적 인터페이스이므로 다음과 같이 람다식으로 매개값을 작성할 수 있다.

sorted( (a,b) -> { .... } )

중괄호 {} 안에는 a와 b를 비교해서 a가 작으면 음수, 같으면 0, a가 크면 양수를 리턴하는 코드를 작성하면 된다.

 

예제를 보면 숫자 요일 경우에는 오름차순으로 정렬한 후 출력했다.

Student 요소일 경우에는 Student의 기본 비교(Comparable) 방법을 이용해서 점수를 기준으로 오름차순으로 정렬한 후 출력했다.

그리고 Comparator를 제공해서 점수를 기준으로 내림차순으로 정렬한 후 출력했다.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.IntStream;

public class SortingExample {
	public static void main(String[] args) {
		//숫자 요소일 경우
		IntStream intStream = Arrays.stream(new int[] {5, 3, 2, 1, 4});
		intStream
			.sorted()
			.forEach(n -> System.out.print(n + ","));
		System.out.println();
		
		//객체 요소일 경우
		List<Student> studentList = Arrays.asList(
			new Student("홍길동", 30),
			new Student("신용권", 10),
			new Student("유미선", 20)
		);
		
		studentList.stream()
			.sorted( )   //오름차순으로 정렬
			.forEach(s -> System.out.print(s.getScore() + ","));
		System.out.println();
		
		studentList.stream()
		.sorted( Comparator.reverseOrder() )
		.forEach(s -> System.out.print(s.getScore() + ","));	
	}
}

 

16.7 루핑( peek(), forEach() )

- 요소 전체를 반복하는 것

 

- peek( )

  • 중간 처리 메소드

  • 중간 처리 단계에서 전체 요소를 루핑하며 추가 작업 하기 위해 사용

  • 최종처리 메소드가 실행되지 않으면 지연

    –반드시 최종 처리 메소드가 호출되어야 동작

 

- forEach( )

  • 최종 처리 메소드

  • 파이프라인 마지막에 루핑하며 요소를 하나씩 처리

  • 요소를 소비하는 최종 처리 메소드

    –sum( )과 같은 다른 최종 메소드 호출 불가

 

루핑(looping)은 요소 전체를 반복하는 것을 말한다.

루핑하는 메소드에는 peek(), forEach()가 있다.

이 두 메소드는 루핑한다는 기능에서는 동일하지만, 동작 방식은 다르다.

peek()는 중간 처리 메소드이고, forEach()는 최종 처리 메소드이다.

 

peek()는 중간 처리 단ㄷ계에서 전체 요소를 루핑하면서 추가적인 작업을 하기 위해 사용한다.

최종처리 메소드가 실행되지 않으면 지연되기 때문에 반드시 최종처리 메소드가 호출되어야 동작하낟.

 

예를 들어 필터링 후 어떤 요소만 남았는지 확인하기 위해 다음과 같이 peek()를 마지막에서 호출할 경우, 스트림은 전혀 동작하지 않는다.

intStream
.filter( a-> a%2 == 0)
.peek( a-> System.out.println(a));

요소 처리의 최종 단계가 합을 구하는 것이라면, peek() 메소드 호출 후 sum()을 호출해야만 peek()가 정상적으로 작동한다.

intStream
.filter(a -> a%2==0)
.peek( a-> System.out.println(a) )
.sum()

 

하지만 forEach()는 최종 처리 메소드이기 때문에 파이프라인 마지막에 루핑하면서 요소를 하나씩 처리한다.

forEach()는 요소를 소비하는 최종 처리 메소드이므로 이후에 sum()과 같은 다른 최종 메소드를 호출하면 안된다.

import java.util.Arrays;

public class LoopingExample {
	public static void main(String[] args) {
		int[] intArr = { 1, 2, 3, 4, 5 };
		
		System.out.println("[peek()를 마지막에 호출한 경우]");
		Arrays.stream(intArr)
			.filter(a -> a%2==0)
			.peek(n -> System.out.println(n));   //동작하지 않음
		
		System.out.println("[최종 처리 메소드를 마지막에 호출한 경우]");
		int total = Arrays.stream(intArr)
			.filter(a -> a%2==0)
			.peek(n -> System.out.println(n))   //동작함
			.sum();
		System.out.println("총합: " + total);
		
		System.out.println("[forEach()를 마지막에 호출한 경우]");
		Arrays.stream(intArr)
			.filter(a -> a%2==0)
			.forEach(n -> System.out.println(n)); //동작함
	}
}

 

출력결과

 

16.8 매칭( allMatch(), nayMatch(), noneMatch() )

스트림 클래스는 최종 처리 단계에서 요소들이 특정 조건에 만족하는지 조사할 수 있도록 세가지 메소드를 제공하고 있다.

 

- allMatch ( ) 메소드

  • 모든 요소들이 매개값으로 주어진 Predicate의 조건을 만족하는지 조사

 

- anyMatch( ) 메소드

  • 최소한 한 개의 요소가 매개값으로 주어진 Predicate 조건을 만족하는지 조사

 

- noneMatch( ) 메소드

  • 모든 요소들이 매개값으로 주어진 Predicate의 조건을 만족하지 않는지 조사

 

다음 예제는 int[] 배열로부터 스트림을 생성하고, 모든 요소가 2의 배수인지, 하나라도 3의 배수가 존재하는지, 모든 요소가 3의 배수가 아닌지를 조사한다.

import java.util.Arrays;

public class MatchExample {
	public static void main(String[] args) {
		int[] intArr = { 2, 4 ,6 };
		
		boolean result = Arrays.stream(intArr)
			.allMatch(a -> a%2==0);
		System.out.println("모두 2의 배수인가? " + result);     //true
		
		result = Arrays.stream(intArr)
			.anyMatch(a -> a%3==0);
		System.out.println("하나라도 3의 배수가 있는가? " + result);   //true
		
		result = Arrays.stream(intArr)
			.noneMatch(a -> a%3==0);
		System.out.println("3의 배수가 없는가?  " + result);  //false
	}
}

 

16.9 기본 집계( sum(), count(), average(), max(), min() )

- 최종 처리 기능으로 요소들을 처리해 카운팅, 합계, 평균값, 최대값, 최소값 등과 같이 하나의 값으로

  산출하는 것

- 집계는 대량의 데이터를 가공해서 축소하는 리덕션 (Reduction)

- 스트림이 제공하는 기본 집계

 

집계(Aggregate)는 최종 처리 기능으로 요소들을 처리해서 카운팅, 합계, 평균값, 최대값, 최소값 등과 같이 하나의 값으로 산출하는 것을 말한다.

 

집계는 대량의 데이터를 가공해서 축소하는 리덕션(Reduction)이라고 볼 수 있다.

 

16.9.1 스트림이 제공하는 기본 집계

스트림이 제공하는 기본 집계

 

이 집계 메소드에서 리턴하는 OptionalXXX는 자바 8에서 추가한 java.util 패키지의 Optional, OptionalDouble, OptionalInt, OptionalLong 클래스 타입을 말한다.

 

이들은 값을 저장하는 값 기반 클래스(value-based class)들 이다.

이 객체에서 값을 얻기 위해서는 get(), getAsDouble(), getAsInt(), getAsLong()을 호출하면 된다.

import java.util.Arrays;

public class AggregateExample {
	public static void main(String[] args) {
		long count = Arrays.stream(new int[] {1, 2, 3, 4, 5})
			.filter(n -> n%2==0)
			.count();
		System.out.println("2의 배수 개수: " + count);  //2
		
		long sum = Arrays.stream(new int[] {1, 2, 3, 4, 5})
			.filter(n -> n%2==0)
			.sum();
		System.out.println("2의 배수의 합: " + sum); //6
		
		double avg = Arrays.stream(new int[] {1, 2, 3, 4, 5})
			.filter(n -> n%2==0)
			.average()
			.getAsDouble();
		System.out.println("2의 배수의 평균: " + avg);  //3.0
		
		int max = Arrays.stream(new int[] {1, 2, 3, 4, 5})
			.filter(n -> n%2==0)
			.max()
			.getAsInt();
		System.out.println("최대값: " + max);  //4
		
		int min = Arrays.stream(new int[] {1, 2, 3, 4, 5})
			.filter(n -> n%2==0)
			.min()
			.getAsInt();
		System.out.println("최소값: " + min);   //2
		
		int first = Arrays.stream(new int[] {1, 2, 3, 4, 5})
			.filter(n -> n%3==0)
			.findFirst()
			.getAsInt();
		System.out.println("첫번째 3의 배수: " + first);  //3
	}
}

 

 

16.9.2 Optional 클래스

- Optional, OptionalDouble, OptionalInt, OptionalLong 클래스

- 저장하는 값의 타입만 다를 뿐 제공하는 기능은 거의 동일

- 단순히 집계 값만 저장하는 것이 아님

- 집계 값이 존재하지 않을 경우 디폴트 값을 설정 가능

  집계 값을 처리하는 Consumer도 등록 가능

 

§Optional 클래스들이 제공하는 메소드들

 

컬렉션의 요소는 동적으로 추가되는 경우가 많다.

만약 컬렉션의 요소가 추가되지 않아 저장된 요소가 없을 경우 다음 코드는 어떻게 될까?

List<Integer> list = new ArrayList<>();
double avg = list.stream()
             .mapToInt(Integer :: intValue)
             .average()
             .getAsDouble();
System.out.println("평균 : "+avg);

요소가 없기 때문에 평균값도 가져올 수 없다.

그래서 NoSuchElementException 예외가 발생한다.

 

요소가 없을 경우 예외를 피하는 세 가지 방법이 있는데, 첫 번째는 Optional 객체를 얻어 isPresent() 메소드로 평균값 여부를 확인하는 것이다.

 

isPresent() 메소드가 true를 리턴할때만 getAsDouble() 메소드로 평균값을 얻으면 된다.

OptionalDouble optional = list.stream()
                          .mapToInt(Integer :: intValue )
                          .average();
if(optional.isPresent()){
    System.out.println("평균 : "+optional.getAsDouble());
}else{
    System.out.println("평균 : 0.0");
}

두번째 방법은 orElse() 메소드로 디폴트 값을 정해 놓는다.

평균값을 구할 수 없는 경우에는 orElse()의 매개값이 디폴트 값이 된다.

double avg = list.stream()
             .mapToInt(Integer :: intValue)
             .average()
             .orElse(0.0);
System.out.pinrltn("평균 : "+avg);

세번째 방법은 ifPresent() 메소드로 평균값이 있을 경우에만 값을 이용하는 람다식을 실행한다.

list.stream()
     .mapToInt(Integer :: intValue)
     .average()
     .ifPresent(a-> System.out.println("평균 : "+a));
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalDouble;

public class OptionalExample {
	public static void main(String[] args) {
		List<Integer> list = new ArrayList<>();
		
		/*//예외 발생(java.util.NoSuchElementException)
		double avg = list.stream()
			.mapToInt(Integer :: intValue)
			.average()	
			.getAsDouble(); 
		*/
		
		//방법1
		OptionalDouble optional = list.stream()
			.mapToInt(Integer :: intValue)
			.average();	
		if(optional.isPresent()) {
			System.out.println("방법1_평균: " + optional.getAsDouble());
		} else {
			System.out.println("방법1_평균: 0.0");
		}
		
		//방법2
		double avg = list.stream()
			.mapToInt(Integer :: intValue)
			.average()
			.orElse(0.0);
		System.out.println("방법2_평균: " + avg);
		
		//방법3
		list.stream()
			.mapToInt(Integer :: intValue)
			.average()
			.ifPresent(a -> System.out.println("방법3_평균: " + a));
	}
}

 

 

16.10 커스텀 집계( reduce() )

스트림은 기본 집계인 sum( ), average( ), count( ), max( ), min( )을 제공하지만, 프로그램화해서 다양한 집계 결과물을 만들 수 있도록 reduce() 메소드도 제공한다.

프로그램화해서 다양한 집계 결과물 만들 수 있도록 reduce( ) 메소드 제공

각 인터페이스에는 매개 타입으로 XXXOperator, 리턴 타입으로 OptionalXXX, int, long, double을 가지는 reduece() 메소드가 오버로딩 되어 있다.

 

스트림에 요소가 전혀 없을 경우 디폴트 값인 identity 매개값이 리턴된다.

XXXOperator 매개값은 집계 처리를 위한 람다식을 대입하는데, 예를 들어 학생들의 총점은 학생 스트림에서 점수 스트림으로 매핑해서 다음과 같이 얻을 수 있다. 

int sum = studentList.stream()
          .map(Student :: getScore)
          .reduce((a,b) -> a+b)
          .get();

스트림이 요소에 없을 경우 NoSuchElementException이 발생한다.

int sum = studentList.stream()
          .map(Student :: getScore)
          .reduce(0,(a,b) -> a+b);

디폴트 값(identity)인 0을 반환한다.

스트림에 요소가 있을 경우에는 두 코드가 모두 동일한 결과를 산출한다.

 

import java.util.Arrays;
import java.util.List;

public class ReductionExample {
	public static void main(String[] args) {
		List<Student> studentList = Arrays.asList(
				new Student("홍길동", 92),
				new Student("신용권", 95),
				new Student("감자바", 88)
		);
		
		int sum1 = studentList.stream()
				.mapToInt(Student :: getScore)
				.sum();
		
		int sum2 = studentList.stream()
				.map(Student :: getScore)
				.reduce((a, b) -> a+b)
				.get();
		
		int sum3 = studentList.stream()
						.map(Student :: getScore)
						.reduce(0, (a, b) -> a+b);
		
		System.out.println("sum1: " + sum1);
		System.out.println("sum2: " + sum2);
		System.out.println("sum3: " + sum3);
	}
}
public class Student {
	private String name;
	private int score;
	
	public Student(String name, int score) {
		this.name = name;
		this.score = score;
	}

	public String getName() { return name; }
	public int getScore() { return score; }
}

 

 

16.11 수집( collect() )

스트림은 요소들을 필터링 또는 매핑한 후 요소들을 수집하는 최종 처리 메소드인 collect()를 제공한다.

이 메소드를 이용하면 필요한 요소만 컬렉션으로 담을 수 있고, 요소들을 그룹핑한 후 집계(리덕션) 할 수 있다.

 

16.11.1 필터링한 요소 수집

- Stream의 collect (Collector<T,A,R> collector ) 메소드

  필터링 또는 매핑된 요소들을 새로운 컬렉션에 수집하고, 이 컬렉션 리턴

 

매개값인 Collector(수집기)는 어떤 요소를 어떤 컬렉션에 수집할 것인지를 결정한다.

Collector의 타입 파라미터 T는 요소이고, A는 누적기(accumulator)이다.

그리고 R은 요소가 저장될 컬렉션이다.

 

풀어서 해석하면 T 요소를 A 누적기가 R에 저장한다는 의미이다.

 

Collector의 구현 객체는 다음과 같이 Collectors 클래스의 다양한 정적 메소드를 이용해서 얻을 수 있다.

리턴값인 Collector를 보면 A(누적기)가 ? 로 되어 있는데, 이것은 Collector가 R(컬렉션)에 T(요소)를 저장하는 방법을 알고 있어 A(누적기)가 필요 없기 때문이다.

 

Map과 ConcurrentMap의 차이점은 Map은 스레드에 안전하지 않고, ConcurrentMap은 스레드에 안전하다.

멀티 스레드 환경이라면 ConcurrentMap을 얻는 것이 좋다.

 

다음 코드는 전체 학생중에서 남학생들만 필터링해서 별도의 List로 저장한다.

1) Stream<Student> totalStream = totalList.stream();
2) Stream<Student> maleStream = totalStream.filter(s->s.getSex == Student.Sex.MALE);
3) Collector<Student, ? , List<Student>> collecor = Collectors.toList();
4) List<Student> maleList = maleStream.collect(collector);

1) 전체 학생 List에서 stream을 얻는다.

2) 남학생만 필터링해서 Stream을 얻는다.

3) List에 Student를 수집하는 Collector를 얻는다.

4) Stream에서 collect() 메소드로 Student를 수집해서 새로운 List를 얻는다.

 

상기 코드에서 변수를 생략하면 다음과 같이 간단하게 작성할 수 있다.

List<Student> maleList = totalList.stream()
                         .filter(s->s.getSex() ==Student.Sex.MALE)
                         .collect(Collectors.toList());
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ToListExample {
	public static void main(String[] args) {
		List<Student> totalList = Arrays.asList(
				new Student("홍길동", 10, Student.Sex.MALE),
				new Student("김수애", 6, Student.Sex.FEMALE),
				new Student("신용권", 10, Student.Sex.MALE),
				new Student("박수미", 6, Student.Sex.FEMALE)
		);
		
		//남학생들만 묶어 List 생성
		List<Student> maleList = totalList.stream()
				.filter(s->s.getSex()==Student.Sex.MALE)
				.collect(Collectors.toList());
		maleList.stream()
			.forEach(s -> System.out.println(s.getName()));
		
		System.out.println();
		
		//여학생들만 묶어 HashSet 생성
		Set<Student> femaleSet = totalList.stream()
				.filter(s -> s.getSex() == Student.Sex.FEMALE)
				//.collect(Collectors.toCollection(HashSet :: new));
				//.collect( Collectors.toCollection(()->{return new HashSet<Student>();}) );
				.collect( Collectors.toCollection(()->new HashSet<Student>()) );
		femaleSet.stream()
			.forEach(s -> System.out.println(s.getName()));
	}
}
public class Student {
	public enum Sex { MALE, FEMALE }
	public enum City { Seoul, Pusan }
	
	private String name;
	private int score;
	private Sex sex;
	private City city;
	
	public Student(String name, int score, Sex sex) {
		this.name = name;
		this.score = score;
		this.sex = sex;
	}	
	
	public Student(String name, int score, Sex sex, City city) {
		this.name = name;
		this.score = score;
		this.sex = sex;
		this.city = city;
	}

	public String getName() { return name; }
	public int getScore() { return score; }
	public Sex getSex() { return sex; }
	public City getCity() { return city; }
}

 

 

16.11.2 사용자 정의 컨테이너에 수집하기

이번에는 List, Set, Map과 같은 컬렉션이 아니라 사용자 정의 컨테이너 객체에 수집하는 방법에 대해 알아보기로 하자.

스트림은 요소들을 필터링, 또는 매핑 해서 사용자 정의 컨테이너 객체에 수집할 수 있도록 다음과 같이 collect() 메소드를 추가적으로 제공한다.

- 첫번째 Supplier는 오소들이 수집될 컨테이너 객체(R)을 생성하는 역할을 한다.

  순차처리(싱글 스레드) 스트림에서는 단 한번 Supplier가 실행되고 하나의 컨테이너 객체를 생성한다.

  병렬처리(멀티 스레드) 스트림에서는 여러 번 Supplier가 실행되고 스레드별로 여러 개의 컨테이너 객체를 생성한다.

  하지만 최종적으로 하나의 컨테이너 객체로 결합된다.

 

- 두번째 XXXConsumer는 컨테이느 객체(R)에 요소(T)를 수집하는 역할을 한다.

  스트림에서 요소를 컨테이너에 수집할 때마다 XXXConsumer가 실행된다.

 

- 세번째 BiConsumer는 컨테이너 객체(R)을 결합하는 역할을 한다.

   순차 처리 스트림에서는 호출되지 않고, 병렬 처리 스트림에서만 호출되어 스레드별로 생성된 컨테이너 객체를 결합     해서 최종 컨테이너 객체를 완성한다.

   

 

리턴타입 R은 요소들이 최종 수집된 컨테이너 객체이다.

순차 처리 스트림에서는 리턴 객체가 첫 번쨰 Supplier가 생성한 객체이지만, 병렬 처리 스트림에서는 최종 결합된 컨테이너가 객체가 된다.

 

병렬 처리는 다음절에서 살펴 보기로 하고, 여기서는 순차처리를 이용해서 사용자 정의 객체에 요소를 수집하는 것을 살펴보기로 하자.

 

학생들 중에서 남학생만 수집하는 MaleStudent 컨테이너가 다음과 같이 정의되어있다고 가정해보자.

import java.util.ArrayList;
import java.util.List;

public class MaleStudent  {
	private List<Student> list;  //요소를 저장할 컬렉션
	
	public MaleStudent() {
		list = new ArrayList<Student>();
		System.out.println("[" + Thread.currentThread().getName() + "] MaleStudent()");
	}
	
	public void accumulate(Student student) { //요소를 수집하는 메소드
		list.add(student);
		System.out.println("[" + Thread.currentThread().getName() + "] accumulate()");
	}
	
	public void combine(MaleStudent other) { //두 MaleStudent를 결합하는 메소드
		list.addAll(other.getList());        //(병렬 처리시에만 호출)
		System.out.println("[" + Thread.currentThread().getName() + "] combine()");
	}
	
	public List<Student> getList() {   //요소가 저장될 컬렉션을 리턴
		return list;
	}
}

list 필드는 남학생들이 수집될 필드이다.

 

9번째 라인 System.out.println("[" + Thread.currentThread().getName() + "] MaleStudent()"); 는  MaleStudent() 새엇ㅇ자가 몇 번 호출되었는지 확인하기 위해 호출한 스레드의 이름과 함께 생성자 이름을 출력한다.

 

순차 처리 스트림에서 MaleStudent() 생성자는 딱 한번 호출되고, 하나의 maleStudent 객체만 생성된다.

 

accumulate() 메소드는 매개값으로 받은 Student를 list필드에 수집하는데

14라인 System.out.println("[" + Thread.currentThread().getName() + "] accumulate()"); 에서 accumulate() 메소드가 몇 번 실행되었는지 확인하기 위해 호출한 스레드의 이름과 함께 메소드 이름을 호출해 보았다.

 

combine() 메소드는 병렬 처리 스트림을 사용할 떄 다른 MaleStudent와 결합할 목적으로 실행된다.

 

순차 처리 스트림에서는 호출되지 않기 때문에 정의할 필요가 없다고 생각되지만, collect() 메소드의 세 번째 매개값인 BiConsumer를 생성하기 위해서는 필요하다.

 

자 그럼 스트림에서 읽은 남학생을 MaleStudent에 수집하는 코드를 보자.

 

1) Stream<Student> totalStream = totalList.stream();
2) Stream<Student> maleStream = totalStream.filter(s->s.getSex() == Student.Sex.MALE);

3) Supplier<MaleStudent< supplier = () -> new MaleStudent();
4) Biconsumer<MaleStudent,Student> accumulator = (ms, s) -> ms.accumulate(s);
5) MaleStudent maleStudent = maleStream.collect(supplier, accumulator, combiner);

6) MaleStudent maleStudent = maleStrea.collect(supplier, accumulator, combiner);

1) 전체 학생 List에서 Steram을 얻는다.

2) 남학생만 필터링 해서 Stream을 얻는다.

3) MaleStudent를 공급하는 Supplier를 얻는다.

4) MaleStudent와 Student를 매개값으로 받아서 MaleStudent의 accumulate() 메소드로 Student를 수집하는               

    BiConsumer를 얻는다.

5) 두 개의 MaleStudent를 매개값으로 받아 combine() 메소드로 결합하는 BiConsumer를 얻는다

6) supplier가 제공하는 MaleStudent에 accumulator가 Student를 수집해서 최종 처리된 MaleStudent를 얻는다.

   싱글 스레드에서는 combiner는 사용되지 않는다.

 

상기 코드에서 변수를 생략하면 다음과 같이 간단하게 작성할 수 있다.

MaleStudent maleStudent = totalList.stream()
                          .filter(s->s.getSex() == Student.Sex.MALE)
                          .collect(
                            () -> new MaleStudent(),
                            (r, t) -> r.accumulate(t),
                            (r1, r2) -> r1.combine(r2)
                          );

람다식을 메소드 참조로 변경하면 다음과 같이 더욱 간단하게 작성할 수 있다.

MaleStudent maleStudent = totalList.steram()
                         .filter(s -> s.getSex() == Student.Sex.MALE)
                         .collect(MaleStudent :: new, MaleStudent :: accumulate, MaleStudent :: combine);

 

다음 예제는 순차 스트림을 이용해서 사용자 정의 컨테이너인 MaleStudent에 남학생만 수집한다.

import java.util.Arrays;
import java.util.List;

public class MaleStudentExample {
	public static void main(String[] args) {
		List<Student> totalList = Arrays.asList(
				new Student("홍길동", 10, Student.Sex.MALE),
				new Student("김수애", 6, Student.Sex.FEMALE),
				new Student("신용권", 10, Student.Sex.MALE),
				new Student("박수미", 6, Student.Sex.FEMALE)
		);
		
		MaleStudent maleStudent = totalList.stream()
				.filter(s -> s.getSex() == Student.Sex.MALE)
				//.collect(MaleStudent :: new, MaleStudent :: accumulate, MaleStudent :: combine); 
				.collect(()->new MaleStudent(), (r, t)->r.accumulate(t), (r1, r2)->r1.combine(r2));
		
		maleStudent.getList().stream()
			.forEach(s -> System.out.println(s.getName()));
	}
}

 

16.11.3 요소를 그룹핑해서 수집

collect() 메소드는 단순히 요소를 수집하는 기능 이외에 컬렉션의 요소들을 그룹핑해서 Map 객체를 생성하는 기능도 제공한다.

 

collect()를 호출할 때 Collectors의 groupingBy() 또는 groupingByConcurrent()가 리턴하는 Collector를 매개갑으로 대입하면 된다.

 

groupingBy()는 스레드에 안전하지 않은 Map을 생성하지만,

groupingByConcurrent()는 스레드에 안전한 ConcurrentMap을 생성한다.

 

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingByExample {
	public static void main(String[] args) {
		List<Student> totalList = Arrays.asList(
				new Student("홍길동", 10, Student.Sex.MALE, Student.City.Seoul),
				new Student("김수애", 6, Student.Sex.FEMALE, Student.City.Pusan),
				new Student("신용권", 10, Student.Sex.MALE, Student.City.Pusan),
				new Student("박수미", 6, Student.Sex.FEMALE, Student.City.Seoul)
		);
		
		Map<Student.Sex, List<Student>> mapBySex = totalList.stream()
				.collect(Collectors.groupingBy(Student :: getSex));
		System.out.print("[남학생] ");
		mapBySex.get(Student.Sex.MALE).stream().forEach(s->System.out.print(s.getName() + " "));
		System.out.print("\n[여학생] ");
		mapBySex.get(Student.Sex.FEMALE).stream().forEach(s->System.out.print(s.getName() + " "));
		
		System.out.println();
		
		Map<Student.City, List<String>> mapByCity = totalList.stream()
				.collect(
						Collectors.groupingBy(
								Student::getCity, 
								Collectors.mapping(Student::getName, Collectors.toList())
						)
				); 
		System.out.print("\n[서울] ");
		mapByCity.get(Student.City.Seoul).stream().forEach(s->System.out.print(s + " "));
		System.out.print("\n[부산] ");
		mapByCity.get(Student.City.Pusan).stream().forEach(s->System.out.print(s + " "));	
	}
}

 

 

16.11.4 그룹핑 후 매핑 및 집계

Collectors.groupingBy() 메소드는 그룹핑 후, 매핑이나 집계(평균, 카운팅, 연결, 최대, 최소, 합계)를 할 수 있도록 두 번째 매개값으로 Collector를 가질 수 있다.

 

이전 예제에서 그룹핑된 학생 객체를 학생 이름으로 매핑하기 위해 mapping() 메소드로 Collector를 얻었다.

Collectors는 mapping() 메소드 이외에도 집계를 위한 다양한 Collector를 리턴하는 다음과 같은 메소드를 제공하고 있다.

다음 코드는 학생들을 성별로 그룹핑한 다음 같은 그룹에속하는 학생들의 평균 점수를 구하고, 성별을 키로, 평군 점수를 값으로 갖는 Map을 생성한다.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToDoubleFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupingAndReductionExample {
	public static void main(String[] args) {
		List<Student> totalList = Arrays.asList(
				new Student("홍길동", 10, Student.Sex.MALE),
				new Student("김수애", 12, Student.Sex.FEMALE),
				new Student("신용권", 10, Student.Sex.MALE),
				new Student("박수미", 12, Student.Sex.FEMALE)
		);
		
		//성별로 평균 점수를 저장하는 Map 얻기
		Stream<Student> totalStream = totalList.stream();
		Function<Student, Student.Sex> classifier = Student :: getSex;
		ToDoubleFunction<Student> mapper = Student :: getScore;
		Collector<Student, ?, Double> collector1 = Collectors.averagingDouble(mapper);
		Collector<Student, ?, Map<Student.Sex, Double>> collector2 = Collectors.groupingBy(classifier, collector1);
		Map<Student.Sex, Double> mapBySex = totalStream.collect(collector2);
		
		/*Map<Student.Sex, Double> mapBySex = totalList.stream()
				.collect(
					Collectors.groupingBy(
						Student :: getSex, 
						Collectors.averagingDouble(Student :: getScore)
					)
				);*/
		
		System.out.println("남학생 평균 점수: " + mapBySex.get(Student.Sex.MALE));
		System.out.println("여학생 평균 점수: " + mapBySex.get(Student.Sex.FEMALE));
		
		//성별로 쉼표로 구분된 이름을 저장하는 Map 얻기
		Map<Student.Sex, String> mapByName = totalList.stream()
				.collect(
					Collectors.groupingBy(
						Student :: getSex, 
						Collectors.mapping(
							Student :: getName,
							Collectors.joining(",")
						)
					)
				);
		System.out.println("남학생 전체 이름: " + mapByName.get(Student.Sex.MALE));
		System.out.println("여학생 전체 이름: " + mapByName.get(Student.Sex.FEMALE));
	}
}

 

16.12 병렬처리

병렬처리(Parallel Operation)란 멀티 코어 CPU환경에서 하나의 작업을 분할해서 각각의 코어가 병렬적으로 처리하는 것을 말하는데, 병렬 처리의 목적은 작업 처리 시간을 줄이기 위한 것이다.

 

자바 8부터 요소를 병렬 처리할 수 있도록 하기 위해 병렬 스트림을 제공하기 때문에 컬렉션(배열)의 전체 요소 처리 시간을 줄여 준다.

 

 

16.12.1 동시성(Concurrency)과 병렬성(Parallelism)

멀티 스레드는 동시성 또는 병렬성으로 실행되기 때문에 이 용어들에 대해 정확히 이해하는 것이 좋다.

 

- 동시성 - 멀티 작업을 위해 멀티 스레드가 번갈아 가며 실행하는 성질

  • 싱글 코어 CPU를 이용한 멀티 작업

    – 병렬적으로 실행되는 것처럼 보임

    – 실체는 번갈아 가며 실행하는 동시성 작업

 

- 병렬성 - 멀티 작업 위해 멀티 코어 이용해서 동시에 실행하는 성질

 

 

병렬성은 데이터 병렬성과 작업 병렬성으로 구분할 수 있다.

 

데이터 병렬성(Data Parallelism)

  • 전체 데이터를 쪼개어 서브 데이터들로 만든 뒤 병렬 처리해 작업을 빨리 끝내는 것

 

  • 자바 8에서 지원하는 병렬 스트림은 데이터 병렬성을 구현한 것

 

  • 멀티 코어의 수만큼 대용량 요소를 서브 요소들로 나누고

  • 각각의 서브 요소들을 분리된 스레드에서 병렬 처리

  ex) 쿼드 코어(Quad Core) CPU일 경우 4개의 서브 요소들로  나누고, 4개의 스레드가 각각의 서브 요소들을 병렬 처리

 

데이터 병렬성은 전체 데이터를 쪼개어 서브 데이터들로 만들고 이 서브 데이터들을 병렬 처리해서 작업을 빨리 끝내는 것을 말한다.

 

자바 8에서 지원하는 병렬 스트림은 데이터 병렬성을 구현한 것이다.

 

멀티 코어 수만큼 대용량 요소를 서브 요소로 나누고, 각각의 서브 요소들은 분리된 스레드에서 병렬처리 시킨다.

예를 들어 쿼드 코어(Quad Core) CPU일 경우 4개의 서브 요소들로 나누고, 4개의 스레드가 각각의 서브 요소들을 병렬처리 한다.

 

작업 병렬성(Task Parallelism)

  • 작업 병렬성은 서로 다른 작업을 병렬 처리하는 것

  • Ex) 웹 서버 (Web Server )

    – 각각의 브라우저에서 요청한 내용을 개별 스레드에서 병렬로 처리

 

작업 병렬성은 서로 다른 작업을 병렬로 처리하는 것을 말한다.

작업 병렬성의 대표적인 예는 웹서버(Web Server)이다. 웹 서버는 각각의 브라우저에서 요청한 내용을 개별 스레드에서 병렬로 처리한다.

 

 

16.12.2 포크조인(ForkJoin) 프레임워크

- 런타임 시 포크조인 프레임워크 동작

 

- 포크 단계에서는 전체 데이터를 서브 데이터로 분리

- 서브 데이터를 멀티 코어에서 병렬로 처리

- 조인 단계에서는 서브 결과를 결합해서 최종 결과 도출

 

 

병렬 스트림은 요소들을 병렬 처리하기 위해 포크조인(ForkJoin) 프레임워크를 사용한다.

병렬 스트림을 이용하면 런타임 시에 포크조인 프레임워크가 동작하는데,

 

포크 단게에서는 전체 데이터를 서브 데이터로 분리한다.

그리고 나서 서브 데이터를 멀티 코어에서 병렬로 처리한다.

 

조인 단계에서는 서브 결과를 결합해서 최종 결과를 만들어 낸다.

ForkJoinPool 사용해 작업 스레드 관리

 

병렬 처리 스트림은 실제로 포크 단계에서 차레대로 요소를 4등분 하지 않는다.

 

이해하기 쉽도록 하기 위해 위 그림은 차례대로 4등분 했지만, 내부적으로 서브 요소를 나누는 알고리즘이 있다.

포크조인 프레임워크는 포크와 조인 기능 이외에 스레드풀인 ForkJoinPool을 제공한다.

 

각각의 코어에서 서브 요소를 처리하는 것은 개별 스레드가 해야 하므로 스레드 관리가 필요하다.

포크조인 프레임워크는 ExecutorService의 구현 객체인 ForkJoinPool을 사용해서 작업 스레드를 관리한다.

 

 

16.12.3 병렬 스트림 생성

병렬 처리를 위해 코드에서 포크조인 프레임워크를 직접 사용할 수는 있지만, 병렬 스트림을 이용할 경우에는 백그라운드에서 포크 조인 프레임워크가 사용되기 때문에 매우 쉽게 병렬 처리를 할 수 있다.

병렬 스트림을 얻는 메소드

- 병렬 스트림을 얻는 메소드

  • parallelStream ( ) 메소드

   – 컬렉션으로부터 병렬 스트림을 바로 리턴

  • parallel ( ) 메소드

   –순차 처리 스트림을 병렬 처리 스트림으로 변환해서 리턴

 

 

기존 코드

MaleStudent maleStudent = totalList.steram()
                          .filter(s -> s.getSex() == Student.Sex.MALE)
                          .collect(MaleStudemt :: new, MaleStudent :: accumulate, MaleStudent :: combine);

 

병렬 처리 코드

MaleStudent maleStudent = totalList.parallelSteram()   //parallelSteram으로 변경
                          .filter(s -> s.getSex() == Student.Sex.MALE)
                          .collect(MaleStudemt :: new, MaleStudent :: accumulate, MaleStudent :: combine);

 

단순히 stream()에서 parallelSteram()으로 변경 되었지만 내부 동작은 다음과 같은 순서로 진행된다.

 

1) 쿼드 코어 CPU에서 실행된다면 전체 요소는 4개의 서브 요소로 나눠지고, 4개의 스레드가 병렬처리한다.

각 스레드는 서브 요소를 수집해야 하므로 4개의 MaleStudent 객체를 생성하기 위해 collect()의 첫 번쨰 메소드 참조인 MaleStudent :: new를 4번 실행시킨다.

 

2) 각 스레드는 MaleStudent 객체에 남학생 요소를 수집하기 위해 두 번쨰 메소드 참조인 MaleStudent :: accumulate를 매번 실행시킨다.

 

3) 수집이 완료된 4개의 MaleStudent는 3번의 결합으로 최종 MaleStudent가 만들어 질 수 있으므로 세 번쨰 메소드 참조인 MaleStudent :: combine이 3번 실행된다.

 

import java.util.Arrays;
import java.util.List;

public class MaleStudentExample {
	public static void main(String[] args) {
		List<Student> totalList = Arrays.asList(
				new Student("홍길동", 10, Student.Sex.MALE),
				new Student("김수애", 10, Student.Sex.FEMALE),
				new Student("신용권", 10, Student.Sex.MALE),
				new Student("박수미", 10, Student.Sex.FEMALE)
		);
		
		MaleStudent maleStudent = totalList.parallelStream()
				.filter(s -> s.getSex() == Student.Sex.MALE)
				.collect(MaleStudent :: new, MaleStudent :: accumulate, MaleStudent :: combine); 
		
		maleStudent.getList().stream()
			.forEach(s -> System.out.println(s.getName()));
	}
}

실행 결과

실행 결과를 보면 main 스레드와 ForkJoinPool에서 3개의 스레드가 사용되어 총 4개의 스레드가 동작한다.

 

각각의 스레드가 하나의 서브 작업이라고 본다면 총 4개의 서브 작업으로 분리되었다고 생각하면 된다.

 

각 서브 작업은 남학생을 누적시킬 MaleStudent 객체를 별도로 생성하기 때문에 MaleStudent()는 생성자가 4번 실행되었다.

 

하지만 전체 학생 중에서 남학생이 2명밖에 없으므로 accumulate()는 2번밖에 호출되지 않았다.

 

누적이 완료된 4개의 MaleStudent 객체는 3번의 결합으로 최종 MaleStudent가 만들어지므로 combile() 메소드가 3번 호출 되었다.

서브작업으로 나누고 하나로 결합

 

 

다음은 4개의 코어가 병렬적으로 요소를 처리하고 있는 모습을 보여준다.

학생수를 100명으로 늘리고 쿼드 코어 CPU에서 테스트 한 결과

 

 

16.12.4 병렬 처리 성능

스트림 병렬처리가 스트림 순차처리보다 항상 실행 성능이 좋다고 판단해서는 안된다.

병렬처리에 미치는 다음 3가지 요인을 잘 살펴보아야 한다.

 

요소의 수와 요소당 처리 시간

  - 요소 수가 적고 요소당 처리 시간 짧으면 순차 처리가 빠름

  - 병렬 처리는 스레드풀 생성, 스레드 생성이라는 추가적인 비용이 발생하기 때문이다.

 

스트림 소스의 종류

  - ArrayList, 배열은 인덱스로 요소 관리 병렬처리가 빠름

  - 반면에 HashSet, TreeSet은 요소 분리가 쉽지 않고, LinkedLIst역시 링크를 따라가야 하므로 요소 분리가 쉽지 않다.

    따라서 이들은 ArrayList, 배열 보다는 상대적으로 병렬 처리가 늦다.

 

코어(Core)의 수 싱글 코어의 경우 순차 처리가 빠름

  - 싱글 코어 CPU일 경우에는 순차처리가 빠르다.

    병렬 스트림을 사용할 경우 스레드의 수만 증가하고 동시성 작업으로 처리되기 때문에 좋지 못한 결과를 준다.

    코어의 수가 많으면 많을수록 병렬 작업 처리 속도는 빨라진다.

 

 

다음 예제는

work() 메소드의 실행 시간(요소당 처리 시간)을 조정함으로써 순차처리와 병렬처리 중 어떤 것이 전체 요소를 빨리 처리하는지 테스트한다.

import java.util.Arrays;
import java.util.List;

public class SequencialVsParallelExample {
	//요소 처리
	public static void work(int value) {
		try { Thread.sleep(100); } catch (InterruptedException e) {}
	}

	//순차 처리
	public static long testSequencial(List<Integer> list) {
		long start = System.nanoTime();
		list.stream().forEach((a) -> work(a));
		long end = System.nanoTime();
	    long runTime = end - start;
	    return runTime;
	}
	
	//병렬 처리
	public static long testParallel(List<Integer> list) {
		long start = System.nanoTime();
		list.stream().parallel().forEach((a)-> work(a));
		long end = System.nanoTime();
	    long runTime = end - start;
	    return runTime;
	}
	
	public static void main(String... args) {
		//소스 컬렉션
		List<Integer> list = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
		
		//순차 스트림 처리 시간 구하기
		long Sequencial = testSequencial(list);
		
		//병렬 스트림 처리 시간 구하기
		long timeParallel = testParallel(list);
		
		if(Sequencial < timeParallel) {
			System.out.println("성능 테스트 결과: 순차 처리가 더빠름");
		} else {
			System.out.println("성능 테스트 결과: 병렬 처리가 더빠름");
		}
	}
}

 

이 예제의 실행결과는 work()의 요소 처리 시간에 따라 달라진다.

필자가 진행해본 결과 Thread.sleep(10)으로 실생하면 순차처리가 더 빨랐다.

 

그렇기 때문에 실제 작업 내용을 작성한 후에는 순차 처리와 병렬 처리 중 어떤 처리가 유리한지 테스트해보아야 한다.

 

 

 

다음 예제는 스트림 소스가 ArrayList인 경우와 LinkedList인 경우 대용량 데이터의 병렬 처리 성능을 테스트한 것이다.

백만 개의integer 객체를 각각 ArrayList와 LInkedList에 저장하고 테스트 하였다.

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ArrayListVsLinkedListExample {
	//요소 처리
	public static void work(int value) {
	}
	
	//병렬 처리
	public static long testParallel(List<Integer> list) {
		long start = System.nanoTime();
		list.stream().parallel().forEach((a)-> work(a));
		long end = System.nanoTime();
	    long runTime = end - start;
	    return runTime;
	}
	
	public static void main(String... args) {
		//소스 컬렉션
		List<Integer> arrayList = new ArrayList<Integer>();
		List<Integer> linkedList = new LinkedList<Integer>();
		for(int i=0; i<1000000; i++) {
			arrayList.add(i);
			linkedList.add(i);
		}
		
		//워밍업 (실행 준비 과정에서 오차를 줄이기 위함)
		long arrayListListParallel = testParallel(arrayList);
		long linkedListParallel = testParallel(linkedList);
		
		//병렬 스트림 처리 시간 구하기
		arrayListListParallel = testParallel(arrayList);
		linkedListParallel = testParallel(linkedList);
		
		if(arrayListListParallel < linkedListParallel) {
			System.out.println("성능 테스트 결과: ArrayList 처리가 더빠름");
		} else {
			System.out.println("성능 테스트 결과: LinkedList 처리가 더빠름");
		}
	}
}

 

실행 결과는 ArrayList가 빠른 실행 성능을 보였다.

 

하지만 요소의 개수가 적을 경우 LinkedList가 더 빠른 성능을 보였다.