• トップ
  • ブログ一覧
  • Amazon EMRのHadoop完全分散モードクラスタ上でApache Sparkを利用したアプリケーションを15円未満で実行してみた
  • Amazon EMRのHadoop完全分散モードクラスタ上でApache Sparkを利用したアプリケーションを15円未満で実行してみた

    広告メディア事業部広告メディア事業部
    2020.07.12

    IT技術

    Apache Spark を Amazon EMR 上で稼働させて、アプリを実行してみる

    前回は、Windows の環境上で Apache Spark をセットアップし、テキスト検索アプリを実行してみました。

    前回の記事はこちら

    featureImg2020.05.26HDFS + Apache Spark をインストールし、Javaでテキスト検索アプリを実行してみたApache Spark を HDFS 上で稼働させて、テキスト検索アプリを実行しようこのビッグデータの時代、膨大なデ...

    完全分散モードで Apache Spark を使う

    今回、Apache Spark を使うのは、ローカルの環境ではありません。

    Amazon EMR の Hadoop クラスター上で Apache Spark をセットアップし、wordcount アプリを実行してみましょう!

    これは「スタンドアローン」の Apache Spark ではなくて、「完全分散モード」の Apache Spark です。

    つまり、本物の Apache Spark というわけです。

    完全分散モードで Apache Spark を使うメリット

    完全分散モードで Apache Spark を使う場合、ローカル環境に比べて以下のメリットがあります。

    1. コストが安上がり
    2. セットアップの手順が簡単でスピーディー
    3. クラスタのメンテナンスが不要

    大まかな流れ

    大まかな手順は、以下のようになります。

    1. ローカル環境に Apache Spark をインストール
    2. ローカル環境で作成した Apache Spark を Maven でパッケージング
    3. Amazon S3 へアップロード
    4. Amazon EMR で Hadoop クラスタを構築
    5. クラスタを稼働させて、アプリを実行

    今回のローカルとクラウド環境

    今回のローカルとクラウド環境は以下の通りです。

    1. OS は macOS Catalina
    2. Java 8 と Homebrew はインストール済み
    3. IDE は、Intellij IDEA 2019.3.3 Community Edition
    4. AWS アカウントはすでに登録済み(本記事では、AWS の S3 及び EMR を利用)

    ローカル環境における Apache Spark のインストール

    それでは、Spark アプリケーションを作成していきましょう。

    まず、以下の手順に従って、macOS Catalina で Apache Spark をインストールしてください。

    xcode-select をインストール

    ターミナルで、以下のコマンドで「xcode-select」をインストールします。

    1$ install xcode-select
    2$ xcode-select -v
    3xcode-select version 2373.

    scala をインストール

    Homebrew を用いて、「scala」をインストールしましょう。

    1$ brew install scala
    2Updating Homebrew...
    3==> Auto-updated Homebrew!
    4...
    5==> Downloading https://downloads.lightbend.com/scala/2.13.2/scala-2.13.2.tgz
    6######################################################################## 100.0%
    7==> Caveats
    8To use with IntelliJ, set the Scala home to:
    9  /usr/local/opt/scala/idea
    10==> Summary
    11🍺  /usr/local/Cellar/scala/2.13.2: 41 files, 22.5MB, built in 24 seconds
    12
    13$ scala -version
    14Scala code runner version 2.13.2 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.

    Apache Spark をインストール

    同じく、Homebrewを用いて、「Apache Spark」をインストールします。

    1$ brew install apache-spark
    2Updating Homebrew...
    3==> Downloading https://www.apache.org/dyn/closer.lua?path=spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
    4==> Downloading from https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
    5######################################################################## 100.0%
    6🍺  /usr/local/Cellar/apache-spark/2.4.5: 1,059 files, 250.9MB, built in 9 minutes 10 seconds
    7
    8$ spark-shell
    920/06/04 10:50:21 WARN Utils: Your hostname, Wayans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
    1020/06/04 10:50:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    1120/06/04 10:50:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    12Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    13Setting default log level to "WARN".
    14To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    15Spark context Web UI available at http://192.168.1.4:4040
    16Spark context available as 'sc' (master = local[*], app id = local-1591239030237).
    17Spark session available as 'spark'.
    18Welcome to
    19      ____              __
    20     / __/__  ___ _____/ /__
    21    _\ \/ _ \/ _ `/ __/  '_/
    22   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
    23      /_/
    24         
    25Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
    26Type in expressions to have them evaluated.
    27Type :help for more information.
    28
    29scala>

    インストール完了

    これで、Apache Spark が「/usr/local/Cellar/apache-spark/」ディレクトリーにインストールされました。

    ローカル環境で Sparkアプリケーションを作成し、Maven でパッケージング

    次は Intellij IDEA を用いて、Spark アプリケーションを作成し、Mavenでパッケージします。

    Intellij IDEA で Maven プロジェクトを作る

    Intellij IDEA で「新しいプロジェクト」を作り、「Mavenプロジェクト」を選択します。

    ここでは、「spark-local-mapred」と名前をつけました。

    プロジェクトの構成は、以下のようになっています。

    pom.xml ファイルを編集

    Maven プロジェクトの pom.xml ファイルを、次のように編集します。

    1<?xml version="1.0" encoding="UTF-8"?>
    2<project xmlns="http://maven.apache.org/POM/4.0.0"
    3         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5    <modelVersion>4.0.0</modelVersion>
    6
    7    <groupId>org.example</groupId>
    8    <artifactId>spark-local-mapred</artifactId>
    9    <version>1.0-SNAPSHOT</version>
    10
    11    <properties>
    12        <maven.compiler.target>1.8</maven.compiler.target>
    13        <maven.compiler.source>1.8</maven.compiler.source>
    14    </properties>
    15
    16    <dependencies>
    17        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    18        <dependency>
    19            <groupId>org.apache.spark</groupId>
    20            <artifactId>spark-core_2.11</artifactId>
    21            <version>2.4.5</version>
    22        </dependency>
    23        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    24        <dependency>
    25            <groupId>org.apache.spark</groupId>
    26            <artifactId>spark-sql_2.11</artifactId>
    27            <version>2.4.5</version>
    28        </dependency>
    29    </dependencies>
    30    
    31</project>

    input ディレクトリーを作る

    プロジェクトディレクトリー内に input ディレクトリーを作ります。

    入力ファイルとして「silicon-valley.txt」を input ディレクトリーに置いておきましょう。

    silicon-valley.txt は、下記からダウンロードできます。

    【silicon-valley.txt】
    https://github.com/wmwijaya/spark-local-mapred/blob/master/input/silicon-valley.txt

    resources ディレクトリーへ en_stopwords.txt をコピー

    プロジェクトの resources ディレクトリーへ、「en_stopwords.txt」をコピーします。

    en_stopwords.txt は、下記からダウンロードして下さい。

    【en_stopwords.txt】
    https://github.com/wmwijaya/spark-local-mapred/blob/master/src/main/resources/en_stopwords.txt

    WordCount.java を作成

    「src/main/java/」の「com.teknologibigdata.spark」パッケージで、「WordCount.java」を作成します。

    WordCount.java は、英文から単語の出現頻度を計算する Spark プログラムです。

    1package com.teknologibigdata.spark;
    2//WordCount.java
    3
    4import org.apache.spark.sql.*;
    5import scala.Tuple2;
    6import java.io.BufferedReader;
    7import java.io.IOException;
    8import java.io.InputStreamReader;
    9import java.util.ArrayList;
    10import java.util.Arrays;
    11import java.util.List;
    12import java.util.regex.Pattern;
    13
    14public class WordCount {
    15
    16    private static final Pattern SPACE = Pattern.compile(" ");
    17    public List<String> enStopwords = new ArrayList<>();
    18    public final SparkSession spark;
    19
    20    public WordCount() throws IOException {
    21        spark = SparkSession
    22                .builder()
    23                .appName("WordCount")
    24                //.master("local[1]")//ローカルで実行する場合、この行は必要です(uncommentして下さい)。
    25                .getOrCreate();
    26        readStopwords();
    27    }
    28
    29    private void readStopwords() throws IOException {
    30        BufferedReader bfr = new BufferedReader(
    31                new InputStreamReader(
    32                        WordCount.class.getResourceAsStream("/en_stopwords.txt")
    33                )
    34        );
    35
    36        String line = null;
    37        while ((line = bfr.readLine()) != null) {
    38            enStopwords.add(line);
    39        }
    40    }
    41
    42    public static void main(String[] args) throws IOException {
    43
    44        if (args.length < 2) {
    45            System.err.println("Usage: JavaWordCount <inputFile> <outputFile>");
    46            System.exit(1);
    47        }
    48
    49        WordCount wc = new WordCount();
    50        List<String> stopwords = wc.enStopwords;
    51        Dataset<String> textDf = wc.spark.read().textFile(args[0]);
    52        textDf.show(10);
    53
    54        Dataset<Row> wordCount = textDf
    55                .flatMap(line -> Arrays.asList(SPACE.split(line.replaceAll("\\W", " "))).iterator(), Encoders.STRING())
    56                .filter(str -> !str.isEmpty())
    57                .filter(str->!stopwords.contains(str)) //stopwordsを入力テキストから排除
    58                .map(word -> new Tuple2<>(word.toLowerCase(), 1L), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
    59                .toDF("word", "one") //テキストからDataframeを作り、wordフィルドにテキストから抽出した単語、oneフィルドに数字の1を格納
    60                .groupBy("word")
    61                .sum("one").orderBy(new Column("sum(one)").desc())
    62                .withColumnRenamed("sum(one)", "count");
    63
    64        wordCount.show(10);
    65        wordCount.write().format("csv").save(args[1]);
    66    }
    67}

    引数を設定する

    プログラムの引数を「input/silicon-valley.txt output」と設定し、実行してみましょう。

    エラーがなければ、以下のような結果になります。

    これで、WordCount.java が正常に動作することを確認しました。

    パッケージング

    次は、「mvn clean package」コマンドでパッケージしましょう。

    生成された jar ファイルが、プロジェクトの「target」ディレクトリーに格納されます。

    この jar ファイルの名前を編集してください。

    ここでは、「spark-wordcount-1.0.jar」としておきます。

    これで、Spark アプリケーションのパッケージングは完了です。

    Amazon S3 への Spark アプリケーションのアップロード

    Maven でパッケージングした Spark アプリケーションを実行するには、Amazon S3 へアップロードしなければなりません。

    アプリの実行結果も、「Amazon S3」 へ書き込まれます。

    新しいバケットを作成

    まず、Amazon S3 のコンソールで、新しいバケットを作成します。

    ここでは、「tekbig-spark」としておきます。

    「apps」「input」フォルダーを作成

    次に、「tekbig-spark」のバケット内に「apps」及び「input」というフォルダーを作ります。

    アプリとテキストファイルをアップロード

    Spark アプリケーション「spark-wordcount-1.0.jar」を、「apps」フォルダーにアップロードします。

    入力テキストファイル「silicon-valley.txt」は、「input」フォルダーにアップロードしましょう。

    Amazon EMR クラスタの構築及び Spark アプリケーションの実行

    続いて、Amazon EMR で Hadoop クラスタを構築していきます。

    ローカル環境での Hadoop クラスタの構築に比べたら、非常にシンプルです。

    新しいクラスタを作成

    Amazon EMR のコンソールで「Create cluster」をクリックして、新しいクラスタを作成しましょう。

    ソフトウェアの設定

    「Go to advanced options」をクリックし、ソフトウェアの設定を行います。

    ここでは、「Hadoop」と「Spark」のみ選択します。

    Step type を設定

    「Step type」のドロップダウンメニューから「Spark application」を選択し、「Add step」をクリックします。

    Spark application の設定

    新しくダイアログボックスが表示されるので、「Spark application」の設定を行います。

    Name

    わかりやすい名前をつけましょう。

    ここでは、「Spark application」としました。

    Deploy mode

    Cluster」と入力します。

    Spark-submit options

    Spark アプリケーションの main 関数を含むパッケージ及びクラス名を指定します。

    ここでは「--class com.teknologibigdata.spark.WordCount」となります。

    Application location

    Amazon S3 にアップロードした spark-wordcount-1.0.jar の path を選択します。

    Arguments

    プログラムの引数を入力します。

    この場合は、Amazon S3 における入力ファイル「silicon-valley.txt」と出力フォルダ「output」の pathを入力します。

    つまり、「s3://tekbig-spark/input/silicon-valley.txt と s3://tekbig-spark/output」です。

    output フォルダは、プログラムによって自動的に生成されます。

    Action on failure

    Terminate cluser」を選択します。

    設定完了

    最後に、「Add」をクリックしてから、「Next」をクリックします。

    ハードウェアの設定

    ハードウェア設定で、利用したいマシーンのスペックが選べます。

    今回は「m4.large (4 vCore、8 GiB RAM、32 GiB Storage) 」タイプのマシーンを3台利用します。

    つまり、構築するクラスタは「マスターノード」1台と「コアノード」2台から成り立っています。

    General Cluster Settings を編集

    続いて、General Cluster Settings を編集します。

    Cluster name

    クラスタの名前を入力します。

    ここでは、「TekbigSparkCluster」としました。

    Logging

    Logging チェックボックスをチェックし、「Amazon S3 バケット」を選択します。

    これで、Amazon S3 バケットにクラスタのログが記録されます。

    他の設定はデフォルトのままいじらず、「Next」をクリックします。

    Security Options の設定

    次に、Security Options を設定します。

    EC2 key pair

    Proceed without an EC2 key pair」を選択します。

    残りの設定は、デフォルトのままいじらず、「Create cluster」をクリックします。

    クラスタのステータスを確認

    Starting

    しばらくしたら、クラスタの構築が完成し、自動的にスタートします。

    ここで、「Starting」というステータスが表示されます。

    Running

    その次が、「Running」状態です。

    この時に、Spark アプリケーションが実行され、入力ファイルが処理されます。

    Waiting

    上記の処理が完了したら、クラスタは「Waiting」状態に移ります。

    この状態になったら、「Terminate」をクリックしましょう。

    Terminate

    Terminate」をクリックすると、クラスタが停止します。

    クラスタを停止させない限り、料金がかかるので注意してください。

    処理結果を確認

    Spark アプリケーションの処理結果を見てみましょう。

    Amazon S3 の「tekbig-spark」バケットを確認します。

    現在、「tekbig-spark」バケットには3つのフォルダがあります。

    すなわち、「apps」「input」「output」フォルダです。

    output フォルダを確認

    output」フォルダは、Spark アプリケーションの処理結果を格納するため、自動的に生成されます。

    この output フォルダを開くと、多数の CSV ファイルが存在します。

    これらのファイルには、入力テキストファイルに含まれている「全ての単語」と「出現頻度」が書き込まれています。

    さいごに

    わずか10円で Spark アプリケーションが実行できる

    これで、Spark アプリケーションを EMR の Hadoop 完全分散モードクラスタ上で実行できました。

    AWS のアカウントページで、利用料金を確認したところ、なんとたったの10.8円でした!

    Apache Spark への注目が高まっている

    今、Apache Spark はビッグデータ分析エンジンとして非常に注目されています。

    「Infoworld.com」によれば、Apache Sparkは、今や主要なプラットフォームとなっています。

    また、「Databricks.com」によると、現在、世界で250以上の企業において、1000人以上が Apache Spark プロジェクトに携わっているとのことです。

    【What is Apache Spark? The big data platform that crushed Hadoop】
    https://www.infoworld.com/article/3236869/what-is-apache-spark-the-big-data-platform-that-crushed-hadoop.html

    【5 Reasons to Become an Apache Spark Expert】
    https://databricks.com/blog/2019/01/15/5-reasons-to-become-an-apache-spark-expert.html

    Apache Spark のメリット

    では、なぜ Apache Spark は、こんなにも盛んに採用されているのでしょうか?

    それは、Apache Spark が以下のようなメリットを持っているからです。

    1. 開発者にとって使いやすい
    2. 高速
    3. 柔軟性がある

    あなたも Spark アプリケーションを作ってみませんか?

    Apache Spark は非常に優れた分析ツールです。

    今回紹介した Spark アプリケーションは、比較的シンプルな上、低コストで作れます。

    ぜひ、あなたも Spark アプリケーションに挑戦してみてください!

    こちらの記事もオススメ!

    featureImg2020.07.28Java 特集実装編※最新記事順に並べています。Amazon EMRのHadoop完全分散モードクラスタ上でApache Spark...

    featureImg2020.07.17ライトコード的「やってみた!」シリーズ「やってみた!」を集めました!(株)ライトコードが今まで作ってきた「やってみた!」記事を集めてみました!※作成日が新し...

    ライトコードでは、エンジニアを積極採用中!

    ライトコードでは、エンジニアを積極採用しています!社長と一杯しながらお話しする機会もご用意しております。そのほかカジュアル面談等もございますので、くわしくは採用情報をご確認ください。

    採用情報へ

    広告メディア事業部
    広告メディア事業部
    Show more...

    おすすめ記事

    エンジニア大募集中!

    ライトコードでは、エンジニアを積極採用中です。

    特に、WEBエンジニアとモバイルエンジニアは是非ご応募お待ちしております!

    また、フリーランスエンジニア様も大募集中です。

    background