-
[자바의 정석] Ch.13 쓰레드스터디플래너/공부하기 2022. 8. 13. 21:22
9. 쓰레드의 동기화
String name = "자바의 정석"이라는 자원을 두고 쓰레드 A와 쓰레드 B가 함께 작업을 한다고 가정해보자. 쓰레드 A는 2초 후 "자바의 정석"을 출력하고싶어하고 쓰레드 B는 name을 "스프링의 정석"으로 바꾼 뒤 이를 출력하고자 한다. 쓰레드 A가 먼저 작업한 다음 쓰레드 B가 작업하지만 쓰레드 A가 2초간 대기하는 사이 쓰레드 B가 끼어들어서 스프링의 정석으로 바꿨기 때문에 쓰레드 A와 B 모두 스프링의 정석을 출력한다. 의도한 대로 작업하려면 어떻게 해야할까?
String name="자바의 정석"을 임계 영역(critical section)으로 정하고 이 데이터가 갖고있는 lock을 획득한 쓰레드만 이 영역의 코드를 수행할 수 있게 한다. lock을 가진 쓰레드가 작업이 끝나면 임계 영역에서 벗어나 lock을 반납해 다른 쓰레드가 작업하게 한다. 이 개념을 위 예시를 대입해보자.
String name = "자바의 정석"은 임계 영역이다. 이 데이터는 lock을 갖고있다.
먼저 수행하는 쓰레드 A가 임계 영역에 접근하여 lock을 얻었다. 쓰레드 A는 의도한대로 2초간 대기하고 자바의 정석을 출력한다.
출력이 끝나고 쓰레드 A는 lock을 반납했다.
쓰레드 B는 쓰레드 A다음으로 임계 영역에 접근하여 name을 스프링의 정석으로 바꾸고 이를 출력했다.
이처럼 한 쓰레드가 진행중인 작업을 다른 쓰레드가 간섭하지 못하도록 막는 것을 쓰레드의 동기화(synchronization)라고 한다.
9.1 synchronized를 이용한 동기화
synchronized는 임계 영역을 지정하는데 사용한다. 이 때 메소드 앞에 synchronized를 붙여 메소드 전체를 임계 영역으로 지정할 수 있고 특정 영역만 synchronized 코드 블럭 안에 넣어 그 영역만 임계 영역으로 지정할 수 있다. lock의 획득과 반납은 자동적으로 이루어진다. 하지만 메소드 전체를 synchronized하게 되면 메소드가 호출된 시점에 lock을 얻고 종료되는 시점에 lock을 반납한다. 임계 영역은 멀티스레드 프로그램의 성능을 좌우하기 때문에 메소드 전체에 lock을 거는 것보다 synchronized 블럭으로 영역을 최소화하는 것이 좋다.
이 때 임계 영역에서 접근하는 데이터는 private 접근 제한자를 사용해야 한다. private가 아니면 외부에서 접근할 수 있기 때문에 synchronized를 이용하여 동기화해도 외부에서 접근하는 걸 막을 수 없기 때문이다.
9.2 wait()와 notify()
효율적인 멀티 쓰레드 프로그램을 위해서 특정 쓰레드가 lock을 가진 상태로 오랜 시간 보내지 않아야 한다. 이 때 wait()를 호출하면 실행중인 쓰레드를 다시 대기시키고, notify()가 호출되면 기다리고 있던 쓰레드 중 임의의 쓰레드가 통지받는다. notifyAll()은 대기중인 모든 쓰레드에게 통보하지만 lock을 얻을 수 있는 것은 하나의 쓰레드이고 나머지 쓰레드는 다시 lock을 기다린다.
더보기import java.util.ArrayList; class ThreadWait { public static void main(String[] args) throws InterruptedException { Table table = new Table(); new Thread(new Cook(table), "COOK1").start(); new Thread(new Customer(table, "donut"), "CUST1").start(); new Thread(new Customer(table, "burger"), "CUST2").start(); Thread.sleep(100); System.exit(0); } } class Customer implements Runnable{ private Table table; private String food; Customer(Table table, String food){ this.table = table; this.food = food; } @Override public void run() { while (true){ try { Thread.sleep(10);} catch (InterruptedException e){ } String name = Thread.currentThread().getName(); if(eatFood()) System.out.println(name + " ate a " + food); else System.out.println(name + " failed to eat. :( "); } } boolean eatFood() { return table.remove(food); } } class Cook implements Runnable{ private Table table; Cook(Table table) { this.table = table; } @Override public void run() { while (true){ int idx = (int)(Math.random() * table.dishNum()); table.add(table.dishNames[idx]); try { Thread.sleep(1);} catch (InterruptedException e) { } } } } class Table{ String[] dishNames = { "donut", "donut", "burger" }; final int MAX_FOOD = 6; private ArrayList<String> dishes = new ArrayList<>(); public void add(String dish){ if(dishes.size() >= MAX_FOOD) return; dishes.add(dish); System.out.println("Dishes: " + dishes.toString()); } public boolean remove(String dishName){ for(int i=0; i<dishes.size(); i++) if(dishName.equals(dishes.get(i))){ dishes.remove(i); return true; } return false; } public int dishNum() { return dishNames.length; } }
코드를 실행하면 두 가지 예외가 발생한다. 첫번째는 요리사가 음식을 테이블에 놓고 있는데 손님이 음식을 가져가려고 해서 발생하는 예외(ConcurrentModificationException)이고 두번째는 손님이 마지막 음식을 가져가는데 다른 손님이 음식을 가져가서 없는 음식을 가져갈 때 발생하는 예외(IndexOutOfBoundsException)이다. synchronized를 이용하여 동기화로 예외가 발생하지 않게 할 수 있다.
더보기import java.util.ArrayList; class Customer implements Runnable{ private Table table; private String food; Customer(Table table, String food){ this.table = table; this.food = food; } @Override public void run() { while(true) { try { Thread.sleep(10); } catch (InterruptedException e) { } String name = Thread.currentThread().getName(); if(eatFood()) System.out.println( name + " ate a " + food); else System.out.println(name + " failed to eat. :( "); } } boolean eatFood() { return table.remove(food);} } class Cook implements Runnable{ private Table table; Cook(Table table){ this.table = table; } @Override public void run() { while(true){ int idx = (int)(Math.random() * table.dishNum()); table.add(table.dishNames[idx]); try{ Thread.sleep(100); } catch (InterruptedException e) { } } } } class Table{ String[] dishNames = { "donut", " donut", "burger" }; final int MAX_FOOD = 6; private ArrayList<String> dishes = new ArrayList<>(); public synchronized void add(String dish){ if(dishes.size() >= MAX_FOOD) return; dishes.add(dish); System.out.println("Dishes: " + dishes.toString()); } public boolean remove(String dishName){ synchronized (this){ while(dishes.size()==0){ String name = Thread.currentThread().getName(); System.out.println( name + " is waiting. "); try { Thread.sleep(500); } catch (InterruptedException e) { } } for(int i=0; i<dishes.size(); i++) if(dishName.equals(dishes.get(i))){ dishes.remove(i); return true; } } return false; } public int dishNum() { return dishNames.length; } } class ThreadWait{ public static void main(String[] args) throws Exception{ Table table = new Table(); new Thread(new Cook(table), "COOK1").start(); new Thread(new Customer(table, "donut"), "CUST1").start(); new Thread(new Customer(table, "burger"), "CUST2").start(); Thread.sleep(5000); System.exit(0); } }
요리사가 음식을 추가하고 손님이 음식을 가져가는 메소드를 동기화하였는데 손님들이 음식을 가져가지 못하고 있다. 이는 손님이 테이블의 lock을 갖고있어 요리사가 테이블에 음식을 추가하려해도 추가할 수 없다. 이럴 때 사용하는 것이 wait()과 notify()이다. 손님이 계속 lock을 갖고있는게 아니라 wait()로 lock을 풀었다가 음식이 추가되면 notify()로 알려줘서 다시 lock을 얻어 다음 작업을 진행하는 것이다.
더보기import java.util.ArrayList; class Customer implements Runnable{ private Table table; private String food; Customer(Table table, String food){ this.table = table; this.food = food; } @Override public void run() { while(true) { try { Thread.sleep(10); } catch (InterruptedException e) { } String name = Thread.currentThread().getName(); table.remove(food); System.out.println(name + " ate a " + food); } } } class Cook implements Runnable{ private Table table; Cook(Table table){ this.table = table; } @Override public void run() { while(true){ int idx = (int)(Math.random() * table.dishNum()); table.add(table.dishNames[idx]); try{ Thread.sleep(100); } catch (InterruptedException e) { } } } } class Table{ String[] dishNames = { "donut", " donut", "burger" }; final int MAX_FOOD = 6; private ArrayList<String> dishes = new ArrayList<>(); public synchronized void add(String dish){ while (dishes.size() >= MAX_FOOD){ String name = Thread.currentThread().getName(); System.out.println( name + " is waiting. "); try { wait(); Thread.sleep(500); }catch (InterruptedException e) { } } dishes.add(dish); notify(); System.out.println("Dishes: " + dishes.toString()); } public void remove(String dishName){ synchronized (this){ String name = Thread.currentThread().getName(); while (dishes.size() == 0){ System.out.println( name + " is waiting. "); try { wait(); Thread.sleep(500); }catch (InterruptedException e ) { } } while (true){ for(int i=0; i<dishes.size(); i++){ if(dishName.equals(dishes.get(i))){ dishes.remove(i); notify(); return; } } try { System.out.println(name + " is waiting. "); wait(); Thread.sleep(500); }catch (InterruptedException e) { } } } } public int dishNum() { return dishNames.length; } } class ThreadWait{ public static void main(String[] args) throws Exception{ Table table = new Table(); new Thread(new Cook(table), "COOK1").start(); new Thread(new Customer(table, "donut"), "CUST1").start(); new Thread(new Customer(table, "burger"), "CUST2").start(); Thread.sleep(2000); System.exit(0); } }
이제 잘 기다렸다가 손님별로 원하는 음식을 잘 챙겨간다. 하지만 여기서도 문제가 있다. 손님과 요리사가 테이블 waiting pool에서 함께 기다린다는 것이다. 테이블에 음식이 없어서 notify()를 호출한다면 요리사가 통지를 받아야하지만 손님이 통지를 받는다면 테이블에 여전히 음식이 없으므로 손님은 다시 waiting pool로 돌아가야 한다.
기아 현상과 경쟁 상태
만약 뒤로 넘어져도 코가 깨질 정도로 운이 나쁘다면 요리사는 계속 통지받지 못하고 손님만 통지를 받을 수 있다. 그러면 요리사는 계속 기다리기만해야하는데 이를 기아현상(starvation)이라 한다. 이 현상을 막으려면 notify()대신 notifyAll()을 사용해야 한다. notifyAll()을 호출하면 기아현상은 막을 수 있지만 테이블이 비어서 요리사를 호출해야할 때 손님 쓰레드도 통지받아 요리사와 lock을 얻기위해 경쟁해야 한다. 이렇게 여러 쓰레드가 lock을 얻기 위해 경쟁하는 것을 경쟁 상태(race condition)이라 한다. Lock과 Condition을 이용하면 wait()와 notify()로 불가능한 선별적인 통지를 할 수 있다.
9.3 Lock과 Condition을 이용한 동기화
JDK1.5 이후 'java.util.concurrent.locks'패키지가 제공하는 lock클래스를 이용하여 동기화할 수 있다. lock클래스는 세 가지가 있다.
(1) ReentrantLock : 재진입이 가능한 lock. 가장 일반적인 배타 lock.
(2) ReentrantRedWriteLock : 읽기 lock이 걸려있을 때는 다른 쓰레드가 읽기를 수행할 수 있고 쓰기에만 배타적인 lock. 읽기 lock이 걸린 상태에서 쓰기 lock을 걸 수 없음
(3) StampedLock : 낙관적 읽기 lock이 추가 된 것으로 lock을 걸거나 해지할 때 스탬프(long타입의 정수값)을 사용. 낙관적 읽기 lock은 쓰기 lock에 의해 바로 풀린다. 무조건 읽기 lock을 걸지 않고 쓰기와 읽기가 충돌할 때만 쓰기가 끝난 후에 읽기 lock을 거는 것이다.
long stamp = lock.tryOptimisticRead(); // 낙관적 읽기 lock을 건다. int curBalance = this.balance; //공유 데이터인 balance를 읽어온다. if(!lock.validate(stam)){ //쓰기 lock에 의해 낙관적 읽기 lock이 풀렸는지 확인 stamp = lock.readLock(); //lock이 풀렸으면 읽기 lock을 얻으려고 기다린다. try{ curBalance = this.balance; // 공유 데이터를 다시 읽어온다. } finally { lock.unlockRead(stamp); //읽기 lock을 푼다. } } return curBalance; //낙관적 읽기 lock이 풀리지 않았으면 곧바로 읽어온 값을 반환 }
ReentrantLock의 생성자
ReentrantLock()
ReentrantLock(boolean fair)ReentrantLock은 두 개의 생성자를 갖고있다. 두번째 생성자에 true값을 주면 가장 오래 기다린 쓰레드를 먼저 lock을 획득한다. 하지만 어떤 쓰레드가 가장 오래 기다렸는지 확인하는 과정을 거치면 성능이 떨어지기 때문에 잘 사용하지 않는다.
void lock() lock을 잠근다.
void unlock() lock을 해지한다.
boolean isLocked() lock이 잠겼는지 확인한다.ReentrantLock과 같은 lock 클래스는 수동으로 lock을 잠그고 해제해야 한다. unlockdms try - finally문으로 감싸는 것이 일반적이다.
boolean tryLock()
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedExceptiontryLock()은 lock()과 다르게 lock이 걸려 있으면 lock을 얻으려고 기다리지 않거나 지정된 시간만큼만 기다린다. lock을 얻으면 true를 반환하고 얻지 못하면 false를 반환한다. lock()은 lock을 얻을 때까지 쓰레드를 블락시키므로 응답성이 나빠질 수 있다. 따라서 응답성이 중요한 경우 tryLock()을 이용하여 지정된 시간 동안 lock을 얻지 못하면 다시 시도할 것인지 포기할 것인지 사용자가 결정할 수 있게 하는 것이 좋다.
InterruptedException을 발생시킬 수 있는데 지정된 시간동안 lock을 얻으려고 기다릴 때 interrupt에 의해 작업을 취소할 수 있도록 코드를 작성할 수 있다.
ReentrantLock과 Condition
table의 waiting pool에 손님 쓰레드와 요리사 쓰레드를 함께 넣는 대신 손님 쓰레드르 위한 Condition과 요리사 쓰레드를 위한 Condition을 만들어 각각의 waiting pool에 기다리게 할 수 있다.
Object Condition void wait() void await()
void awaitUninterruptibly()void wait(long timeout) boolean await(long time, TimeUnit unit)
long awaitNanos(long nanosTImeout)
boolean awaitUnit(Date deadline)void notify() void signal() void notifyAll() void signalAll() ReenterantLock을 이용하여 기아현상과 경쟁상태를 개선하였다.
더보기import java.util.ArrayList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class Customer implements Runnable{ private Table table; private String food; Customer(Table table, String food){ this.table = table; this.food = food; } @Override public void run() { while(true) { try { Thread.sleep(10); } catch (InterruptedException e) { } String name = Thread.currentThread().getName(); table.remove(food); System.out.println(name + " ate a " + food); } } } class Cook implements Runnable{ private Table table; Cook(Table table){ this.table = table; } @Override public void run() { while(true){ int idx = (int)(Math.random() * table.dishNum()); table.add(table.dishNames[idx]); try{ Thread.sleep(100); } catch (InterruptedException e) { } } } } class Table{ String[] dishNames = { "donut", " donut", "burger" }; final int MAX_FOOD = 6; private ArrayList<String> dishes = new ArrayList<>(); private ReentrantLock lock = new ReentrantLock(); private Condition forCook = lock.newCondition(); private Condition forCust = lock.newCondition(); public void add(String dish){ lock.lock(); try{ while (dishes.size() >= MAX_FOOD){ String name = Thread.currentThread().getName(); System.out.println( name + " is waiting. "); try { forCook.await(); Thread.sleep(500); }catch (InterruptedException e) { } } dishes.add(dish); forCust.signal(); System.out.println("Dishes: " + dishes.toString()); }finally { lock.unlock(); } } public void remove(String dishName){ lock.lock(); String name = Thread.currentThread().getName(); try { while (dishes.size() == 0) { System.out.println(name + " is waiting. "); try { forCust.await(); Thread.sleep(500); } catch (InterruptedException e) { } } while (true) { for (int i = 0; i < dishes.size(); i++) { if (dishName.equals(dishes.get(i))) { dishes.remove(i); forCook.signal(); return; } } try { System.out.println(name + " is waiting. "); forCust.await(); Thread.sleep(500); } catch (InterruptedException e) { } } } finally { lock.unlock(); } } public int dishNum() { return dishNames.length; } } class ThreadWait{ public static void main(String[] args) throws Exception{ Table table = new Table(); new Thread(new Cook(table), "COOK1").start(); new Thread(new Customer(table, "donut"), "CUST1").start(); new Thread(new Customer(table, "burger"), "CUST2").start(); Thread.sleep(2000); System.exit(0); } }
9.4 volatile
멀티 코어 프로세서에서는 코어마다 별도의 캐시를 갖고있다. 코어는 메모리에서 읽어온 값을 캐시에 저장하고 캐시에서 값을 읽어서 작업한다. 다시 같은 값을 읽어올 때는 먼저 캐시에 있는지 확인하고 없을 때만 메모리에서 읽어온다. 메모리에 저장된 변수의 값이 변경되었는데도 캐시에 저장된 값이 갱신되지 않아서 메모리에 저장된 값이 다른 경우가 발생한다. 그래서 변수 stopped의 값이 바뀌었는데도 쓰레드가 멈추지 않고 계속 실행되는 것이다.
변수 stopped와 suspended 앞에 volatile을 붙이면 코어가 변수의 값을 읽어올 때 캐시가 아니라 메모리에서 읽어오기 때문에 캐시와 메모리간의 값의 불일치가 해결된다. volatile 대신 synchronized블럭을 사용해도 같은 효과를 얻을 수 있다.
더보기class ThreadEx9{ public static void main(String[] args) { RunImplEx9 r1 = new RunImplEx9(); RunImplEx9 r2 = new RunImplEx9(); RunImplEx9 r3 = new RunImplEx9(); Thread t1 = new Thread(r1, "*"); Thread t2 = new Thread(r2, "**"); Thread t3 = new Thread(r3, "***"); t1.start(); t2.start(); t3.start(); try { Thread.sleep(2000); r1.suspend(); Thread.sleep(2000); r2.suspend(); Thread.sleep(2000); r1.resume(); Thread.sleep(3000); r1.stop(); r2.stop(); Thread.sleep(2000); r3.stop(); }catch (InterruptedException e){} } } class RunImplEx9 implements Runnable{ volatile boolean suspended = false; volatile boolean stopped = false; @Override public void run() { while (!stopped){ if(!suspended){ System.out.println(Thread.currentThread().getName()); try { Thread.sleep(1000); }catch (InterruptedException e) { } } } System.out.println(Thread.currentThread().getName() + " - stopped"); } public void suspend() { suspended = true; } public void resume() { suspended = false; } public void stop() { stopped = true; } }
volatile로 long과 double을 원자화
JVM은 데이터를 4byte(=32bit)단위로 처리하기 때문에 int와 int보다 작은 타입은 한번에 쓰고 읽을 수 있다. 하지만 8byte의 long과 double타입은 하나의 명령어로 값을 읽거나 쓸 수 없어 다른 쓰레드가 끼어들 수 있따. 이 때 synchronized블럭으로 감쌀 수 있지만 변수를 선언할 때 volatile을 붙이는 것이 간단하다.
volatile은 해당 변수에 대한 읽기나 쓰기가 원자화된다. 원자화란 작업을 나눌 수 없는 최소 단위가 되도록 하는 것인데 synchronized블럭도 여러 문장을 원자화함으로써 쓰레드의 동기화를 구현한 것이라 할 수 있다. volatile은 변수의 읽기와 쓰기를 원자화할 뿐 동기화하는 것은 아니다.
9.5 fork & join 프레임웍
JDK1.7부터 fork & join 프레임웍이 추가되었다. 이 프레임웍은 하나의 작업을 작은 단위로 나눠서 여러 쓰레드가 도잇에 처리하는 것을 쉽게 만들어 준다.
RecursiveAction 반환값이 없는 작업을 구현할 때 사용
RecursiveTask 반환값이 있는 작업을 구현할 때 사용두 클래스 모두 compute()라는 추상 메서드를 갖고있어 상속을 통해 이를 구현하면 된다. 구현할 때는 쓰레드풀과 수행할 작업을 생성하고 invoke()로 작업을 시작한다.
compute()의 구현
compute()의 구조는 일반적인 재귀호출 메서드와 동일하다. 작업의 범위를 어떻게 나눌 것인지만 정해주면 된다.
다른 쓰레드의 작업 훔쳐오기
fork()가 호출되어 작업 큐에 추가된 작업도 compute()로 더이상 나눌 수 없을 때까지 반복해서 나누고, 자신의 작업큐가 비어있는 쓰레드는 다른 쓰레드의 작업 큐에서 작업을 가져와서 수행한다. 이를 작업 훔쳐오기(work stealing)이라 한다.
fork()와 join()
fork()는 작업을 쓰레드 풀의 작업큐에 넣는 비동기 메소드이다. join()은 해당 작업의 수행이 끝날 때까지 기다렸다가 끝나면 그 결과를 반환하는 동기 메소드이다.
더보기import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoin { static final ForkJoinPool pool = new ForkJoinPool(); public static void main(String[] args) { long from = 1L, to = 100_000_000L; SumTask task = new SumTask(from, to); long start = System.currentTimeMillis(); Long result = pool.invoke(task); System.out.println("Elapse time(4core): " + (System.currentTimeMillis() - start)); System.out.printf("sum of %d ~ %d = %d%n", from, to, result); System.out.println(); result = 0L; start = System.currentTimeMillis(); for(long i=from; i<=to; i++) result += i; System.out.println("Elapse time(1core): " + (System.currentTimeMillis() - start)); System.out.printf("sum of %d ~ %d = %d%n", from, to, result); } } class SumTask extends RecursiveTask<Long>{ long from, to; SumTask(long from, long to){ this.from = from; this.to = to; } @Override protected Long compute() { long size = to - from + 1; if(size <= 5) return sum(); long half = (from+to)/2; SumTask leftSum = new SumTask(from, half); SumTask rightSum = new SumTask(half+1, to); leftSum.fork(); return rightSum.compute() + leftSum.join(); } long sum(){ long tmp = 0L; for(long i=from; i<=to; i++) tmp += i; return tmp; } }
항상 멀티쓰레드로 처리하는 것이 빠르다고 생각해서는 안 된다. 멀티 쓰레드로 처리한 결과값과 그렇지 않은 것의 결과값을 비교한 후 멀티쓰레드로 처리하는 것이 이득이 있을 때만 멀티쓰레드로 처리해야 한다.
'스터디플래너 > 공부하기' 카테고리의 다른 글
[자바의 정석] Ch.14 람다와 스트림 (0) 2022.09.03 [자바의 정석] Ch.14 람다와 스트림 (0) 2022.08.28 [자바의 정석] Ch.13 쓰레드 (0) 2022.07.24 [자바의 정석] Ch.13 쓰레드 (0) 2022.07.17 [자바의 정석] Ch.12 지네릭스, 열거형, 애너테이션 (0) 2022.07.02