駄文型

プログラミングとか英語とかの話題を中心にした至極ちゃらんぽらんな日記です。

ストリーム処理で文章内にある単語の出現頻度をカウントする(Apache Apex)

Stream

ストリーム処理エンジンである Apache Apex でストリーム処理を行うアプリケーションを作成したので、まとめておく。Apex についてはこちら:

koheikimura.hatenablog.com

元ネタ

Webinar: Building Your First Apache Apex Application - YouTube

必要なもの

環境

OS

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.1 LTS"

Java

$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)
$ javac -version
javac 1.8.0_131

git

$ git version
git version 2.7.4

Maven

$ mvn --version
Apache Maven 3.3.9
Maven home: /usr/share/maven
Java version: 1.8.0_131, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: ISO-8859-1
OS name: "linux", version: "4.4.0-57-generic", arch: "amd64", family: "unix"

Hadoop

$ hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /usr/local/hadoop/share/hadoop/common/hadoop-common-2.8.0.jar

git clone

まずは必要なリポジトリをクローンする。使用するのは apex-core , apex-malhar , DataTorrent/examples の3つ。ホームに apex ディレクトリを作ってそこで作業する。

$ mkdir apex
$ cd apex
$ git clone git@github.com:apache/apex-core.git
$ git clone git@github.com:apache/apex-malhar.git
$ git clone git@github.com:DataTorrent/examples.git

Apex インストー

mavenapex-coreapex-malhar をビルド・インストールします。 -DskipTests でテストの実行をスキップすれば(少し)早く完了する。時間がかかるので、しばらく待つ :coffee:

$ ls              
apex-core   apex-malhar examples
$ cd apex-core
$ mvn clean install -DskipTests
$ cd apex-malhar
$ mvn clean install -DskipTests

便利ツール

ビルドの完了を待っている間、ツールを準備しておく。 DataTorrent/examples の中に aliases というスクリプトがあるので、それを読み込んで使えるようにする。

$ source examples/tutorials/topnwords/scripts/aliases

aliases の中身は以下の通り。

# bash aliases and functions useful for working on input and out directories
#

# input and output directories
in=/tmp/test/input-dir out=/tmp/test/output-dir

# list files in input directory
alias ls-input="hdfs dfs -ls $in"

# list files in output directory
alias ls-output="hdfs dfs -ls $out"

# clean input directory
alias clean-input="hdfs dfs -rm $in/*"

# clean output directory
alias clean-output="hdfs dfs -rm $out/*"

# convenient alias to run dtcli from code repository
alias dtcli3="$HOME/src/incubator-apex-core/engine/src/main/scripts/dtcli"

# copy local file (argument) to input directory
function put-file ( ) {
    hdfs dfs -put "$1" "$in"
}

# make local copy of output file (argument) from output directory
function get-file ( ) {
    hdfs dfs -get "$out/$1" "$1".out
}

newapp~/apex にコピー呼び出しやすくしておく。

$ cp examples/tutorials/topnwords/scripts/newapp .

中身はただの mvn コマンドで、必要に応じて myapexapp のところを書き換えて使うことができる。

#!/bin/bash
# script to create a new project
 
# change project name and archetype version as needed
name=myapexapp
version=3.3.0-incubating

mvn -B archetype:generate \
  -DarchetypeGroupId=org.apache.apex \
  -DarchetypeArtifactId=apex-app-archetype \
  -DarchetypeVersion=$version  \
  -DgroupId=com.example \
  -Dpackage=com.example.$name \
  -DartifactId=$name \
  -Dversion=1.0-SNAPSHOT

プロジェクト作成

newapp を実行してプロジェクトを作成。 Y: : が出てきたら、そこでエンター(ッターン!)。

$ bash newapp
[INFO] Scanning for projects...
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO] 
...
$ ls
apex-core   apex-malhar examples myapexapp newapp
$ cd myapexapp

プロジェクトの作成が完了したら、ソースコードを確認する。

$ find src -name "*.java"
src/main/java/com/example/myapexapp/Application.java
src/main/java/com/example/myapexapp/RandomNumberGenerator.java
src/test/java/com/example/myapexapp/ApplicationTest.java

今回ユニットテストは不要なのでポイしておく。こうすることでビルド時に -DskipTests を忘れても、テストが実行されない。今回は /tmp に退避させたが、もちろん削除してもいい。

$ mv src/test/java/com/example/myapexapp/ApplicationTest.java /tmp

サンプルコードを導入

DataTorrent/examples のコードを導入する。まずはソースのディレクトリに移動する。

$ cd src/main/java/com/example/myapexapp

tutorial/topnwordsソースコードを持ってくる。入力された文章内にある各単語の出現頻度を出力する。詳細は割愛。

$ ls   
Application.java           RandomNumberGenerator.java
$ ls ~/apex/examples/tutorials/topnwords/webinar/
ApplicationWordCount.java      WCPair.java                    WordReader.java
FileWordCount.java             WindowWordCount.java           properties-SortedWordCount.xml
LineReader.java                WordCountWriter.java
$ cp ~/apex/examples/tutorials/topnwords/webinar/*.java .
$ ls
Application.java           LineReader.java            WindowWordCount.java
ApplicationWordCount.java  RandomNumberGenerator.java WordCountWriter.java
FileWordCount.java         WCPair.java                WordReader.java

プロパティファイルも必要なので、それも resources にコピーする。

$ cd ../../../../resources/META-INF 
$ cp ~/apex/examples/tutorials/topnwords/webinar/properties-SortedWordCount.xml .
$ ls
properties-SortedWordCount.xml properties.xml

アプリケーションのビルド

アプリケーションのトップディレクトリに移動してビルドする。ビルド後、 target ディレクトリに myapexapp-1.0-SNAPSHOT.apa が作成される。

$ cd ~/apex/myapexapp
$ mvn clean package -DskipTests
$ ls target 
antrun                     generated-resources        maven-archiver             site
archive-tmp                generated-sources          maven-status               test-classes
classes                    generated-test-sources     myapexapp-1.0-SNAPSHOT.apa
deps                       javadoc-bundle-options     myapexapp-1.0-SNAPSHOT.jar

アプリケーションの実行

ビルドしたアプリケーションの実行には Apex の CLI ツール を使う。 apex-core の下のあるので、 myapexapp ディレクトリ内で実行する。

$ ../apex-core/engine/src/main/scripts/apex 
Apex CLI 3.7.0-SNAPSHOT 02.06.2017 @ 09:02:30 JST rev: 22feeed branch: master
apex> 

そこで launch target/myapexapp-1.0-SNAPSHOT.apa と打つと実行される。 MyFirstApplicationSortedWordCount のどちらを実行するか聞かれるので、2を選ぶとアプリケーションが実行され、 appId が発行される。

apex> launch target/myapexapp-1.0-SNAPSHOT.apa
  1. MyFirstApplication
  2. SortedWordCount
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) > 

動作確認

まず hdfs dfs で入出力ディレクトリを作成する。

$ hdfs dfs -mkdir -p /tmp/test/input-dir
$ hdfs dfs -mkdir -p /tmp/test/output-dir

この時点ではまだなにもない。

$ ls-input
... 何も表示されない
$ ls-output
... 何も表示されない

aliaces で定義されている put-file を使って入力ディレクトリにテキストファイルを put する。今回は https://www.dummytextgenerator.com で英文を作成して dummy.txt として保存した。

$ ls -lh ~/*.txt
/home/user/dummy.txt
$ put-file ~/dummy.txt

ls-input すると、先ほどのテキストファイルと同じものがみつかる。

$ ls-input
Found 1 items
-rw-r--r--   1 vagrant supergroup      15167 2017-06-06 04:34 /tmp/test/input-dir/dummy.txt

ls-output すると、出力ファイルができているので、中身を確認する。

$ ls-output
Found 1 items
-rwxrwxrwx   1 vagrant supergroup       3028 2017-06-06 04:34 /tmp/test/output-dir/dummy.txt
$ hdfs dfs -cat /tmp/test/output-dir/dummy.txt | head
t : 75
you : 43
re : 29
sixth : 22
seasons : 22
creature : 22
every : 22
moved : 20
together : 20
subdue : 20

ちゃんと単語の出現頻度が出力されました!

Hadoop 第3版

Hadoop 第3版