본문 바로가기

PATTERN

[PATTERN] Producer - Consumer

 

 

 여러 개의 producer는 각각 동일한 작업을 해서 queue에 집어넣고, consumer에서는 queue에 있는 작업을 가져가서 각자 동일한 작업으로 수행합니다.

 

 여기서 여러 개의 producer와 consumer들이 각각 동일한 작업을 수행하는 여러개의 쓰레드 혹은 프로세스로 구성이 되므로 병렬처리가 되는 것입니다.

 

 실무에서 사용하는 예시로 에러기록을 쌓는 경우 사용됩니다. 만약 어플리케이션 서버에 수 많은 이용자가 에러를 발생시킨다면 이 에러를 처리하는 서버는 과부가하 걸리고 다른 요청을 처리 못하게  됩니다.

 

 이때 프로바이더 컨슈머 패턴을 사용하면 어플리케이션 서버에서 오류가 발생하면 오류를 담아두는 객체를 생성하고 해당내용을 큐에 넣어둡니다. 큐는 어플리케이션 서버에서 주기적으로 읽어 오류가 있다면 해당내용을 DB에 저장합니다.

 

이때 CONSUMER는 에러를 DB에 담는 역할을 하고 PROVIDER는 에러상세를 담고있는 객체를 생성합니다.

 

package test.providerconsumer;

import java.util.Queue;

public class TestMain {

	public static void main(String[] args) {
		
		ErrorProvider errorProvider = new ErrorProvider();
		ErrorConsumer errorConsumer = new ErrorConsumer();
		
		QueueManager qManager = new QueueManager();
		qManager.setConsumer(errorConsumer);
		qManager.makeQueue();

		
		Queue q = qManager.getQueue();
		
		try {
			throw new Exception();
		} catch(Exception e) {
			errorProvider.setError("errcd1", "에러발생1");
		}
		
		try {
			throw new Exception();
		} catch(Exception e) {
			errorProvider.setError("errcd2", "에러발생2");
		}
	}
}

>> OUTPUT

Error Data Insert error : errCd : errcd1 errMsg : 에러발생1
Error Data Insert error : errCd : errcd2 errMsg : 에러발생2
Error Data Insert error : null
Error Data Insert error : null
Error Data Insert error : null

3초 간격으로 큐를 확인해 ERROR를 CONSUMER에서 DB저장

 

 

 

 Producer-Consumer 패턴은 멀티스레드 환경에서 생산자(Producer)와 소비자(Consumer) 간의 작업을 조율하는 방법을 제공합니다. 생산자는 데이터를 생성하고 공유 자원에 저장하며, 소비자는 데이터를 가져와서 처리합니다. 이러한 패턴은 데이터의 생산과 소비를 비동기적으로 처리하고, 생산자와 소비자 간의 결합도를 낮춰 독립적인 개발과 확장이 가능하도록 합니다.

Java에서 Producer-Consumer 패턴을 구현하는 방법은 다양합니다. 가장 일반적인 방법은 공유 자원을 나타내는 큐(Queue)를 사용하는 것입니다. 여기서는 Java의 java.util.concurrent 패키지에서 제공되는 BlockingQueue 인터페이스를 사용하는 방법을 살펴보겠습니다.

 

 

 


1. BlockingQueue 인터페이스:

  • BlockingQueue는 멀티스레드 환경에서 안전하게 데이터를 생산하고 소비할 수 있는 큐를 나타내는 인터페이스입니다.
  • put() 메서드를 사용하여 데이터를 추가하고, take() 메서드를 사용하여 데이터를 가져옵니다.
  • put() 메서드는 큐에 공간이 생길 때까지 데이터를 추가하는 데 사용됩니다. 큐가 가득 차 있는 경우에는 블로킹됩니다.
  • take() 메서드는 큐에서 데이터를 가져올 때까지 블로킹됩니다. 큐가 비어있는 경우에는 대기합니다.

 

 

2. Producer 구현:

  • 생산자는 데이터를 생성하고 큐에 추가하는 역할을 합니다.
  • put() 메서드를 사용하여 데이터를 큐에 추가합니다.
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private BlockingQueue<Integer> queue;
    
    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    
    @Override
    public void run() {
        try {
            // 데이터를 생성하여 큐에 추가
            while (true) {
                int data = generateData();
                queue.put(data);
                System.out.println("Produced: " + data);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private int generateData() {
        // 데이터 생성 로직
        // 생략
    }
}

 

3. Consumer 구현:

 

  • 소비자는 큐에서 데이터를 가져와서 처리하는 역할을 합니다.
  • take() 메서드를 사용하여 큐에서 데이터를 가져옵니다.
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private BlockingQueue<Integer> queue;
    
    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    
    @Override
    public void run() {
        try {
            // 큐에서 데이터를 가져와서 처리
            while (true) {
                int data = queue.take();
                processData(data);
                System.out.println("Consumed: " + data);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void processData(int data) {
        // 데이터 처리 로직
        // 생략
    }
}

 

 

4. 메인 코드:

  • 생산자와 소비자 스레드를 생성하고 시작합니다.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String[] args) {
        // 공유 큐 생성
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        
        // 생산자와 소비자 스레드 생성
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        
        // 생산자와 소비자 스레드 시작
        Thread producerThread = new Thread(producer);
        Thread consumerThread = new Thread(consumer);
        
        producerThread.start();
        consumerThread.start();
    }
}

 

 

 

 

참조

https://seokhyun2.tistory.com/53

'PATTERN' 카테고리의 다른 글

[PATTERN] Proxy Pattern  (0) 2021.09.24
[PATTERN]Adapter Pattern  (0) 2021.07.14
[PATTERN] DelegationPattern  (0) 2021.07.13