Amazon EMRのHadoop完全分散モードクラスタ上でApache Sparkを利用したアプリケーションを15円未満で実行してみた
IT技術
Apache Spark を Amazon EMR 上で稼働させて、アプリを実行してみる
前回は、Windows の環境上で Apache Spark をセットアップし、テキスト検索アプリを実行してみました。
前回の記事はこちら
完全分散モードで Apache Spark を使う
今回、Apache Spark を使うのは、ローカルの環境ではありません。
Amazon EMR の Hadoop クラスター上で Apache Spark をセットアップし、wordcount アプリを実行してみましょう!
これは「スタンドアローン」の Apache Spark ではなくて、「完全分散モード」の Apache Spark です。
つまり、本物の Apache Spark というわけです。
完全分散モードで Apache Spark を使うメリット
完全分散モードで Apache Spark を使う場合、ローカル環境に比べて以下のメリットがあります。
- コストが安上がり
- セットアップの手順が簡単でスピーディー
- クラスタのメンテナンスが不要
大まかな流れ
大まかな手順は、以下のようになります。
- ローカル環境に Apache Spark をインストール
- ローカル環境で作成した Apache Spark を Maven でパッケージング
- Amazon S3 へアップロード
- Amazon EMR で Hadoop クラスタを構築
- クラスタを稼働させて、アプリを実行
今回のローカルとクラウド環境
今回のローカルとクラウド環境は以下の通りです。
- OS は macOS Catalina
- Java 8 と Homebrew はインストール済み
- IDE は、Intellij IDEA 2019.3.3 Community Edition
- AWS アカウントはすでに登録済み(本記事では、AWS の S3 及び EMR を利用)
ローカル環境における Apache Spark のインストール
それでは、Spark アプリケーションを作成していきましょう。
まず、以下の手順に従って、macOS Catalina で Apache Spark をインストールしてください。
xcode-select をインストール
ターミナルで、以下のコマンドで「xcode-select」をインストールします。
1$ install xcode-select
2$ xcode-select -v
3xcode-select version 2373.
scala をインストール
Homebrew を用いて、「scala」をインストールしましょう。
1$ brew install scala
2Updating Homebrew...
3==> Auto-updated Homebrew!
4...
5==> Downloading https://downloads.lightbend.com/scala/2.13.2/scala-2.13.2.tgz
6######################################################################## 100.0%
7==> Caveats
8To use with IntelliJ, set the Scala home to:
9 /usr/local/opt/scala/idea
10==> Summary
11🍺 /usr/local/Cellar/scala/2.13.2: 41 files, 22.5MB, built in 24 seconds
12
13$ scala -version
14Scala code runner version 2.13.2 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.
Apache Spark をインストール
同じく、Homebrewを用いて、「Apache Spark」をインストールします。
1$ brew install apache-spark
2Updating Homebrew...
3==> Downloading https://www.apache.org/dyn/closer.lua?path=spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
4==> Downloading from https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
5######################################################################## 100.0%
6🍺 /usr/local/Cellar/apache-spark/2.4.5: 1,059 files, 250.9MB, built in 9 minutes 10 seconds
7
8$ spark-shell
920/06/04 10:50:21 WARN Utils: Your hostname, Wayans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
1020/06/04 10:50:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
1120/06/04 10:50:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
13Setting default log level to "WARN".
14To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
15Spark context Web UI available at http://192.168.1.4:4040
16Spark context available as 'sc' (master = local[*], app id = local-1591239030237).
17Spark session available as 'spark'.
18Welcome to
19 ____ __
20 / __/__ ___ _____/ /__
21 _\ \/ _ \/ _ `/ __/ '_/
22 /___/ .__/\_,_/_/ /_/\_\ version 2.4.5
23 /_/
24
25Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
26Type in expressions to have them evaluated.
27Type :help for more information.
28
29scala>
インストール完了
これで、Apache Spark が「/usr/local/Cellar/apache-spark/」ディレクトリーにインストールされました。
ローカル環境で Sparkアプリケーションを作成し、Maven でパッケージング
次は Intellij IDEA を用いて、Spark アプリケーションを作成し、Mavenでパッケージします。
Intellij IDEA で Maven プロジェクトを作る
Intellij IDEA で「新しいプロジェクト」を作り、「Mavenプロジェクト」を選択します。
ここでは、「spark-local-mapred」と名前をつけました。
プロジェクトの構成は、以下のようになっています。
pom.xml ファイルを編集
Maven プロジェクトの pom.xml ファイルを、次のように編集します。
1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6
7 <groupId>org.example</groupId>
8 <artifactId>spark-local-mapred</artifactId>
9 <version>1.0-SNAPSHOT</version>
10
11 <properties>
12 <maven.compiler.target>1.8</maven.compiler.target>
13 <maven.compiler.source>1.8</maven.compiler.source>
14 </properties>
15
16 <dependencies>
17 <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
18 <dependency>
19 <groupId>org.apache.spark</groupId>
20 <artifactId>spark-core_2.11</artifactId>
21 <version>2.4.5</version>
22 </dependency>
23 <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
24 <dependency>
25 <groupId>org.apache.spark</groupId>
26 <artifactId>spark-sql_2.11</artifactId>
27 <version>2.4.5</version>
28 </dependency>
29 </dependencies>
30
31</project>
input ディレクトリーを作る
プロジェクトディレクトリー内に input ディレクトリーを作ります。
入力ファイルとして「silicon-valley.txt」を input ディレクトリーに置いておきましょう。
silicon-valley.txt は、下記からダウンロードできます。
【silicon-valley.txt】
https://github.com/wmwijaya/spark-local-mapred/blob/master/input/silicon-valley.txt
resources ディレクトリーへ en_stopwords.txt をコピー
プロジェクトの resources ディレクトリーへ、「en_stopwords.txt」をコピーします。
en_stopwords.txt は、下記からダウンロードして下さい。
【en_stopwords.txt】
https://github.com/wmwijaya/spark-local-mapred/blob/master/src/main/resources/en_stopwords.txt
WordCount.java を作成
「src/main/java/」の「com.teknologibigdata.spark」パッケージで、「WordCount.java」を作成します。
WordCount.java は、英文から単語の出現頻度を計算する Spark プログラムです。
1package com.teknologibigdata.spark;
2//WordCount.java
3
4import org.apache.spark.sql.*;
5import scala.Tuple2;
6import java.io.BufferedReader;
7import java.io.IOException;
8import java.io.InputStreamReader;
9import java.util.ArrayList;
10import java.util.Arrays;
11import java.util.List;
12import java.util.regex.Pattern;
13
14public class WordCount {
15
16 private static final Pattern SPACE = Pattern.compile(" ");
17 public List<String> enStopwords = new ArrayList<>();
18 public final SparkSession spark;
19
20 public WordCount() throws IOException {
21 spark = SparkSession
22 .builder()
23 .appName("WordCount")
24 //.master("local[1]")//ローカルで実行する場合、この行は必要です(uncommentして下さい)。
25 .getOrCreate();
26 readStopwords();
27 }
28
29 private void readStopwords() throws IOException {
30 BufferedReader bfr = new BufferedReader(
31 new InputStreamReader(
32 WordCount.class.getResourceAsStream("/en_stopwords.txt")
33 )
34 );
35
36 String line = null;
37 while ((line = bfr.readLine()) != null) {
38 enStopwords.add(line);
39 }
40 }
41
42 public static void main(String[] args) throws IOException {
43
44 if (args.length < 2) {
45 System.err.println("Usage: JavaWordCount <inputFile> <outputFile>");
46 System.exit(1);
47 }
48
49 WordCount wc = new WordCount();
50 List<String> stopwords = wc.enStopwords;
51 Dataset<String> textDf = wc.spark.read().textFile(args[0]);
52 textDf.show(10);
53
54 Dataset<Row> wordCount = textDf
55 .flatMap(line -> Arrays.asList(SPACE.split(line.replaceAll("\\W", " "))).iterator(), Encoders.STRING())
56 .filter(str -> !str.isEmpty())
57 .filter(str->!stopwords.contains(str)) //stopwordsを入力テキストから排除
58 .map(word -> new Tuple2<>(word.toLowerCase(), 1L), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
59 .toDF("word", "one") //テキストからDataframeを作り、wordフィルドにテキストから抽出した単語、oneフィルドに数字の1を格納
60 .groupBy("word")
61 .sum("one").orderBy(new Column("sum(one)").desc())
62 .withColumnRenamed("sum(one)", "count");
63
64 wordCount.show(10);
65 wordCount.write().format("csv").save(args[1]);
66 }
67}
引数を設定する
プログラムの引数を「input/silicon-valley.txt output」と設定し、実行してみましょう。
エラーがなければ、以下のような結果になります。
これで、WordCount.java が正常に動作することを確認しました。
パッケージング
次は、「mvn clean package」コマンドでパッケージしましょう。
生成された jar ファイルが、プロジェクトの「target」ディレクトリーに格納されます。
この jar ファイルの名前を編集してください。
ここでは、「spark-wordcount-1.0.jar」としておきます。
これで、Spark アプリケーションのパッケージングは完了です。
Amazon S3 への Spark アプリケーションのアップロード
Maven でパッケージングした Spark アプリケーションを実行するには、Amazon S3 へアップロードしなければなりません。
アプリの実行結果も、「Amazon S3」 へ書き込まれます。
新しいバケットを作成
まず、Amazon S3 のコンソールで、新しいバケットを作成します。
ここでは、「tekbig-spark」としておきます。
「apps」「input」フォルダーを作成
次に、「tekbig-spark」のバケット内に「apps」及び「input」というフォルダーを作ります。
アプリとテキストファイルをアップロード
Spark アプリケーション「spark-wordcount-1.0.jar」を、「apps」フォルダーにアップロードします。
入力テキストファイル「silicon-valley.txt」は、「input」フォルダーにアップロードしましょう。
Amazon EMR クラスタの構築及び Spark アプリケーションの実行
続いて、Amazon EMR で Hadoop クラスタを構築していきます。
ローカル環境での Hadoop クラスタの構築に比べたら、非常にシンプルです。
新しいクラスタを作成
Amazon EMR のコンソールで「Create cluster」をクリックして、新しいクラスタを作成しましょう。
ソフトウェアの設定
「Go to advanced options」をクリックし、ソフトウェアの設定を行います。
ここでは、「Hadoop」と「Spark」のみ選択します。
Step type を設定
「Step type」のドロップダウンメニューから「Spark application」を選択し、「Add step」をクリックします。
Spark application の設定
新しくダイアログボックスが表示されるので、「Spark application」の設定を行います。
Name
わかりやすい名前をつけましょう。
ここでは、「Spark application」としました。
Deploy mode
「Cluster」と入力します。
Spark-submit options
Spark アプリケーションの main 関数を含むパッケージ及びクラス名を指定します。
ここでは「--class com.teknologibigdata.spark.WordCount」となります。
Application location
Amazon S3 にアップロードした spark-wordcount-1.0.jar の path を選択します。
Arguments
プログラムの引数を入力します。
この場合は、Amazon S3 における入力ファイル「silicon-valley.txt」と出力フォルダ「output」の pathを入力します。
つまり、「s3://tekbig-spark/input/silicon-valley.txt と s3://tekbig-spark/output」です。
output フォルダは、プログラムによって自動的に生成されます。
Action on failure
「Terminate cluser」を選択します。
設定完了
最後に、「Add」をクリックしてから、「Next」をクリックします。
ハードウェアの設定
ハードウェア設定で、利用したいマシーンのスペックが選べます。
今回は「m4.large (4 vCore、8 GiB RAM、32 GiB Storage) 」タイプのマシーンを3台利用します。
つまり、構築するクラスタは「マスターノード」1台と「コアノード」2台から成り立っています。
General Cluster Settings を編集
続いて、General Cluster Settings を編集します。
Cluster name
クラスタの名前を入力します。
ここでは、「TekbigSparkCluster」としました。
Logging
Logging チェックボックスをチェックし、「Amazon S3 バケット」を選択します。
これで、Amazon S3 バケットにクラスタのログが記録されます。
他の設定はデフォルトのままいじらず、「Next」をクリックします。
Security Options の設定
次に、Security Options を設定します。
EC2 key pair
「Proceed without an EC2 key pair」を選択します。
残りの設定は、デフォルトのままいじらず、「Create cluster」をクリックします。
クラスタのステータスを確認
Starting
しばらくしたら、クラスタの構築が完成し、自動的にスタートします。
ここで、「Starting」というステータスが表示されます。
Running
その次が、「Running」状態です。
この時に、Spark アプリケーションが実行され、入力ファイルが処理されます。
Waiting
上記の処理が完了したら、クラスタは「Waiting」状態に移ります。
この状態になったら、「Terminate」をクリックしましょう。
Terminate
「Terminate」をクリックすると、クラスタが停止します。
クラスタを停止させない限り、料金がかかるので注意してください。
処理結果を確認
Spark アプリケーションの処理結果を見てみましょう。
Amazon S3 の「tekbig-spark」バケットを確認します。
現在、「tekbig-spark」バケットには3つのフォルダがあります。
すなわち、「apps」「input」「output」フォルダです。
output フォルダを確認
「output」フォルダは、Spark アプリケーションの処理結果を格納するため、自動的に生成されます。
この output フォルダを開くと、多数の CSV ファイルが存在します。
これらのファイルには、入力テキストファイルに含まれている「全ての単語」と「出現頻度」が書き込まれています。
さいごに
わずか10円で Spark アプリケーションが実行できる
これで、Spark アプリケーションを EMR の Hadoop 完全分散モードクラスタ上で実行できました。
AWS のアカウントページで、利用料金を確認したところ、なんとたったの10.8円でした!
Apache Spark への注目が高まっている
今、Apache Spark はビッグデータ分析エンジンとして非常に注目されています。
「Infoworld.com」によれば、Apache Sparkは、今や主要なプラットフォームとなっています。
また、「Databricks.com」によると、現在、世界で250以上の企業において、1000人以上が Apache Spark プロジェクトに携わっているとのことです。
【What is Apache Spark? The big data platform that crushed Hadoop】
https://www.infoworld.com/article/3236869/what-is-apache-spark-the-big-data-platform-that-crushed-hadoop.html
【5 Reasons to Become an Apache Spark Expert】
https://databricks.com/blog/2019/01/15/5-reasons-to-become-an-apache-spark-expert.html
Apache Spark のメリット
では、なぜ Apache Spark は、こんなにも盛んに採用されているのでしょうか?
それは、Apache Spark が以下のようなメリットを持っているからです。
- 開発者にとって使いやすい
- 高速
- 柔軟性がある
あなたも Spark アプリケーションを作ってみませんか?
Apache Spark は非常に優れた分析ツールです。
今回紹介した Spark アプリケーションは、比較的シンプルな上、低コストで作れます。
ぜひ、あなたも Spark アプリケーションに挑戦してみてください!
こちらの記事もオススメ!
2020.07.28Java 特集実装編※最新記事順に並べています。Amazon EMRのHadoop完全分散モードクラスタ上でApache Spark...
2020.07.17ライトコード的「やってみた!」シリーズ「やってみた!」を集めました!(株)ライトコードが今まで作ってきた「やってみた!」記事を集めてみました!※作成日が新し...
ライトコードでは、エンジニアを積極採用中!
ライトコードでは、エンジニアを積極採用しています!社長と一杯しながらお話しする機会もご用意しております。そのほかカジュアル面談等もございますので、くわしくは採用情報をご確認ください。
採用情報へ
「好きを仕事にするエンジニア集団」の(株)ライトコードです! ライトコードは、福岡、東京、大阪、名古屋の4拠点で事業展開するIT企業です。 現在は、国内を代表する大手IT企業を取引先にもち、ITシステムの受託事業が中心。 いずれも直取引で、月間PV数1億を超えるWebサービスのシステム開発・運営、インフラの構築・運用に携わっています。 システム開発依頼・お見積もり大歓迎! また、現在「WEBエンジニア」「モバイルエンジニア」「営業」「WEBデザイナー」を積極採用中です! インターンや新卒採用も行っております。 以下よりご応募をお待ちしております! https://rightcode.co.jp/recruit