• このエントリーをはてなブックマークに追加

二つのスレッドで同期キューを用いて情報をやりとりするときに利用するアーキテクチャ・パターンに, Producer-Consumer Pattern というものがある.

Publisher-Subscriber パターン, 生産者-消費者パターンともいう.

これを Java で実装する場合は, BlockingQueue インターフェースを利用できる.

synchronized/wait/notify を利用してもできるが, BlockingQueue を利用したほうが楽.

以下, Coursera の POSA での Assignment を改造してみて, 簡易版の Producer-Consumer Pattern を書いてみた.

スポンサードリンク

source

import java.util.concurrent.*;

/**
 * Minimal Producer-Consumer demonstration built on a {@link BlockingQueue}.
 *
 * <p>{@link #main} starts one producer thread and one consumer thread that
 * exchange {@code Integer} values through a bounded queue, lets them run for
 * a short time, interrupts both and waits for them to exit.
 */
public class SynchronizedQueue {
	// Number of times QueueAdapter.put has been called, or one of the
	// error values below once the producer has failed. The ++ applied to
	// this volatile field is not atomic; it is only written from the
	// thread that calls put (a single producer thread in main).
	static volatile int mProducerCounter = 0;

	// Number of times QueueAdapter.take has been called, or one of the
	// error values below once the consumer has failed. Same single-writer
	// caveat as mProducerCounter.
	static volatile int mConsumerCounter = 0;

	// Maximum time, in seconds, that put/take wait on the queue.
	static final int TIMEOUT_SECONDS = 5;

	// Error value stored in a counter when a timeout occurred.
	static final int TIMEOUT_OCCURRED = -1;

	// Error value stored in a counter when any other failure occurred.
	static final int FAILURE_OCCURRED = -2;

	// Number of iterations each runnable performs.
	public static int mMaxIterations = 1000000;

	// Capacity of the bounded queue created in main.
	public static int mMaxQueueSize  = (mMaxIterations / 10);

	/**
	 * Thin wrapper around a {@link BlockingQueue} that converts the
	 * "timed out" results of {@code offer}/{@code poll} into a
	 * {@link TimeoutException} and counts the calls made to it.
	 *
	 * @param <E> type of the queued messages
	 */
	public static class QueueAdapter<E> {
		private final BlockingQueue<E> mQueue;

		/**
		 * @param queue the queue that all operations are delegated to
		 */
		public QueueAdapter (BlockingQueue<E> queue) {
			mQueue = queue;
		}

		/**
		 * Insert msg at the tail of the queue, waiting up to
		 * {@link #TIMEOUT_SECONDS} seconds for space to become available.
		 *
		 * @param msg the element to enqueue
		 * @throws InterruptedException if interrupted while waiting
		 * @throws TimeoutException if no space became available in time
		 */
		public void put (E msg) throws InterruptedException, TimeoutException {
			// Keep track of how many times we're called.
			mProducerCounter++;
			// offer returns false when the wait elapsed without space.
			boolean accepted = mQueue.offer (msg,
																			 TIMEOUT_SECONDS,
																			 TimeUnit.SECONDS);
			if (!accepted) {
				throw new TimeoutException ();
			}
		}

		/**
		 * Remove msg from the head of the queue, waiting up to
		 * {@link #TIMEOUT_SECONDS} seconds for an element to arrive.
		 *
		 * @return the element taken from the head of the queue
		 * @throws InterruptedException if interrupted while waiting
		 * @throws TimeoutException if no element arrived in time
		 */
		public E take () throws InterruptedException, TimeoutException {
			// Keep track of how many times we're called.
			mConsumerCounter++;
			// poll returns null when the wait elapsed without an element.
			E rValue = mQueue.poll (TIMEOUT_SECONDS,
														 TimeUnit.SECONDS);

			if (rValue == null) {
				throw new TimeoutException ();
			}

			return rValue;
		}
	}

	// Queue shared by the two runnables; assigned in main before the
	// threads are started.
	private static QueueAdapter<Integer> mQueue = null;

	/**
	 * Puts the values 0 .. mMaxIterations-1 into the shared queue and
	 * stops early on interruption, timeout or any other failure.
	 */
	static Runnable producerRunnable = new Runnable () {
			@Override
			public void run () {
				for (int i = 0; i < mMaxIterations; i++) {
					try {
						mQueue.put (i);
						// put only notices an interrupt while it is waiting,
						// so also poll the flag after each successful put.
						if (Thread.interrupted ()) {
							throw new InterruptedException ();
						}
					} catch (InterruptedException e) {
						System.out.println ("Thread properly interrupted by "
															 + e.toString () + " in producerRunnable");
						// This isn't an error - it just means that
						// we've been interrupted by the main Thread.
						return;
					} catch (TimeoutException e) {
						System.out.println ("Exception " + e.toString ()
															 + " occurred in producerRunnable");
						// Indicate a timeout.
						mProducerCounter = TIMEOUT_OCCURRED;
						return;
					} catch (Exception e) {
						System.out.println ("Exception " + e.toString ()
															 + " occurred in producerRunnable");
						// Indicate a failure.
						mProducerCounter = FAILURE_OCCURRED;
						return;
					}
				}
			}
		};

	/**
	 * Takes up to mMaxIterations values from the shared queue, printing
	 * each one, and stops early on interruption, timeout or any other
	 * failure.
	 */
	static Runnable consumerRunnable = new Runnable () {
			@Override
			public void run () {
				for (int i = 0; i < mMaxIterations; i++) {
					try {
						// Check the flag before blocking so an interrupt that
						// arrived while printing is not missed.
						if (Thread.interrupted ()) {
							throw new InterruptedException ();
						}
						Integer result = mQueue.take ();

						System.out.println ("iteration = " + result);
					} catch (InterruptedException e) {
						System.out.println ("Thread properly interrupted by "
															 + e.toString () + " in consumerRunnable");
						// This isn't an error - it just means that
						// we've been interrupted by the main Thread.
						return;
					} catch (TimeoutException e) {
						System.out.println ("Exception " + e.toString ()
															 + " occurred in consumerRunnable");
						// Indicate a timeout.
						mConsumerCounter = TIMEOUT_OCCURRED;
						return;
					} catch (Exception e) {
						System.out.println ("Exception " + e.toString ()
															 + " occurred in consumerRunnable");
						// Indicate a failure.
						mConsumerCounter = FAILURE_OCCURRED;
						return;
					}
				}
			}
		};

	/**
	 * Runs the demonstration: creates the bounded queue, starts the
	 * consumer and producer threads, interrupts them after 100 ms and
	 * waits for both to finish.
	 *
	 * @param args unused
	 */
	public static void main (String[] args) {
		try {
			// A LinkedBlockingQueue<Integer> could be substituted here.
			mQueue = new QueueAdapter<Integer>(new ArrayBlockingQueue<Integer>(mMaxQueueSize));

			// create threads
			Thread consumer = new Thread (consumerRunnable);
			Thread producer = new Thread (producerRunnable);

			// start the threads.
			consumer.start ();
			producer.start ();

			// Give the Threads a chance to run before interrupting them.
			Thread.sleep (100);

			// interrupt the threads.
			consumer.interrupt ();
			producer.interrupt ();

			// wait for the threads to exit.
			consumer.join ();
			producer.join ();

		} catch (InterruptedException e) {
			// Restore the flag so the caller can still observe the interrupt.
			Thread.currentThread ().interrupt ();
			System.err.println ("main interrupted: " + e);
		} catch (Exception e) {
			// Report instead of silently swallowing (e.g. an invalid
			// queue capacity rejected by ArrayBlockingQueue).
			System.err.println ("main failed: " + e);
		}
	}
}