정리

Monitor란 프로그래밍 언어 자체에서 제공하는 공유 자원에 대한 동기화 메커니즘입니다.

공유 자원에 접근하는 Procedure들이 존재하는데 이 Procedue는 한번에 하나만 실행되도록 조절됩니다.

Monitor 안에는 Condition variable이 존재. 그래서 조작 가능

예시는 아래와 같음 insert라는 procedure를 보면 한번에 하나만 접근되도록 막아놓았음. 만약 들어갔는데 막혀있으면 await로 대기열에 추가됨. await()와 signal()을 통해서 대기열에 추가되고 빠져나오고 함.

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerMonitor {
    private Queue<Integer> buffer;
    private int maxSize;
    private Lock lock;
    private Condition notFull;
    private Condition notEmpty;

    public ProducerConsumerMonitor(int size) {
        this.buffer = new LinkedList<>();
        this.maxSize = size;
        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
        this.notEmpty = lock.newCondition();
    }

    public void insert(int item) throws InterruptedException {
        lock.lock();
        try {
            while (buffer.size() == maxSize) {
                notFull.await(); // 버퍼가 가득 찼으면 대기
            }
            buffer.add(item);
            System.out.println("Produced: " + item);
            notEmpty.signal(); // 아이템을 추가했으므로 소비자에게 알림
        } finally {
            lock.unlock();
        }
    }

    public int remove() throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty()) {
                notEmpty.await(); // 버퍼가 비었으면 대기
            }
            int item = buffer.poll();
            System.out.println("Consumed: " + item);
            notFull.signal(); // 아이템을 제거했으므로 생산자에게 알림
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProducerConsumerMonitor monitor = new ProducerConsumerMonitor(5);

        // 생산자 스레드
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    monitor.insert(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 소비자 스레드
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    monitor.remove();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producer.start();
        consumer.start();
    }
}

그렇다면 await()와 signal()은 내부적으로 어떻게 구현되어있을까요

procedure F() // 처음 시작
{
	wait(mutex); // binary mutex니까 하나 가져감
	...
	Body of F
	...

	if (next_count) // 
		signal(next);
	else
		signal(mutex);
}

procedure cond_wait(x)
{
	x.count++; // 기다리고 있다
	if (next_count) // 
		signal(next);
	else
		signal(mutex);
	wait(x.sem);
	x.count--;
}

procedure cond_signal(x)
{
	if (x.count) // 기다리고있는 스레드가 있다
	{
		next_count++; // 준비됐다
		signal(x.sem);
		wait(next);
		next_count--;
	}
}