二つのスレッドで同期キューを用いて情報をやりとりするときに利用するアーキテクチャ・パターンに, Producer-Consumer Pattern というものがある.
Publisher-Subscriber パターン, 生産者-消費者パターンともいう.
これを Java で実装する場合は, BlockingQueue インターフェースを利用できる.
syncronised/wait/notify を利用してもできるが, BlockingQueue を利用したほうが楽.
以下, coursera の POSA での Assignment を改造してみて, 簡易版の Producer-consumer Pattern を書いてみた.
source
import java.util.concurrent.*;
public class SynchronizedQueue {
// Keep track of the number of times the producer test iterates.
static volatile int mProducerCounter = 0;
// Keep track of the number of times the consumer test iterates.
static volatile int mConsumerCounter = 0;
// Maximum timeout.
static final int TIMEOUT_SECONDS = 5;
// Error value for a timeout.
static final int TIMEOUT_OCCURRED = -1;
// Error value for a failure.
static final int FAILURE_OCCURRED = -2;
public static class QueueAdapter<E> {
private BlockingQueue<E> mQueue;
public QueueAdapter (BlockingQueue<E> queue) {
mQueue = queue;
}
/**
* Insert msg at the tail of the queue.
*/
public void put (E msg) throws InterruptedException, TimeoutException {
// Keep track of how many times we're called.
mProducerCounter++;
boolean timeoutValue = mQueue.offer (msg,
TIMEOUT_SECONDS,
TimeUnit.SECONDS);
if (timeoutValue == false)
throw new TimeoutException ();
}
/**
* Remove msg from the head of the queue.
*/
public E take () throws InterruptedException, TimeoutException {
// Keep track of how many times we're called.
mConsumerCounter++;
E rValue = mQueue.poll (TIMEOUT_SECONDS,
TimeUnit.SECONDS);
if (rValue == null)
throw new TimeoutException ();
return rValue;
}
}
private static QueueAdapter<Integer> mQueue = null;
static Runnable producerRunnable = new Runnable () {
public void run () {
for (int i = 0; i < mMaxIterations; i++)
try {
mQueue.put (i);
if (Thread.interrupted ())
throw new InterruptedException ();
} catch (InterruptedException e) {
System.out.println ("Thread properly interrupted by "
+ e.toString () + " in producerRunnable");
// This isn't an error - it just means that
// we've been interrupted by the main Thread.
return;
} catch (TimeoutException e) {
System.out.println ("Exception " + e.toString ()
+ " occurred in producerRunnable");
// Indicate a timeout.
mProducerCounter = TIMEOUT_OCCURRED;
return;
} catch (Exception e) {
System.out.println ("Exception " + e.toString ()
+ " occurred in producerRunnable");
// Indicate a failure.
mProducerCounter = FAILURE_OCCURRED;
return;
}
}
};
static Runnable consumerRunnable = new Runnable () {
public void run () {
for (int i = 0; i < mMaxIterations; i++)
try {
if (Thread.interrupted ()) {
throw new InterruptedException ();
}
Integer result = (Integer) mQueue.take ();
System.out.println ("iteration = " + result);
} catch (InterruptedException e) {
System.out.println ("Thread properly interrupted by "
+ e.toString () + " in consumerRunnable");
// This isn't an error - it just means that
// we've been interrupted by the main Thread.
return;
} catch (TimeoutException e) {
System.out.println ("Exception " + e.toString ()
+ " occurred in consumerRunnable");
// Indicate a timeout.
mConsumerCounter = TIMEOUT_OCCURRED;
return;
} catch (Exception e) {
System.out.println ("Exception " + e.toString ()
+ " occurred in consumerRunnable");
// Indicate a failure.
mConsumerCounter = FAILURE_OCCURRED;
return;
}
}
};
// Number of iterations to test
public static int mMaxIterations = 1000000;
public static int mMaxQueueSize = (mMaxIterations / 10);
public static void main (String[] args) {
try {
mQueue = new QueueAdapter<Integer>(new ArrayBlockingQueue<Integer>(mMaxQueueSize));
// mQueue = new QueueAdapter<Integer>(new LinkedBlockingQueue ());
// create threads
Thread consumer = new Thread (consumerRunnable);
Thread producer = new Thread (producerRunnable);
// start the threads.
consumer.start ();
producer.start ();
// Give the Threads a chance to run before interrupting them.
Thread.sleep (100);
// interrupt the threads.
consumer.interrupt ();
producer.interrupt ();
// wait for the threads to exit.
consumer.join ();
producer.join ();
} catch (Exception e) {
}
}
}
Special Thanks
-
BlockingQueue を利用している例:
-
wait/notify を利用している例: