Monday, November 8, 2010

scalaでactor使ってparallel map その3 - Futures編

その1
その2
でparallel mapの実装を試みたところActorの理解が浅く、実はシングルスレッドだったという衝撃の事実が発覚して度肝を抜かれた。

めげずに、Using scala actor framework as fork-join computation?とかScalaActors.pdf
を参考にしてFuturesを使って実装してみた。

今回は、 FuturesをListの要素数分作って、mapでFuturesをapplyする場合と、
Futuresの数を制御したい場合の2パターンを実装してみた。

import scala.actors.Future
import scala.actors.Futures._

import java.util.concurrent._

trait Pmap {

  // FuturesをListの要素数分作る場合
  def pmap[T, U](xs: List[T])(fun: T => U): List[U] = {
    val mapfuncs = xs.map{ x => future { fun(x) } }
    mapfuncs.map{ _() }
  }

  // for Debug
  def pmap_debug[T, U](xs: List[T])(fun: T => U): List[U] = {
    val mapfuncs = xs.map{ x => future {
        println("start Thread " + currentThread.getId)
        val r = fun(x)
        println("end Thread " + currentThread.getId)
        r
    } }
    mapfuncs.map{ _() }
  }

  // Futuresの数を制限したい場合
  def pmap[T, U](xs: List[T], n: Int)(fun: T => U): List[U] = {
    val mapfuncs = divide(n, xs).map{ l =>
      future { l.map( fun(_) ) }
    }
    mapfuncs.flatMap{ _() }
  }

  // for Debug
  def pmap_debug[T, U](xs: List[T], n: Int)(fun: T => U): List[U] = {
    val mapfuncs = divide(n, xs).map{ l =>
      future {
        println("start Thread " + currentThread.getId)
        val result = l.map( fun(_) )
        println("end Thread " + currentThread.getId)
        result
      }
    }
    mapfuncs.flatMap{ _() }
  }

  def divide[T](n: Int, xs: List[T]): List[List[T]] = {
    val size = xs.size / n
    def r(ys: List[T], acc: List[List[T]]): List[List[T]] = {
      if(ys.size < size) (acc.head ::: ys) :: acc.tail
      else{
        val splitted = ys.splitAt(size)
        r(splitted._2, splitted._1 :: acc)
      }
    }
    r(xs, Nil).reverse
  }
}

object Main extends Application with Pmap {

  val l: List[Int] = (1 to 100000).toList
  pmap[Int, Int](l, 4)(x => x + 1)
  pmap_debug[Int, Int](l, 2)(x => x + 1)
  pmap[Int, Int](l)(x => x + 1)
  pmap_debug[Int, Int](l)(x => x + 1)
}


pmap_debugを実行すると
start Thread 13
start Thread 11
start Thread 12
start Thread 10
end Thread 10
end Thread 11
end Thread 12
end Thread 13

と出るのでマルチスレッドで非同期に実行されていることが確認できた。

Futures10万個の場合では実際に(2コア2スレッドのマシンでは)4スレッドで切り盛りしていた。
(環境依存。4コア8スレッドのマシン上では合計16スレッドだった)

2コア2スレッドマシン
% sort /tmp/output.txt |uniq -d
end Thread 10
end Thread 11
end Thread 12
end Thread 13
start Thread 10
start Thread 11
start Thread 12
start Thread 13

どっちがよいかは場合によるだろう。

リストの数がそれほど大きくなくて、mapにそこそこ時間がかかる場合を考えてみる。
リストをまず分割して、その断片にFuturesを割り当てる方式だと、
分割したリスト断片間を処理する時間にばらつきがあると、一番遅い断片の処理時間に引っ張られそう。
リストの要素ごとにFuturesを作った方が処理時間が平準化されてトータルでは早くなりそうだ。

今回のようにリストの数が膨大だとFuturesの数を抑えたが方が、高速でメモリにもやさしそうだ。

リストの要素数、mapの処理内容、CPUコア数によって変わるだろうから実際に動かして決めるのが良さそうだな。

No comments:

Post a Comment