Blog
どうも,実は今年から開発チームにjoinしていた中川です.可愛い犬の写真がなかったので,可愛いマスコットの画像を貼っておきます.
最近MapReduceとかその実装であるHadoopとかをよく聞くようになりました.これはつまり,それだけ大量のデータをなんとか処理したいという要望があるからだと思います.しかし当たり前ですが,MapReduceは銀の弾丸ではありません.
ということで,最近気になっているMapReduceとは違ったアプローチを取っている分散処理基盤について,社内のTechTalkで話した内容を簡単にまとめて紹介したいと思います.
Bulk Sychronous Parallel
このアルゴリズム自体は1990年に誕生したものです.長いのでBSPと書きます.さて,グラフから最短経路を求める時,MapReduceは使えるでしょうか?このような論文が出るくらいですから出来ないことはありませんが,効率的に行うのは難しいです.
主な理由は,MapReduceが任意のノードへの通信手段を提供していないからです.しかしグラフ構造などを処理するときは「真横のノードに渡したい!」とか出てきます.このような処理を行うプロダクトで,BSPを採用したりしているようです.
BSPそのものはsuperstepという単位でイテレーションすることで,処理を行うモデルです.superstepは以下の3つで構成されます.
- Concurrent Computation
タスクの計算を行う所.ノード間で並行に行われる. - Communication
各ノード間で通信をする所. - Synchronization
全ての処理を待ち受ける所.ここで一度全てのタスクが合流します.
以下はWikipediaにあるイメージ画像です.Masterが書かれていませんが,各実装ではMapReduceのように各WorkerをマネージするMasterは存在しています.
各ノードはメッセージキューみたいなものを持っており,前のsuperstepから送られた値はここに貯まっていき,使う時になったらここから取得します.各ノードは計算が終わった後,イテレーションを終了するかどうかを選択します.
全てのノードが処理の終了をマスタに通知したら,全体のイテレーションが終わり,結果が取得できます.BSPはMapReduceよりも柔軟に処理が可能ですが,耐障害性を考えると複雑な処理ではチェックポイントを適宜挟んでやる必要があります(でないとイテレーションを最初からやり直すはめに…).
大規模データ向けのBSP実装としては,今の所Google PregelとApache Hamaの二つがあります(BSP Worldwideにその他の関連リンクがたくさんあります).GoogleのPregelはグラフ構造に特化したもののようですが,Hamaは汎用なBSP実装を目指しているようです.以下はPregelのページランクの例です(slideshareからの引用.コメントは著者がつけました).
class PageRankVertex : public Vertex { public: /* * Apache Hamaもそうだけど一つの関数だけ書く * msgsに貯まっているメッセージが入っている */ virtual void Compute(MessageIterator* msgs) { // コアとなる計算部分 if (superstep() >= 1) { double sum = 0; for (; !msgs->Done(); msgs->Next()) sum += msgs->Value(); *MutableValue() = 0.15 / NumVertices() + 0.85 * sum; } if (superstep() < 30) { const int64 n = GetOutEdgeIterator().size(); SendMessageToAllNeighbors(GetValue() / n); // 任意のノード(ここでは隣)に次の値を送信! } else { VoteToHalt(); // 処理が終了したことをMasterに通知 } } };
HamaがどういうAPIでBSPを提供しているかはWikiに書いてあります. 後,Neo4jというグラフデータベースがBSPモデルを採用予定のようです.
Spark (http://www.cs.berkeley.edu/~matei/spark/)
Sparkは今人気が出てきているかもしれないScalaで書かれているフレームワークです.
MapReduceで機械学習を行おうとすると,同じデータセットに対して収束するまで何度も処理を繰り返すことも多々あり,これが結構な負荷になります.これに対してSparkは「それぞれの処理の中でデータを再利用できる仕組みを提供してるから効率的に動くよ!」とアピールしてます.
Sparkで肝になるのはResilient Distributed Datasets(RDDs)と呼ばれる読み込み専用のデータセットと,操作が制限された共有変数です.RDDはソースを見れば分かりますが,何か裏にストレージが必要というわけではなく,とりあえずデータセットとして振る舞えばリソースは何でもいいです.基本lazyに処理されるので,reduceのような実際に値を必要とする操作が行われるまで,データそのものは生成されません.
それに加え,RDD自体がチェックポイントのようにlineageに管理されているので,あるノードが落ちても,他のノードでRDDを使うことで再度処理を続けることが出来ます.
下のはhdfsにあるログデータからERRORの行をカウントするだけの簡単なサンプルです.
val file = spark.textFile("hdfs://...") val errs = file.filter(_.contains("ERROR")) val ones = errs.map(_ => 1) // この段階でerrsとonesにデータが読み込まれているわけではない // 実装としてはイテレータを保持しているだけ,都度計算している // reduceすることでRDDから実際の値が計算される val count = ones.reduce(_+_) // cacheを読んであげるとSpark環境にキャッシュが生成され // ノード間で同じデータセットを使い回すことも出来る val cachedErrs = errs.cache()
下のはサイトにも載っているPi Estimatingのサンプルです
val spark = new SparkContext() var count = spark.accumulator(0) for (i <- spark.parallelize(1 to 10000, 10)) { val x = Math.random * 2 - 1 val y = Math.random * 2 - 1 if (x*x + y*y < 1) count += 1 } println("Pi is roughly " + 4 * count.value / 10000.0)
countが共有変数になってます.spark.parallelizeの中は別マシンで計算されるものの,普通の変数と同じようにアクセス出来ます.mapとかreduceとか,Scalaでプログラムを書いて,数行を変更を加えるだけで,Sparkが宜しくやってくれるというわけです.ScalaはAkkaとかもそうですが,この辺活発なのが羨ましい所です.
Piccolo (http://piccolo.news.cs.nyu.edu/)
最後に紹介するPiccoloは毛色が違って,Sparkの共有変数を突き詰めたような感じです.Piccoloが提供するのはイテレーションなどの枠組みではなく,各マシンの間で共有されるグローバルなテーブルです(タプルスペースを分散処理向けに拡張したものとイメージした方が分かり易いかもしれません).
処理の図はこんな感じです(PiccoloのPDFから引用しました).1, 2, 3が処理行っているノードで,適宜グローバルテーブルにアクセスします.対象となるインメモリのデータに対して直接更新を行うので,ページランクやWebクローラのような読み書きが大量に行われる処理に非常に強いです.後実装のMutableGlobalTableなどを見れば分かりますが,シャードのチェックを適宜行いローカリティもかなり考慮しているので,その辺を意識してコーディングしてあげると,かなり速くなるようです.勿論,提供されるのはgetやputやアキュムレータなどのプリミティブなので,データのフローに関しては,プログラマが考えてやらないといけません.またSparkなどとは違い,耐障害性のためにチェックポイントが必要になります.
下のは簡単なワードカウントの例です.PMapはPiccoloのための特別な記法で,テーブルの各要素をiterateしながらgetなどの定型の処理を省き,コードブロックを簡単に実行するためのマクロみたいなものです(チェックポイントなどが入るサンプルは長くなるので,PiccoloのサンプルやPDFを参照してください).
counts = CreateTable(0, 1, new Sharding::String, new Accumulators::Sum); books = CreateTextTable(1, FLAGS_book_source, false); StartWorker(conf); Master m(conf); PMap({ line : books }, { vector words = StringPiece::split(line, " "); for (int j = 0; j < words.size(); ++j) { words[j].strip(); counts->update(words[j].AsString(), 1); } }); PMap({ c : counts}, { if (c > 50) { printf("%20s : %dn", k.c_str(), c); } });
Piccolo自体は好きなアプローチなんですが,C++とPythonしか今の所APIがないのがつらい所です.内部ではProtocol Buffersを使っているようなので,HBaseのThriftサーバみたいにProtocol Buffersサーバとか立てれば,色々幅が広がりそうな気はしているのですが…
まとめ
とりあえず3つを紹介しました.どのアプローチにも得意なものと苦手なものがあります.MapReduceが色々と制限してプログラマの負担を減らしたのに比べ,今回紹介したのは制限がゆるめです.あまり汎用に過ぎると,逆に使い難くなるというのはよくあることで,BSPの汎用実装を目指しているApache Hamaが今後どうなるのか気になる所です. また,大量のデータが日々生成されるようになってきている今,ユースケースに特化した分散処理基盤がまだまだ作られそうで,面白いのが出てきてくれたらなぁと思っています.
今回紹介したPiccoloやSparkはコード量がとても少ないので,実装が気になる人は是非読んでみましょう!
Tag