아래 카운터를 락으로 적절히 보호하지 않은 상태로 멀티스레드 환경에서 실행하면 다른 스레드가 수정한 결과가 소실될 수도 있다.
public class Counter {
private int i = 0;
public int increment() {
return i = i + 1;
}
}
synchronized
키워드로 해결할 수 있고 자바 5 이전엔 이 방법이 유일했다.synchronized
때문에 프로그램이 더 느려질 수도 있기 때문
sychronized
키워드가 나타내는 의미
synchronized
를 적용하면 소실된 업데이트 현상이 나타난다.java.util.concurrent
패키지는 멀티스레드 애플리케이션을 더 쉽게 개발할 수 있게 설계된 라이브러리다.
sun.misc.Unsafe
클래스를 통해 엑세스한다.
Unsafe
는 거의 모든 주요 프레임워크의 구현 핵심부가 됐다.
get()
으로 계산한 결과를 돌려 받는다.
volatile
확장판이라고 할 수 있으며 더 유연해서 상태 의존적 업데이트가 가능AtomicInteger
는 Integer
를 상속하지 않음Unsafe
를 이용한 아토믹 구현 원리
public class AtomicIntegerExample extends Number {
private volatile int value;
// Unsafe.compareAndSwapInt로 업데이트하기 위해 설정
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset(
AtomicIntegerExample.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
public final int get() {
return value;
}
public final void set(int newValue) {
value = newValue;
}
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
// 생략
}
Unsafe
의 getAndSetInt()
메서드를 사용했고 JVM을 호출하는 네이티브 코드가 핵심 public final int getAndSetInt(Object o, long offset, int newValue) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, newValue));
return v;
}
public native int getIntVolatile(Object o, long offset);
public final native boolean compareAndSwapInt(Object o, long offset,
int expected, int x);
Unsafe
내부에서 루프를 이용해 CAS를 반복 시도한다.Unsafe
에 이미 존재하는 것java.util.concurrent.locks.Lock
이 추가되었다.lock()
newCondition()
tryLock()
unlock()
lock()
에 대응되는 후속 호출로 락을 해제한다.ReentrantLock
Lock
의 주요 구현체로 내부적으로 int
값으로 compareAndSwap()
을 한다.compareAndSet()
을 호출하고 Unsafe
를 사용하는 Sync
는 스레드를 파킹 및 재개하는 메서드가 구현된 LockSupport
클래스를 사용한다.
Sync
는AbstractQueuedSynchronizer
를 확장한 정적 서브 클래스LockSupport
클래스는 스레드에게 퍼밋(허가증)을 발급한다. (퍼밋이 없으면 스레드는 기다려야 함)LockSupport는 오직 한 가지 퍼밋만 발급 (바이너리 세마포어)
while (!canProceed()) { ... LockSupport.park(this); }}
synchronized
나 (조건 없는) ReentrantLock
을 쓰면 한 가지 락 정책을 따를 수밖에 없다.
ReentrantReadWriteLock
클래스를 활용하면 여러 읽기 스레드 작업 중 다른 읽기 스레드를 블로킹하지 않을 수 있다.
public class AgeCache {
private final ReentratReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();
private Map<String, Integer> ageCache = new HashMap<>();
public Integer getAge(String name) {
readLock.lock();
try {
return ageCache.get(name);
} finally {
readLock.unLock();
}
}
public void updateAge(String name, int newAge) {
writeLock.lock();
try {
ageCache.put(name, newAge);
} finally {
writeLock.unLock();
}
}
}
‘최대 O개 객체까지만 엑세스를 허용’
// 퍼밋은 2개, 공정 모드로 설정된 세마포어 생성
private Semaphore poolPermits = new Semaphore(2, true);
Semaphore
클래스의 acquire()
메서드로 사용 가능한 퍼밋 수를 하나씩 줄인다.
release()
메서드는 퍼밋을 반납하고 대기 중인 스레드 중 하나에게 퍼밋을 전달Map
구현체 (ConcurrentHashMap
)
Map
곳곳을 읽는 동안 쓰기가 필요할 경우 한 세그먼트에만 락을 거는 것도 가능ConcurrentModiFicationException
이 거의 발생하지 않는다.CopyOnWriteArrayList
, CopyOnWriteArraySet
ConcurrentModiFicationException이
발생하지 않는다.모든 스레드가 테스크1 → 테스크2 → 테스크3로 진행되는 것이 이상적이라면 래치를 쓰기 좋은 경우다.
public class CountdownLatchEx implements Runnable {
private final CountDownLatch latch;
public CountdownLatchEx(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " api call done");
try {
latch.countDown();
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " continue processing");
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch apiLatch = new CountDownLatch(5);
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
pool.submit(new CountdownLatchEx(apiLatch));
}
System.out.println(Thread.currentThread().getName() + " await main");
apiLatch.await();
pool.shutdown();
try {
pool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " all processing complete");
}
}
countdown()
을 호출할 때마다 카운트 값이 1씩 감소한다.카운트가 0이 되면 래치가 열리고 await()
에 매여 있든 스레드가 해제된다.
pool-1-thread-5 api call done
main await main
pool-1-thread-3 api call done
pool-1-thread-1 api call done
pool-1-thread-4 api call done
pool-1-thread-2 api call done
pool-1-thread-2 continue processing
pool-1-thread-5 continue processing
pool-1-thread-3 continue processing
pool-1-thread-1 continue processing
pool-1-thread-4 continue processing
main all processing complete
java.util.concurrent
패키지의 적절히 추상화된 기능을 골라 쓰는 편이 낫다.Callable
: 자바에서 태스크를 추상화하는 방법
Callable<V>
은 call()
메서드만 존재하는 제네릭 인터페이스V
타입을 반환하되 결과값을 계산할 수 없으면 예외를 던진다.Runnable
은 결과값과 예외를 반환하지 않는다는 점이 Callable
과 다르다.자바 스레드는 OS 수준 프로세스와 동등하며 생성 비용이 비싼 경우도 있다.
Runnable
에서 결과를 가져오는 방식은 다른 스레드를 상대로 실행 반환을 조정해야 하기에 복잡도가 가증될 수 있다.
ExecutorService
: 관리되는 스레드 풀에서 태스크 실행 메커니즘을 규정한 인터페이스
submit()
메서드를 통해 Runnable
또는 Callable
객체를 받는다.submit()
메서드가 반환하는 Future<V>
객체의 get()
메서드를 통해 결과를 받을 때까지 블로킹할 수 있고 isDone()
으로 논블로킹 호출할 수도 있다.Executors
: 로직에 따라 서비스 및 기반 스레드 풀(ExecutorService
)을 생성하는 new*
팩토리 메서드를 제공하는 헬퍼 클래스
newFixtedThreadPool(int nThreads)
newCachedThreadPool()
newSingleThreadExecutor()
newScheduledThreadPool(int corePoolSize)
Callabler
과 지연 시간을 전달 받는 메서드가 존재ExecutorService
선택의 장점
ThreadFactory
를 이용해 커스텀 스레드 생성기를 작성할 수는 있다.ForkJoinPool
ExecutorService
구현체ForkJoinPool
의 두 가지 특성
ForkJoinTask
클래스가 지원
ForkJoinPool
에 있는 commonPool()
정적 메서드로 전체 시스템 풀의 레퍼런스를 반환할 수 있다.
Runtime.getRuntime().availableProcessors() - 1
로 정해지지만 항상 기대한 값을 반환하지는 않는다.parallelStream()
때문에 포크/조인 활용 범위는 크게 확대되었다.Collections
인터페이스의 stream()
메서드로 스트림을 생성할 수 있다.
ReferencePipeline
을 생성parallelStream()
을 사용하면 병렬로 데이터를 작업 후 결과를 재조립할 수 있다.
Spliterator
를 써서 작업을 분할하고 공용 포크/조인 풀에서 연산을 수행락-프리 기법은 CPU 코어를 계속 스피닝하여 컨텍스트 교환 없이 해당 코어에서 작업할 준비를 한다는 뜻이다.
private volatile int proceedValue;
while (i != proceedValue) {
// 작업량이 많은 루프
}
ForkJoinTask
처럼 태스크를 스레드 하나보다 더 작게 나타내는 접근 방식이다.