21 Oct 2014, 14:46

セマフォを利用して 2 つのスレッドに交互に処理をさせる方法 (Java)

Java の並列処理用のライブラリについて調べたまとめ. 主に, 先日受けた coursera の POSA の復習だったりします.

Semaphore

並列プログラミング環境での複数のプロセスが共有する資源に アクセスするのを制御する際の単純だが便利な抽象化を提供する変数または抽象データ型

2 種類に分けられる.

Counting Semaphores

カウンティングセマフォ. 任意個の資源を扱うセマフォ

Binary Semaphores

バイナリセマフォ. 値が 0 と 1 に制限されている (ロック/ アンロック, 使用可能/ 使用不可の意味がある) セマフォ.

ミューテックスともいう.

Java

ConditionObject

wait/notify によるスレッド間の通知では, 一つのスレッドで一ヶ所でしか wait できない.たとえば, 条件 A と条件 B の両方がそろうまでまつなど.

ConditionObject を利用すると, 複数箇所で wait することができる.

Condition は, Object 監視メソッド (wait, notify, および notifyAll) を別個のオブジェクトに分解し, それらに任意の Lock 実装の使用を組み合わせて, オブジェクトごとに複数の待機セットを保持する効果を付与します.

Java

CountdonwLatch

他のスレッドをある地点でまち合わせるための機構.

競馬のスタートバーのようなイメージ.

または, 旅行のツアーガイド. 集合時間が決められていてるので, それまでに旅行客は集合場所に集合する. 全員が集合したら, 次の移動場所へ移動する.

Java

PingPong

2 つのスレッドがセマフォを利用しつつ, 交互に処理をするプログラム.

セマフォ処理

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;

public class SimpleSemaphore {
    private Lock lock;
    private Condition notEmpty;
    private volatile int count;

    public SimpleSemaphore (int permits, boolean fair) {
        count = permits;
        lock = new ReentrantLock (fair);
        notEmpty = lock.newCondition ();
    }

    public void acquire () throws InterruptedException {
        lock.lockInterruptibly ();
        try { 
            while (count == 0) 
                notEmpty.await ();
            count--;
        }finally{ lock.unlock (); }
    }

    public void acquireUninterruptibly () {
        lock.lock ();
        try { 
            while (count == 0) 
                notEmpty.awaitUninterruptibly ();
            count--;
        }finally{ lock.unlock (); }
    }

  public void release () {
        lock.lock ();
        try{ 
            count++;
            notEmpty.signal ();
        }
        finally{lock.unlock ();}
    }

    public int availablePermits () {
        return count;
    }
}

メイン処理

import java.util.concurrent.CountDownLatch;

public class PingPongRight {

    // イテレーション数
    public final static int mMaxIterations = 10;

    // 2 つのスレッドの待ちあわせ用
    public static CountDownLatch mLatch;

    public static class PlayPingPongThread extends Thread {

        private int mMaxLoopIterations = 0;
        String mStringToPrint;

        SimpleSemaphore mSemaphoreOne;
        SimpleSemaphore mSemaphoreTwo;

        public PlayPingPongThread (String stringToPrint,
                                                             SimpleSemaphore semaphoreOne, SimpleSemaphore semaphoreTwo,
                                                             int maxIterations) {
            mStringToPrint = stringToPrint;
            mSemaphoreOne = semaphoreOne;
            mSemaphoreTwo = semaphoreTwo;
            mMaxLoopIterations = maxIterations;
        }

        public void run () {

            for (int loopsDone = 1; loopsDone <= mMaxLoopIterations; ++loopsDone) {
                try {
                    // 処理の権利を取得して, 処理を実施
                    acquire ();
                } catch (InterruptedException e) {
                    e.printStackTrace ();
                }
                System.out.println (mStringToPrint + "(" + loopsDone + ")");
                // 次の処理の権利を解放
                release ();
            }

            // 自スレッドの処理がすべて終わったらカウントダウン
            mLatch.countDown ();
        }

        private void acquire () throws InterruptedException {
            mSemaphoreOne.acquire ();
        }

        private void release () {
            mSemaphoreTwo.release ();
        }
    }

    public static void process (int maxIterations) throws InterruptedException {
        // 待ち合わせ
        mLatch = new CountDownLatch (2);

        // バイナリセマフォ
        // セマフォを獲得できたら 次のステップに進める
        SimpleSemaphore pingSema = new SimpleSemaphore (1, true);
        SimpleSemaphore pongSema = new SimpleSemaphore (1, true);

        // pong が動作しないようにセマフォ獲得
        // これで, 確実に ping から処理をすすめることができる.
        pongSema.acquire ();

        // 二つのスレッド生成
        PlayPingPongThread ping = new PlayPingPongThread ("Ping! ",
                                                                                                            pingSema,
                                                                                                            pongSema,
                                                                                                            maxIterations);
        PlayPingPongThread pong = new PlayPingPongThread (" Pong!",
                                                                                                            pongSema,
                                                                                                            pingSema,
                                                                                                            maxIterations);

        System.out.println ("Go!");

        // スレッドスタート
        pong.start ();
        ping.start ();

        // 二つのスレッドの待ち合わせ
        mLatch.await ();

        System.out.println ("Done!");
    }

    public static void main (String[] args) throws InterruptedException {
        process (mMaxIterations);
    }
}