MapReduceプログラミングモデルをJavaで実装し 日本語テキストの単語出現頻度を求める
IT技術

MapReduceプログラミングモデルをJavaで実装し日本語テキストの単語出現頻度を求めてみる
MapReduce とは、簡単に言うと「膨大なデータを処理するためのプログラミングモデル及びその実装」のことです。
もともと MapReduce は、Google社の Jeffrey Dean と Sanjay Ghemawat が執筆した論文に紹介されていました。
現在では、Google だけではなく、多数の大手会社によって導入されています。
一部ではありますが、導入している会社はこんなビッグネームばかりです。
- Apache Software Foundation
 - Cloudera
 - Hortonworks
 - IBM
 - Oracle
 - Amazon
 - Microsoft
 - MapR
 - Hitachi
 
最近のビッグデータの時代では、MapReduce が基盤技術のひとつとなっています。
MapReduce の利点
MapReduce の利点の1つとしては、比較的にシンプルな手法で、膨大なデータの並列分散処理が行うことが出来ることです。
MapReduce のプログラミングモデル
MapReduce は、「Mapフェーズ」と「Reduceフェーズ」から成り立っています。
Dean氏と Ghemawat氏の論文では、MapReduce のプログラミングモデルを説明するために、多大のテキストドキュメントにおける単語の出現頻度を求める例が挙げられました。
対象となるテキストは、もちろん「英文」です。
このサンプルケースにおいて、MapReduce のプログラミングモデルは以下のような疑似コード(pseudo-code)で実装されました。
1map(String key, String value):
2    // key: document name
3    // value: document contents
4    for each word w in value:
5        EmitIntermediate(w, "1");
6
7reduce(String key, Iterator values):
8    // key: a word
9    // values: a list of counts 
10    int result = 0;
11    for each v in values:
12        result += ParseInt(v);
13    Emit(AsString(result));疑似コードに基づいて MapReduce プログラミングモデルを実装してみる!
今回は、この疑似コードに基づいて、MapReduce プログラミングモデルを実装し、複数の日本語テキストドキュメントから単語の出現頻度を求めてみたいと思います!
こちらの記事もオススメ!
MapReduce のプログラミングモデルをJavaで実装
対象データは日本語のテキスト文章ですので、日本語の「自然言語処理(Natural Language Processing)ライブラリー」が必要になります。
そこで、今回は使うのは「Kuromoji」という、日本語自然言語処理 Java ライブラリーです。
【Kuromojiダウンロードサイト】
https://github.com/atilika/kuromoji/downloads
Kuromojiの詳細
Kuromoji の詳細については、以下の Atilika のウェーブページをご参照ください!
【Atilika】
https://www.atilika.org/
日本語のストップワードリスト
その他に、日本語の文章から「助詞」や「非常に一般な単語かつ文章の文脈にあまり影響を及ばない単語(stopword)」を取り除くために、日本語のストップワードリストも必要です。
これは Github からダウンロードすることが出来ます。
【Github(ストップワードリスト)】
https://github.com/stopwords-iso/stopwords-ja/blob/master/stopwords-ja.txt
開発環境
今回の執筆者の環境は、以下の通りとなっています。
- IDE: IntelliJ IDEA
 - JDK: Oracle Java 8
 - OS: MacOS Catalina
 
プロジェクトの構成
1mapreduce
2|_input
3  |_日本語テキスト.txt // 3個のテキストファイル
4|_lib
5  |_kuromoji-0.7.7.jar //日本語自然言語処理ライブラリー
6|_src
7  |_common //入力テキストファイルを読み込むためのJavaコードなど
8    |_StopwordReader.java
9    |_stopwords_jp.txt  //日本語のストップワードリスト
10    |_TextFileReader.java
11  |_MapReduceWordFreq //MapReduceの実装Javaコード
12    |_MapReduceWordFreq.java
13    |_Mapper.java
14    |_MapOut.java
15    |_Reducer.java
16    |_ReduceIn.java
17    |_Result.java
18  |_WordFreq //MapReduceを使わずに、単語出現頻度を求めるプログラム
19    |_WordFreq.javaMapReduce:Mapフェーズ
「Mapフェーズ」では、inputディレクターから読み込まれた3個の日本語テキストファイルを、Mapper.java の map(String key, String value) 関数に渡します。
- 引数 key は、テキストファイルのファイル名です。
 - 引数 value は、テキストファイルの内容です。
 
文を構造する全ての単語を取得し、トークンとして扱う
まず、regex を用いて、テキスト文章から単語でないものを削除しましょう。
そして、Kuromoji ライブラリーの Tokenizer を適用し、テキスト文章のトークン化(Tokenization)を行います。
つまり、文を構造する全ての単語を取得し、トークン(Token)として扱います。
そして、得られた全ての単語をemitIntermediate(String key, Integer value) 関数に渡します。
- 引数 key はひとつの単語です。
 - 引数 value は Integer の 1 です。
 
ストップワードである単語を除く
emitIntermediate(String key, Integer value) 関数では、Mapフェーズの出力(MapOutオブジェクト)が生成されます。
ここで、ストップワードのリストを用いて、ストップワードである単語を除きます。
MapOut.javaクラスは String key と Integer value の変数から成り立ちます。
変数 key はひとつの単語を格納し、value は数値の1を格納します。
MapOut.java のコード
MapOut.java のコードは以下のようになります。
1package MapReduceWordFreq;
2
3public class MapOut { //Mapperの出力
4    public String key; //ひとつの単語を格納
5    public Integer value; //数字の1を格納
6
7    public MapOut(String key, Integer value){
8        this.key = key;
9        this.value = value;
10    }
11}Mapフェーズのテキストデータ処理は、Mapper.javaクラスにて実装しました。
1package MapReduceWordFreq;
2
3import common.StopwordReader;
4import org.atilika.kuromoji.Token;
5import org.atilika.kuromoji.Tokenizer;
6
7import java.io.IOException;
8import java.util.ArrayList;
9import java.util.HashMap;
10import java.util.List;
11import java.util.Map;
12
13public class Mapper {
14    private List<String> stopwordList;
15    public List<MapOut> mapOuts;
16
17    public Mapper() throws IOException {
18        stopwordList = new StopwordReader().read();
19        mapOuts = new ArrayList<>();
20    }
21
22    public void map(String key, String value) { //key : テキストファイル名、value: テキストファイルの内容
23        System.out.println("=============================");
24        System.out.println("Mapper --> list(key, value)");
25        System.out.println("=============================");
26        Map<String, Integer> kv = new HashMap<>();
27        Tokenizer tokenizer = Tokenizer.builder().build();
28        value = value.replaceAll("\\P{LD}+", "");
29        List<Token> tokenList = tokenizer.tokenize(value);
30        for (Token token:tokenList){
31            String word = token.getSurfaceForm();
32            emitIntermediate(word, 1);
33        }
34    }
35
36    public void emitIntermediate(String key, Integer value){ // key: ひとつの単語、value : 数字の1
37        if (!stopwordList.contains(key)){
38            System.out.println(key + ":" + value.toString());
39            mapOuts.add(new MapOut(key, value));
40        }
41    }
42}MapReduce:Reduceフェーズ
「Reduceフェーズ」では、Mapフェーズの出力(MapOutオブジェクト)を、Reducer.java の reduce(String key, List<Integer> values) 関数に渡します。
reduce(String key, List<Integer> values) 関数の引数 key は、ひとつの単語で、引数 values は数値1からなるリスト(List[1,1,1,...])です。
MapOut のデータ構造が、reduce(String key, List<Integer> values) 関数の引数に合わないため、そのままデータを渡すことが出来ません。
ここで、ReduceIn.java の generate(List<MapOut> mapOuts) 関数で MapOut のデータ構造を ReduceIn のデータ構造に変換します。
ReduceIn.java のコード
ReduceIn.java のコードは、以下のようになります。
1package MapReduceWordFreq;
2
3import java.util.ArrayList;
4import java.util.List;
5
6public class ReduceIn { //Reducerの入力データ構造
7    public String key; //ひとつの単語
8    public List<Integer> values; //list[1,1,1,1,...]
9
10    public ReduceIn() {
11        key = new String();
12        values = new ArrayList<>();
13    }
14
15    public List<ReduceIn> generate(List<MapOut> mapOuts) { //Reducerの入力データ構造を生成します。
16        List<ReduceIn> reduceIns = new ArrayList<>();
17
18        for (MapOut mapOut:mapOuts){
19            boolean newKey = true;
20            for (ReduceIn reduceIn:reduceIns){
21                if(reduceIn.key.equals(mapOut.key)){
22                    newKey = false;
23                    reduceIn.values.add(mapOut.value);
24                    break;
25                }
26            }
27            if (newKey){
28                ReduceIn reduceIn = new ReduceIn();
29                reduceIn.key = mapOut.key;
30                reduceIn.values.add(mapOut.value);
31                reduceIns.add(reduceIn);
32            }
33        }
34
35        return reduceIns;
36    }
37}Reduceフェーズが開始
それから、変換されたデータを reduce(String key, List<Integer> values) に渡して、Reduceフェーズが開始されます。
これによって、各単語の出現頻度をカウントされ、Result が出力されます。
Resultクラスは、「key(単語)」と「count(出現頻度)」という2つのメンバーから成り立っています。
Reducer.java と Result.java のコード
Reducer.java と Result.java のコードは、それぞれ以下のようになります。
1package MapReduceWordFreq;
2
3import java.util.List;
4
5public class Reducer { //Reduceフェーズ
6    private Result result;
7    public Reducer(){
8        result = new Result();
9    }
10
11    public Result reduce(String key, List<Integer> values){
12        Integer count = 0;
13        for (Integer i:values){
14            count += i;
15        }
16        result.key = key; //ひとつの単語
17        result.count = count; //単語の出現頻度
18
19        return result;
20    }
21}1package MapReduceWordFreq;
2
3public class Result {
4    public String key; //単語
5    public Integer count; //出現頻度
6}これで、MapフェーズとReduceフェーズの準備が完了しました!
MapReduce の実行
MapReduce を実行してみましょう!
MapReduceWordFreq.java の main(String[] args) で、日本語テキスト文章の単語出現頻度を求める MapReduce を実行します。
ここで先ず、入力ファイルを TextFileReader で読み込んで、それから Mapper と Reducer を実行します。
MapReduceWordFreq.java のコード
MapReduceWordFreq.java のコードは、次のようになります。
1package MapReduceWordFreq;
2
3import common.TextFileReader;
4import java.io.IOException;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9
10public class MapReduceWordFreq {
11
12    public static void main(String[] args) throws IOException {
13        //テキストファイルの読み込み
14        TextFileReader textFileReader = new TextFileReader();
15        Map<String, String> textFiles = textFileReader.readFiles();
16
17        //Mapフェーズの実行
18        Mapper mapTask = new Mapper();
19        List<MapOut> mapOuts = new ArrayList<>();
20        textFiles.forEach((k,v)->{
21            mapTask.map(k,v);
22            mapOuts.addAll(mapTask.mapOuts);
23        });
24
25        //Reduceフェーズの実行
26        Reducer reduceTask = new Reducer();
27        List<ReduceIn> reduceIns = new ReduceIn().generate(mapTask.mapOuts);
28
29        System.out.println("=============================");
30        System.out.println("Reducer(key, values)");
31        System.out.println("=============================");
32
33        Map<String, Integer> results = new HashMap<>();
34        for (ReduceIn in: reduceIns){
35            System.out.println(in.key + ":" + in.values.toString());
36            Result result = reduceTask.reduce(in.key, in.values);
37            int count = results.containsKey(result.key) ? results.get(result.key) : 0;
38            results.put(result.key, count + result.count);
39        }
40
41        System.out.println("=============================");
42        System.out.println("Reducer Results --> word:count");
43        System.out.println("=============================");
44        for (Map.Entry<String, Integer> e:results.entrySet()){
45            System.out.println(e.getKey() + ":" + e.getValue());
46        }
47    }
48}実行結果
| ============================= Mapper --> list(key, value) ============================= NHK:1 大河ドラマ:1 麒麟:1 くる:1 主人公:1 明智:1 光秀:1 所有:1 脇差し:1 名刀:1... 省略 ============================= Mapper --> list(key, value) ============================= 新型:1 コロナ:1 ウイルス:1 感染:1 拡大:1 受け:1 日本:1 政府:1 きょう:1 閣議:1... 省略 ============================= Mapper --> list(key, value) ============================= 北海道:1 札幌:1 市:1 新た:1 7:1 人:1 新型:1 コロナ:1 ウイルス:1 感染:1... 省略 ============================= Reducer(key, values) ============================= NHK:[1] 大河ドラマ:[1] 麒麟:[1] くる:[1] 主人公:[1] 明智:[1, 1, 1, 1] 光秀:[1, 1, 1, 1] 所有:[1, 1, 1, 1] 脇差し:[1, 1] 名刀:[1, 1]... 省略 ============================= Reducer Results --> word:count ============================= 刀:1 マカオ:1 市:5 討っ:1 者:2 分:2 万:2 滞在:1 上:1 部分:1 午前:2 成瀬:3 子孫:1 話し:1 くる:1 明智:4 発給:2 14:1 15:1 およそ:3... 省略Process finished with exit code 0  | 
MapReduce の実行は成功しました!
MapReduce を使わずに簡単に求める方法とは?
でも、ちょっと待って。
単語の出現頻度を求める為なら、もっと簡単な方法があるのでは!?
その通りです。
以下のコード(WordFreq.java)で MapReduce を使わずに、より簡単な方法で単語の出現頻度を求めることが出来ます!
1package WordFreq;
2
3import common.StopwordReader;
4import common.TextFileReader;
5import org.atilika.kuromoji.Token;
6import org.atilika.kuromoji.Tokenizer;
7import java.io.IOException;
8import java.util.ArrayList;
9import java.util.HashMap;
10import java.util.List;
11import java.util.Map;
12
13public class WordFreq {
14    private Tokenizer tokenizer;
15    private List<String> stopwordList;
16    private Map<String, Integer> wordFreqMap;
17
18    public WordFreq() throws IOException {
19        tokenizer = Tokenizer.builder().build();
20        stopwordList = new StopwordReader().read();
21        wordFreqMap = new HashMap<>();
22    }
23
24    public void calculate(List<String> text){
25        List<Token> tokenList = new ArrayList<>();
26        text.forEach(item->{
27            item = item.replaceAll("\\P{LD}+", "");
28            tokenList.addAll(tokenizer.tokenize(item));
29        });
30        for (Token token:tokenList){
31            String currentWord = token.getSurfaceForm();
32            if (!stopwordList.contains(currentWord)){
33                int wordCount = wordFreqMap.containsKey(currentWord)? wordFreqMap.get(currentWord) : 0;
34                wordFreqMap.put(currentWord, wordCount + 1);
35            }
36        }
37    }
38
39    public static void main(String[] args) throws IOException {
40        WordFreq wordFreq = new WordFreq();
41        TextFileReader textFileReader = new TextFileReader();
42        List<String> text = new ArrayList<>(textFileReader.readFiles().values());
43        wordFreq.calculate(text);
44        for (Map.Entry<String, Integer> kv: wordFreq.wordFreqMap.entrySet()){
45            System.out.println(kv.getKey() + ":" + kv.getValue());
46        }
47    }
48}しかし、多数のファイルからなる膨大なテキストデータを処理する時に、WordFreq.java のような手法を用いると、処理スピードはマシーンの計算能力に制限されます。
それに対し MapReduce は、その膨大なデータを分割し、分散システム上で並列的に処理しますので、マシーンの数を増やすだけで、データ処理のスピードをどんどん向上させることが出来ます。
コードの追加
最後に、本記事の MapReduce 実装に用いた入力テキストファイルを読み込むコード(TextFileReader.java)と、日本語ストップワードファイルを読み込むコード(StopwordReader.java)を追加します。
1package common;
2
3import java.io.*;
4import java.nio.file.Files;
5import java.nio.file.Path;
6import java.nio.file.Paths;
7import java.util.ArrayList;
8import java.util.HashMap;
9import java.util.List;
10import java.util.Map;
11import java.util.stream.Collectors;
12import java.util.stream.Stream;
13
14public class TextFileReader {
15    private static final String DIR_NAME = "input/";
16
17    public Map<String, String> readFiles() throws IOException {
18        Map<String, String> textFiles = new HashMap<>();
19        List<String> fileNames = getFileNames();
20
21        fileNames.forEach(item->{
22            BufferedReader br = null;
23            try {
24                br = new BufferedReader(new InputStreamReader(new FileInputStream(item)));
25                String line = null;
26                String content = "";
27                while ((line = br.readLine()) != null){
28                    content += line;
29                }
30                textFiles.put(item, content);
31                br.close();
32            } catch (FileNotFoundException e) {
33                e.printStackTrace();
34            } catch (IOException e) {
35                e.printStackTrace();
36            }
37        });
38
39        return textFiles;
40    }
41
42    private List<String> getFileNames(){
43        List<String> result = new ArrayList<>();
44        try (Stream<Path> walk = Files.walk(Paths.get(DIR_NAME))) {
45            result = walk.map(x -> x.toString())
46                    .filter(f -> f.endsWith(".txt")).collect(Collectors.toList());
47        } catch (IOException e) {
48            e.printStackTrace();
49        }
50        return result;
51    }
52}1package common;
2
3import java.io.BufferedReader;
4import java.io.FileInputStream;
5import java.io.IOException;
6import java.io.InputStreamReader;
7import java.util.ArrayList;
8
9public class StopwordReader {
10
11    public ArrayList<String> read() throws IOException {
12        ArrayList<String > stwList = new ArrayList<>();
13        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("src/common/stopwords_jp.txt")));
14        String line = null;
15        while ((line = br.readLine()) != null){
16            stwList.add(line);
17        }
18        br.close();
19        return stwList;
20    }
21}さいごに
MapReduce プログラミングモデルを実装し、日本語テキストドキュメントから単語の出現頻度を求めてみました!
また、MapReduce を使わずに簡単に求める方法も、併せてご紹介いたしました。
ご自身の環境に合わせて使い分けてみてください!
こちらの記事もオススメ!
ライトコードでは、エンジニアを積極採用中!
ライトコードでは、エンジニアを積極採用しています!社長と一杯しながらお話しする機会もご用意しております。そのほかカジュアル面談等もございますので、くわしくは採用情報をご確認ください。
採用情報へ
「好きを仕事にするエンジニア集団」の(株)ライトコードです! ライトコードは、福岡、東京、大阪、名古屋の4拠点で事業展開するIT企業です。 現在は、国内を代表する大手IT企業を取引先にもち、ITシステムの受託事業が中心。 いずれも直取引で、月間PV数1億を超えるWebサービスのシステム開発・運営、インフラの構築・運用に携わっています。 システム開発依頼・お見積もり大歓迎! また、現在「WEBエンジニア」「モバイルエンジニア」「営業」「WEBデザイナー」を積極採用中です! インターンや新卒採用も行っております。 以下よりご応募をお待ちしております! https://rightcode.co.jp/recruit







