20 Oct 2014, 15:45

Java で Producer-Consumer Pattern を実装してみた

二つのスレッドで同期キューを用いて情報をやりとりするときに利用する アーキテクチャ・パターンに, Producer-Consumer Pattern というものがある.

Publisher-Subscriber パターン, 生産者-消費者パターンともいう.

これを Java で実装する場合は, BlockingQueue インターフェースを利用できる.

syncronised/wait/notify を利用してもできるが, BlockingQueue を利用したほうが楽.

以下, coursera の POSA での Assignment を改造してみて, 簡易版の Producer-consumer Pattern を書いてみた.

source

[sourcecode language=”java” title=”” ]
import java.util.concurrent.*;

public class SynchronizedQueue {
// Keep track of the number of times the producer test iterates.
static volatile int mProducerCounter = 0;

// Keep track of the number of times the consumer test iterates.
static volatile int mConsumerCounter = 0;

// Maximum timeout.
static final int TIMEOUT_SECONDS = 5;

// Error value for a timeout.
static final int TIMEOUT_OCCURRED = -1;

// Error value for a failure.
static final int FAILURE_OCCURRED = -2;

public static class QueueAdapter {
private BlockingQueue mQueue;

public QueueAdapter (BlockingQueue queue) {
mQueue = queue;
}

/**
* Insert msg at the tail of the queue.
*/
public void put (E msg) throws InterruptedException, TimeoutException {
// Keep track of how many times we’re called.
mProducerCounter++;
boolean timeoutValue = mQueue.offer (msg,
TIMEOUT_SECONDS,
TimeUnit.SECONDS);
if (timeoutValue == false)
throw new TimeoutException ();
}

/**
* Remove msg from the head of the queue.
*/
public E take () throws InterruptedException, TimeoutException {
// Keep track of how many times we’re called.
mConsumerCounter++;
E rValue = mQueue.poll (TIMEOUT_SECONDS,
TimeUnit.SECONDS);

if (rValue == null)
throw new TimeoutException ();

return rValue;
}
}

private static QueueAdapter mQueue = null;

static Runnable producerRunnable = new Runnable () {
public void run () {
for (int i = 0; i < mMaxIterations; i++) try { mQueue.put (i); if (Thread.interrupted ()) throw new InterruptedException (); } catch (InterruptedException e) { System.out.println ("Thread properly interrupted by " + e.toString () + " in producerRunnable"); // This isn't an error - it just means that // we've been interrupted by the main Thread. return; } catch (TimeoutException e) { System.out.println ("Exception " + e.toString () + " occurred in producerRunnable"); // Indicate a timeout. mProducerCounter = TIMEOUT_OCCURRED; return; } catch (Exception e) { System.out.println ("Exception " + e.toString () + " occurred in producerRunnable"); // Indicate a failure. mProducerCounter = FAILURE_OCCURRED; return; } } }; static Runnable consumerRunnable = new Runnable () { public void run () { for (int i = 0; i < mMaxIterations; i++) try { if (Thread.interrupted ()) { throw new InterruptedException (); } Integer result = (Integer) mQueue.take (); System.out.println ("iteration = " + result); } catch (InterruptedException e) { System.out.println ("Thread properly interrupted by " + e.toString () + " in consumerRunnable"); // This isn't an error - it just means that // we've been interrupted by the main Thread. return; } catch (TimeoutException e) { System.out.println ("Exception " + e.toString () + " occurred in consumerRunnable"); // Indicate a timeout. mConsumerCounter = TIMEOUT_OCCURRED; return; } catch (Exception e) { System.out.println ("Exception " + e.toString () + " occurred in consumerRunnable"); // Indicate a failure. mConsumerCounter = FAILURE_OCCURRED; return; } } }; // Number of iterations to test public static int mMaxIterations = 1000000; public static int mMaxQueueSize = (mMaxIterations / 10); public static void main (String[] args) { try { mQueue = new QueueAdapter(new ArrayBlockingQueue(mMaxQueueSize));
// mQueue = new QueueAdapter(new LinkedBlockingQueue ());

// create threads
Thread consumer = new Thread (consumerRunnable);
Thread producer = new Thread (producerRunnable);

// start the threads.
consumer.start ();
producer.start ();

// Give the Threads a chance to run before interrupting them.
Thread.sleep (100);

// interrupt the threads.
consumer.interrupt ();
producer.interrupt ();

// wait for the threads to exit.
consumer.join ();
producer.join ();

} catch (Exception e) {
}
}
}
[/sourcecode]