JavaScript1.7で平行プログラミングモデルを

Firefox2.0に搭載されているJavaScript1.7にはPython2.5風のgenerator機構があります。

  • function内でyield式を使うことで、その呼び出し結果がiteratorと同じインタフェースで利用可能なgeneratorオブジェクトになる。
  • generatorオブジェクトはnext()のほかにsend(v)メソッドを持つ。send(v)でyield式に復帰したとき、その式の値をvに置き換えられる。

generatorを使って平行プログラミングモデルの(擬似)システムを作るのは常套手段です。関数だけで中断再開を実現するには、処理させるコードをContinuation Passing Styleで書かなくてはいけないのですが、generator機構のyieldによって中断可能な処理でも比較的フラットに書くことができるからです。

平行モデルはいろいろあるのですが、ここはstackless pythonにあるchannelが基本的な仕組みとしてよいと感じたので、channelを実装してみました。

利用サンプルコード

    // エンジンの作成
    var engine = newEngine();
    // チャネルの作成
    var ch1 = engine.newChannel();
    var ch2 = engine.newChannel();
    
    // タスクpingの定義
    function ping() {
      var v1 = yield ch1;
      print("ping receive  from ch1: " + v1);
      
      print("ping send 10 to ch2");
      ch2.send(10);
      
      var v2 = yield ch1;
      print("ping receive from ch1: " + v1);
      
      print("ping send 20 to ch2");
      ch2.send(20);
    }

    // タスクpongの定義
    function pong() {
      print("pong send 100 to ch1");
      ch1.send(100);
      
      var v1 = yield ch2;
      print("pong receive from ch2: " + v1);
      
      print("pong send 200 to ch2");
      ch1.send(200);
      
      var v2 = yield ch2;
      print("pong receive from ch1: " + v1);
    }
    
    // タスクの登録
    engine.add(ping);
    engine.add(pong);

    // 実行
    engine.run();

チャネルに送信はsendメソッドを使いますが、受信(receive)でyieldを使うようにします。
なぜならそこで待ち状態に入る可能性があるからです。
できれば隠したいところだけど、中断させるためには明示的にyieldさせる以外にありませんので。

単純にchannelでの受け渡しをするだけであれば、中はシングルスレッドなので各channelが待ち状態のtaskリストを持っていればよく、実のところengineインスタンスにする必要はないと思うのですが、stackless python風にしてみました。

エンジン実装

考え方は簡単です。

  • channelでreceiveに入ると、中断させ、channelの待ちリストにいれる
  • channelでsendされると、キューに値を貯める
  • 待ちtaskが入ったとき、またはキューに値がたまったとき、待ちtaskと値があればそのtaskにその値を渡して再開させる
    • 結果的に(sendの中で)channelの待ちtaskを再開する
    • もしくは中断の直後に再開される

task実行で行う必要があることは、

  • task実行で中断されたとき、channelの待ちリストにそのtaskを追加してあげる
    // お約束のarrayメソッド
    Array.prototype.findBy = function (func) {
      for (var i = 0; i < this.length; i += 1) {
        if (func(this[i])) return i;
      }
      return -1;
    };
    Array.prototype.removeAt = function (index) {
      if (index >= 0) this.splice(index, 1);
    };
    
    // channelオブジェクト生成
    function channel(engine) {
      return {
        queue: [],
        consumers: [],
        engine: engine,
        send : function (v) {
          this.queue.push(v);
          this.process();
        },
        push: function (consumer) {
          this.consumers.push(consumer);
          this.process();
        },
        process: function() {
          if (this.consumers.length > 0 && this.queue.length > 0) {
            var consumer = this.consumers.shift();
            var value = this.queue.shift();
            try {
              var ch = consumer.send(value);
              ch.push(consumer);
            } catch (stop) {
              this.engine.del(consumer);
            }
          }
        }
      };
    }
    
    // エンジン生成
    function newEngine() {
      return {
        tasks: [],
        add: function (task) {
          this.tasks.push(task);
        },
        del: function (generator) {
          var index = this.generators.findBy(function (o) { 
            return o.generator == generator; });
          this.tasks.removeAt(index);
        },
        run: function () {
          for (var i = 0; i < this.tasks.length; i += 1) {
            var consumer = this.tasks[i]();
            this.tasks[i].generator = consumer;
            try {
              var ch = consumer.next();
              ch.push(consumer);
            } catch (ex) {
              if (ex == StopIteration)
                this.del(consumer);
              throw ex;
            }
          }
        },
        newChannel: function () {
          return channel(this);
        }
      };
    }
    
    // task内を関数に切り分けたいとき、その関数の呼び出しで使う
    // 分ける関数もgenerator関数(どこかでyieldを使う)でなくてはいけない
    // r = yield genefunc.yieldfunc(a,b,c);
    Function.prototype.yieldfunc = function () {
      var genfun = this;
      var gene = genefun.apply(undefined, arguments);
      var ch = gene.next();
      var value = yield ch;
      gene.send(value);
    }
  • channel.process内部メソッドが、チャネルの待ちタスクを再開できるかをチェックして再開させます。
    • generatorは終われば例外StopIterationが出ます。
    • 例外が起きない場合は中断した場合です。そのタスクが待つchannelに登録して終わります。

実行結果

うちの環境ではこんな感じ。

pong send 100 to ch1
ping receive from ch1: 100
ping send 10 to ch2
pong receive from ch2: 10
pong send 200 to ch2
ping receive from ch1: 100
ping send 20 to ch2
pong receive from ch1: 10 

感想

lightweight threadは面白いことは面白いけど、このtaskはgeneratorなので動きはヘビーかも。中断可能な関数を深できるようにするには根からその奥までのあいだの記述全部でyieldされる必要もあり、これは面倒な制約です。フラットに書きたいとき限定で使うという感じか。

ソース

<html><head>

<script type="text/javascript;version=1.7">
  // <![CDATA[
    Array.prototype.findBy = function (func) {
      for (var i = 0; i < this.length; i += 1) {
        if (func(this[i])) return i;
      }
      return -1;
    };
    Array.prototype.removeAt = function (index) {
      if (index >= 0) this.splice(index, 1);
    };
    
    function channel(engine) {
      return {
        queue: [],
        consumers: [],
        engine: engine,
        send : function (v) {
          this.queue.push(v);
          this.process();
        },
        push: function (consumer) {
          this.consumers.push(consumer);
          this.process();
        },
        process: function() {
          if (this.consumers.length > 0 && this.queue.length > 0) {
            var consumer = this.consumers.shift();
            var value = this.queue.shift();
            try {
              var ch = consumer.send(value);
              ch.push(consumer);
            } catch (ex) {
              if (ex == StopIteration)
                this.engine.del(consumer);
            }
          }
        }
      };
    }
    
    function newEngine() {
      return {
        tasks: [],
        add: function (task) {
          this.tasks.push(task);
        },
        del: function (generator) {
          var index = this.generators.findBy(function (o) { return o.generator == generator; });
          this.tasks.removeAt(index);
        },
        run: function () {
          for (var i = 0; i < this.tasks.length; i += 1) {
            var consumer = this.tasks[i]();
            this.tasks[i].generator = consumer;
            try {
              var ch = consumer.next();
              ch.push(consumer);
            } catch (ex) {
              if (ex == StopIteration)
                this.del(consumer);
              throw ex;
            }
          }
        },
        newChannel: function () {
          return channel(this);
        }
      };
    }
    
    Function.prototype.yieldfunc = function () {
      var genfun = this;
      var gene = genefun.apply(undefined, arguments);
      var ch = gene.next();
      var value = yield ch;
      gene.send(value);
    }
    
    var engine = newEngine();
    var ch1 = engine.newChannel();
    var ch2 = engine.newChannel();
    function ping() {
      var v1 = yield ch1;
      print("ping receive  from ch1: " + v1);
      //yield subcall(genefun, 100);
      
      print("ping send 10 to ch2");
      ch2.send(10);
      
      var v2 = yield ch1;
      print("ping receive from ch1: " + v1);
      
      print("ping send 20 to ch2");
      ch2.send(20);
    }
    function pong() {
      print("pong send 100 to ch1");
      ch1.send(100);
      
      var v1 = yield ch2;
      print("pong receive from ch2: " + v1);
      
      print("pong send 200 to ch2");
      ch1.send(200);
      
      var v2 = yield ch2;
      print("pong receive from ch1: " + v1);
    }
    
    function print(v) {
      document.getElementById("out").innerHTML += <span>{v}<br/></span>;
    }
    function init() {
    }
    function run() {
      engine.add(ping);
      engine.add(pong);
      engine.run();
    }
    //]]>
</script></head><body onload="init()">
<form>
<input value="push!" onclick="run()" type="button">
</form>
<div id="out"></div>
</body></html>