HadoopStreaming で MapReduce を Python で動かす
HadoopStreaming を 使って Python で Hadoop を動かす方法です.
Mapper と Reducer は通常 Java で記述しなければいけませんが,HadoopStreaming を使えば Python などの他の言語で書けるので実装がとても楽になります!
HadoopStreaming とは?
Mapper と Reducer のやり取りを標準入出力を介することで,他のプログラミング言語からも Hadoop (MapReduce) を利用できるようにする仕組みです.
標準入出力さえ扱えれば基本的にどんな言語でも構わないので,Python だけでなく Ruby, Perl, シェルスクリプト等を使って Mapper と Reducer を実装することができます.
詳しい説明は以下をどうぞ!
Hadoop Streamingメモ(Hishidama's Hadoop Streaming Memo)
HadoopStreaming の入手
HadoopStreaming を利用するには,「hadoop-streaming-***.jar」という jar ファイルが必要です(***
には Hadoop のバージョンが入る).
以下,入手方法です.
- Hadoop の DL ページにアクセス.
http://ftp.jaist.ac.jp/pub/apache/hadoop/common/ - 使用しているバージョンの Hadoop のディレクトリに入り,その中にある「hadoop-***.tar.gz」を DL する.
- DL したファイルを解凍.
- 解凍してできたディレクトリの以下に「hadoop-streaming-***.jar」がある.
hadoop-***/share/hadoop/tools/lib/hadoop-streaming-***.jar
HadoopStreaming の使い方
Hadoop のサンプルとして定番の「Word Count」を例に説明します.
以下の記事を参考にさせて頂きました.
入力ファイルの用意
単語を数え上げるファイルを用意します.
$ echo "a a a b b c" > input.txt
別に自前で用意しても構いません.
Mapper と Reducer を実装
今回は Python で実装します.
mapper.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import sys # 入力テキストを標準入力から受け取り1行ずつ処理 for line in sys.stdin: # テキストをスペースで区切って単語に分割 for word in line.strip().split(): # 各単語を「word 1」という形式で標準出力に出力 # タブで区切られた左が key, 右が value に相当 print '{0}\t1'.format(word)
reducer.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import sys from collections import Counter # 単語のカウントを行う Counter オブジェクト counter = Counter() # Mapper の出力を標準入力から受け取り1行ずつ処理 for line in sys.stdin: # key(=word), value(=count) に分割する word, count = line.strip().split('\t') # カウント counter[word] += int(count) # カウント結果を1単語ずつ「word count」という形式で標準出力に出力 for word, count in counter.most_common(): print '{0}\t{1}'.format(word, count)
※ collections.Counter
モジュールは Python2.7 以降じゃないと動きません.2.6 以下の場合は参考記事のコードを使って下さい(reducer.py も同様).
実行
HDFS に入力ファイルを置く
先程作成した input.txt を HDFS に転送します.
$ hadoop fs -put input.txt
HadoopStreaming を実行
以下のコマンドで実行できます.
オプションの意味は 下記 を参照して下さい.
$ hadoop jar hadoop-streaming-***.jar -mapper mapper.py -reducer reducer.py -file mapper.py reducer.py -input input.txt -output wc.out
あとは MapReduce が終了するのを待つだけです!
結果を確認
結果の確認方法は普段と同じです.
$ hadoop fs -cat 'wc.out/part-*' a 3 b 2 c 1
その他
ローカルで HadoopStreaming のテストを行う
下記のようにシェルコマンドをパイプで繋ぐことで,ローカル環境で簡単に HadoopStreaming の動作テストができます.
$ cat input.txt | python mapper.py | sort | python reducer.py a 3 b 2 c 1
このコマンドは以下のような仕組みで動作しています.
cat
で入力ファイルを標準出力に出力し,パイプでmapper.py
の標準入力に渡す.mapper.py
の出力をsort
に渡す.(MapReduce の Shuffle に相当)sort
の結果を reducer.py に渡し,結果を出力.
標準入出力を使ってデータをやり取りする HadoopStreaming の性質をうまく利用していますね!
HadoopStreaming のオプション
代表的なオプションを以下に記載しておきます.
-mapper [file]
: Mapper の実行ファイルを指定-reducer [file]
: Reducer の実行ファイルを指定-file [file1] [file2] ...
: Mapper や Reducer で実行するファイルをリモートサーバに配布-input [file or dir]
: HDFS上の入力ファイルを指定-output [dir]
: 結果を出力するHDFS上のディレクトリを指定-numReduceTasks [num]
: 実行する Reducer の数を指定-combiner [file]
: Combiner の実行ファイルを指定-cacheFile [file]#[symlink]
: DistributedCache に渡すHDFS上のファイルを指定.また,それを Mapper や Reducer で読み込むために利用する symlink も指定します.
以下のコマンドですべてのオプションを参照できます.
$ hadoop jar hadoop-streaming-***.jar -info
HadoopStreaming で Combiner を使う
HadoopStreaming でも Combiner が使えます.
使い方は簡単で,HadoopStreaming 実行時に -combiner [file]
オプションで Combiner の実行ファイルを指定するだけです.
例えば先程の Word Count は次のようにすれば Combiner が使えます.
$ hadoop jar hadoop-streaming-***.jar -mapper mapper.py -reducer reducer.py -combiner reducer.py -file mapper.py reducer.py -input input.txt -output wc.out
今回は Reducer と Combiner は全く同じファイルなので問題ありませんが,異なる場合は -file
オプションに Combier のファイルを含めるようにして下さい.
数GBぐらいの大きなテキストファイルを入力にすると,Combier を使うことによる実行時間の短縮が実感できるかと思います.
HadoopStreaming で DistributedCache を使う
DistributedCache ももちろん使えます.-cacheFile [file]#[symlink]
というオプションを付けるだけです!
例として,特定のワードはカウントしない Word Count を実装します.
カウントしないワード (= ストップワード) のリストを DistributedCache を使って読み込みます.
まずはストップワードのファイルを作成し,HDFS に置きます.
今回は 'b' をストップワードに指定します.
$ echo b > stopwords.txt $ hadoop fs -put stopwords.txt
次に mapper.py にストップワードを省く処理を書き足します.
#!/usr/bin/env python # -*- coding: utf-8 -*- import sys # ストップワードを読み込んでセットに格納 stopwords = set() for line in open('stopwords.txt'): stopwords.add(line.rstrip()) # 入力テキストを標準入力から受け取り1行ずつ処理 for line in sys.stdin: # テキストをスペースで区切って単語に分割 for word in line.strip().split(): # ストップワードなら処理しない if word in stopwords: continue # 各単語を「word 1」という形式で標準出力に出力 # タブで区切られた左が key, 右が value に相当 print '{0}\t1'.format(word)
これらができたら実行します.-cacheFile 'stopwords.txt#stopwords.txt'
を忘れずに付けるようにします.
$ hadoop jar hadoop-streaming-***.jar -mapper mapper.py -reducer reducer.py -file mapper.py reducer.py -input input.txt -output wc2.out -cacheFile 'stopwords.txt#stopwords.txt'
出力ディレクトリも先程と別の場所を指定するのを忘れないようにして下さい.エラー出ます…
実行が終わったら結果を見てみます.
$ hadoop fs -cat 'wc2.out/*' a 3 c 1
確かに 'b' がカウントされていませんね!
まとめ
HadoopStreaming を 使って Python で Hadoop を動かす方法を解説しました.
Python に限らず,簡単に好きな言語から Hadoop を動かすことができ,自分の得意な言語が使えるのは便利ですね!
ただ1点注意として,Javaで実装した場合に比べて処理時間は少し長くなってしまうようです*1.けど実装にかかる時間を大幅に短縮できるので,実装から処理までトータルで考えれば時短になるとは思います笑
Javaだから…とHadoopを敬遠されていたも,この機会に是非チャレンジしてみてはいかがでしょうか.