- - 目次

Thread


はじめに

Rubyではthread機能はコンパイル時に設定されるオプションです. 手元のrubyでthreadが使えるかどうか調べるためにはコマンドラインから 以下のように入力してみてください.


% ruby -le 'print defined?(Thread)'

この結果


constant

と表示されればそのrubyインタプリタではthreadが使えます.もし,


false

と表示されたら,残念ながらそのrubyではthreadは使えません. Rubyのthread機能は移植性が高いように作られているので, 再コンパイルだけで使えるようになる可能性が高いです. (もし自分で出来なければ)管理者に頼んでみてください.

Threadとは

Threadとはひとつのプログラムの中で複数の制御の流れを扱うことが 出来る機能です.OSで提供されるプロセスとは違ってthreadでは メモリ空間が共有されます.

Rubyで使われているthreadはユーザレベルthreadと呼ばれるもので, rubyインタプリタ自身が自分でthreadの切替えを行っています. この方法は,OSで実装されているものよりも効率が低い,マルチCPUを 活かすことが出来ない,というデメリットがありますが,その代わり 移植性が高いというメリットがあります.

Threadの生成

新しいThreadを作るためにはThread.startというメソッドを使います. 使い方は以下の通りです.


Thread.start { .... }

Thread.startは新しいthreadを作り,そのthreadでイテレータブロックを 評価します.簡単なプログラムでthreadが動く様子を見てみましょう.


     1  Thread.start {
     2    while TRUE
     3      print "thread 1\n"
     4    end
     5  }
     6
     7  while TRUE
     8    print "thread 2\n"
     9  end

このプログラムを動かすと「thread 1」と「thread 2」が混じって 表示されるので,二つの無限ループが同時に動作しているのが分か ると思います.このプログラムを終了させるためにはCtrl-Cを押し てください.

Threadの操作

Threadクラスのメソッドは以下の通りです.

Thread.start {...}
Thread.new {...}

新しいthreadを生成し,その中でイテレータブロックを評価す る.新しく生成されたthreadオブジェクトを返す.newはstart の別名.

Thread.current

現在実行しているthreadオブジェクトを返す.

Thread.exit

現在実行しているthreadを終了させる.

Thread.join thread

指定したthreadの実行が終了するまで,現在のthreadを停止さ せる.

Thread.kill thread

指定したthreadの実行を終了させる.

Thread.pass

実行可能な他のthreadに明示的に制御を渡す.

Thread.stop

現在のtheadの実行を停止する.他のthreadがthread#runを実 行するまで停止し続ける

Thread#exit

レシーバのthreadの実行を終了させる.

Thread#run

レシーバの実行を再開させる.

Thread#stop

レシーバの実行を停止させる.

Thread#status

レシーバがまだ生きていれば真を返す.例外によってthreadが 終了していればその例外を発生させる.

Thread#value

レシーバのイテレータブロックを評価した結果を返す.まだイ テレータブロックの評価が終了していない時にはそのthreadが 終了するまで待つ.

またthreadが使える条件でコンパイルされたrubyではsleepが再定 義されていて,現在のthreadだけを一定時間停止させることが出来 る.またselectもthreadを扱えるように拡張されている.

Thread間の同期

Threadはメモリ空間を共有しているのでThread間のデータのやりと りは普通の変数を使って行うことができますが,動作するタイミン グを合わせるために同期を行う必要があります.この同期に失敗す ると,来るはずの無いデータを待って永遠に待ち続けるデッドロッ クと呼ばれる状態になったり,期待するのと違うデータを受け取っ て見付けにくいバグの元になったりします.

Rubyのthreadライブラリでは二つの同期方法を提供しています.ひ とつは同期だけを行うMutexとデータの受渡しも行うQueueです.こ れらのライブラリを使うためにはプログラムの先頭で


require "thread"

を呼び出しておく必要があります.

Mutex

Mutexとはmutual-exclusion lock(相互排他ロック)の略です. Mutexをロックしようとした時にすでにロックされていれば, threadはロックが解除されるまで停止します.

並行アクセスから共有データを保護するためには以下のようなコー ドを用いて行います(ここでmをMutexのインスタンスとします).


begin
  m.lock
  # mで保護される共有データへのアクセス
ensure
  m.unlock
end

同じことをより簡単に行うためMutexにはsynchronizeというメソッ ドがあります.


  m.synchronize {
    # mで保護される共有データへのアクセス
  }

例として簡単なプログラムを用意してみましょう.


     1  require "thread"
     2
     3  m = Mutex.new
     4  v = 0;                          # mで保護されるデータ
     5
     6  Thread.start {
     7    while TRUE
     8      m.synchronize {
     9        v = v + 100
    10    }
    11    end
    12  }
    13
    14  while TRUE
    15    m.synchronize {
    16      v = v - 33
    17    }
    18  end

このプログラムをMutexで保護しないと,タイミングによっては vの値を取り出してから代入までの間に他のthreadによって値が 変更されてしまう可能性があります.

Mutexのメソッドは以下の通りです.

Mutex.new

新しいロックを生成する

Mutex#lock

ロックする.すでにロックされている場合にはロックが解除されるまで待つ.

Mutex#unlock

ロックを解除する.ロックを待っている他のthreadがあればそちらを走らせる.

Mutex#synchronize

ロックの獲得から解除までを行うイテレータ.

Mutex#try_lock

ロックを獲得する.すでにロックされている場合には停止せず FALSEを返す.

Queue

Queueはデータを読み書きするパイプのようなものです.データを 提供するthreadは一方からデータを書き込み,読み出すthreadは もう一方からデータを取り出します.Queueに読み出すデータが 残っていない時には読み出そうとしたthreadはデータが来るまで停止します.

Queueを使った簡単なプログラムは以下のようになります.


     1  require "thread"
     2
     3  q = Queue.new
     4
     5  Thread.start {
     6    while gets
     7      q.push $_
     8    end
     9  }
    10
    11  while TRUE
    12    while line = q.pop
    13      print line
    14    end
    15  end

このプログラムではひとつのthreadが読み込んだ行をもうひとつの threadが出力しています.3行目を「q = []」などとして 配列に変えてみるとthread間の同期が取れず,正しく動かないこと が分かるでしょう.

Queueのメソッドは以下の通りです.

Queue.new

新しいQueueを生成します.

Queue.empty?

Queueが空の時真を返します.

Queue.push value

Queueにvalueを追加します.

Queue.pop [non_block]

Queueからデータを取り出します.偽でない引数non_blockが与 えられた場合にはQueueが空の時に例外を発生させます.それ以 外の場合にはQueueが空の時にはQueueにデータが追加されるま で読み出したthreadを停止させます.

例題

並列プログラミングの世界では昔から有名な「哲学者の食事」問題 を作ってみましょう.

「哲学者の食事」問題とは以下のような状況で哲学者がどうやって 同期をとるかという問題です.

N人の哲学者が丸いテーブルに座っています.テーブルの真中に は大きなスパゲティの皿が置いてあります.またN本のフォーク があって哲学者と哲学者の席の間に置いてあります.哲学者は思 索を続けていますが,お腹がすくと両側のフォークを取ってスパ ゲティを食べます.お腹が一杯になると食べるのを止めてフォー クを返します.哲学者は紳士ですから,お腹が空いていても両方 のフォークが手に入るまでは待ちます.

このプログラムを実行すると現在の状態を次々と表示します.各文 字の意味は以下の通りです.

o: 考えている哲学者
*: 食事している哲学者
-: 使われていないフォーク
|: 使われているフォーク

哲学者が考えている時間と食事している時間は乱数で決めています.


     1	#
     2	# The Dining Philosophers - thread example
     3	#
     4	require "thread"
     5	
     6	N=7			# number of philosophers
     7	$forks = []
     8	for i in 0..N-1
     9	  $forks[i] = Mutex.new
    10	end
    11	$state = "-o"*N
    12	
    13	def wait
    14	  sleep rand(20)/10.0
    15	end
    16	
    17	def think(n)
    18	  wait();
    19	end
    20	
    21	def eat(n)
    22	  wait();
    23	end
    24	
    25	def philosopher(n)
    26	  while TRUE
    27	    think n
    28	    $forks[n].lock
    29	    if not $forks[(n+1)%N].try_lock
    30	      $forks[n].unlock	# avoid deadlock
    31	      next
    32	    end
    33	    $state[n*2] = ?|;
    34	    $state[(n+1)%N*2] = ?|;
    35	    $state[n*2+1] = ?*;
    36	    print $state, "\n"
    37	    eat(n)
    38	    $state[n*2] = ?-;
    39	    $state[(n+1)%N*2] = ?-;
    40	    $state[n*2+1] = ?o;
    41	    print $state, "\n"
    42	    $forks[n].unlock
    43	    $forks[(n+1)%N].unlock
    44	  end
    45	end
    46	
    47	for i in 0..N-1
    48	  Thread.start{philosopher(i)}
    49	  sleep 0.1
    50	end
    51	sleep
    52	exit

- - 目次

matz@caelum.co.jp
Last modified: Wed Jan 28 12:16:41 JST 1998