駄文型

プログラミングとか英語とかの話題を中心にした至極ちゃらんぽらんな日記です。

ストリーム処理で文章内にある単語の出現頻度をカウントする(Apache Apex) その2 -コーディング編-

ストリーム処理で文章内にある単語の出現頻度をカウントする(Apache Apex) - 駄文型 の続き。前回は環境構築や実行のみを行い、サンプルコードの中身までは触れていない。そこで今回はサンプルコードを読み解き、 Apex ストリーミングアプリケーション開発の雰囲気を掴んでいく。

概要

サンプルコードのファイルは以下の7つ。

topnwords - GitHub

参考

ApplicationWordCount

まずはアプリケーション本体。やっていることは単純で、オペレーターを作成し、それらをストリームで繋いで DAG を作っているだけ。

package com.example.myapexapp;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;

import org.apache.hadoop.conf.Configuration;

@ApplicationAnnotation(name="SortedWordCount")
public class ApplicationWordCount implements StreamingApplication
{
  private static final Logger LOG = LoggerFactory.getLogger(ApplicationWordCount.class);

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // create operators

    LineReader lineReader            = dag.addOperator("lineReader", new LineReader());
    WordReader wordReader            = dag.addOperator("wordReader", new WordReader());
    WindowWordCount windowWordCount  = dag.addOperator("windowWordCount", new WindowWordCount());
    FileWordCount fileWordCount      = dag.addOperator("fileWordCount", new FileWordCount());
    WordCountWriter wcWriter         = dag.addOperator("wcWriter", new WordCountWriter());

    // create streams

    dag.addStream("lines",   lineReader.output,  wordReader.input);
    dag.addStream("control", lineReader.control, fileWordCount.control);
    dag.addStream("words",   wordReader.output,  windowWordCount.input);
    dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
    dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
  }

}

@ApplicationAnnotation(name="SortedWordCount") でアプリケーション名を設定できる。これは前回実行時に出てきた名称と一致する。

apex> launch target/myapexapp-1.0-SNAPSHOT.apa
  1. MyFirstApplication
  2. SortedWordCount     # これ
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) > 

各オペレーターとストリームの関係を図示すると下の図のようになる。ほぼ一直線だが、 lineReader から fileWordCountcontrol というストリームが飛んでいる。 Apex ではこのようにオペレーターを繋いで DAG をつくりことでアプリケーションを組むことができる。オペレーターの接続点をポートと呼ぶ。

f:id:koheikimura:20170622075628j:plain

LineReader

先頭 lineReader オペレーターから見ていく。このオペレーターは、ファイルを読み込み、中身を output ポートに流し、ファイルパスを control に流す。

package com.example.myapexapp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;

// reads lines from input file and returns them; if end-of-file is reached, a control tuple
// is emitted on the control port
//
public class LineReader extends AbstractFileInputOperator<String>
{
  private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);

  public final transient DefaultOutputPort<String> output  = new DefaultOutputPort<>();

  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();

  private transient BufferedReader br = null;

  private Path path;

  @Override
  protected InputStream openFile(Path curPath) throws IOException
  {
    LOG.info("openFile: curPath = {}", curPath);
    path = curPath;
    InputStream is = super.openFile(path);
    br = new BufferedReader(new InputStreamReader(is));
    return is;
  }

  @Override
  protected void closeFile(InputStream is) throws IOException
  {
    super.closeFile(is);
    br.close();
    br = null;
    path = null;
  }

  // return empty string 
  @Override
  protected String readEntity() throws IOException
  {
    // try to read a line
    final String line = br.readLine();
    if (null != line) {    // common case
      LOG.debug("readEntity: line = {}", line);
      return line;
    }

    // end-of-file; send control tuple, containing only the last component of the path
    // (only file name) on control port
    //
    if (control.isConnected()) {
      LOG.info("readEntity: EOF for {}", path);
      final String name = path.getName();    // final component of path
      control.emit(name);
    }

    return null;
  }

  @Override
  protected void emit(String tuple)
  {
    output.emit(tuple);
  }
}

メンバー変数

このオペレータークラスのメンバは以下の5つ。

  • LOG: ロガー
  • output: アウトプットポート
  • control: アウトプットポート
  • br: バッファ
  • path: ファイルのパス

特に説明は不要だが、 control に対して

@OutputPortFieldAnnotation(optional = true)

というアノテーションが追加されている。オプショナルが true になっている。何故オプショナルにしているのかはよくわからない。わかったら追記します。

メソッド

LineReader では openFile , closeFile , readEntity , emit というメソッドが定義されており、それぞれでファイルを開いた時の処理、閉じたときの処理、開いたファイルから次のタプルを読み込む処理、タプルへの処理を記述している。

readEntitiy では、バッファから1行読み込み、それを返している。 EOF まで読み込み終わったら contorl ポートにファイル名を流し、ストリームを終了させている。戻り値として返したものが次のタプルになり、 emit メソッドで処理される。 null を返すとストリームが終了する。

  // return empty string 
  @Override
  protected String readEntity() throws IOException
  {
    // try to read a line
    final String line = br.readLine();
    if (null != line) {    // common case
      LOG.debug("readEntity: line = {}", line);
      return line;
    }

    // end-of-file; send control tuple, containing only the last component of the path
    // (only file name) on control port
    //
    if (control.isConnected()) {
      LOG.info("readEntity: EOF for {}", path);
      final String name = path.getName();    // final component of path
      control.emit(name);
    }

    return null;
  }

WordReader

WordReader は、 LineReader から送られてきた1行分のデータを単語に分割している。

package com.example.myapexapp;

import java.util.regex.Pattern;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// extracts words from input line
public class WordReader extends BaseOperator
{
  // default pattern for word-separators
  private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");

  private String nonWordStr;    // configurable regex
  private transient Pattern nonWord;      // compiled regex

  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  public final transient DefaultInputPort<String>
    input = new DefaultInputPort<String>() {

    @Override
    public void process(String line)
    {
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        if (word.isEmpty()) continue;
        output.emit(word);
      }
    }
  };

  public String getNonWordStr() {
    return nonWordStr;
  }

  public void setNonWordStr(String regex) {
    nonWordStr = regex;
  }

  @Override
  public void setup(OperatorContext context)
  {
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);
    }
  }

}

メンバー変数

  • nonWordDefault: デフォルトの単語分割パターン
  • nonWordStr: 単語分割用の正規表現
  • nonWord: 単語分割パターン
  • output: アウトプットポート
  • input: インプットポート

これもほぼ説明は不要かと思うが、インプットポートはアウトプットポート異なり単に new するだけでなく、なにやら処理が記述されている。

  public final transient DefaultInputPort<String>
    input = new DefaultInputPort<String>() {

    @Override
    public void process(String line)
    {
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        if (word.isEmpty()) continue;
        output.emit(word);
      }
    }
  };

インプットポートには、入力されたデータをどう処理するかを記述する。これは他のオペレーターも同様で、 WordReader 以降の FileWordCount までのオペレーターも同様にインプットポートごとに process メソッドが定義されている。ここでは、入力データをパターンで単語に分割して、アウトプットポートに流している。

メソッド

  • getNonWordStr
  • setNonWordStr
  • setup

getNonWordStrsetNonWordStr はただの getter と setter 。 setup はセットアップ時に呼び出され、 nonWord の設定を行っている。

  @Override
  public void setup(OperatorContext context)
  {
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);
    }
  }

WindowWordCount

WindowWordCount は、 WordReader から送られる単語をウィンドウ内で集計して単語と出現頻度のペアにまとめる。

package com.example.myapexapp;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// Computes word frequency counts per window and emits them at each endWindow. The output is a
// list of pairs (word, frequency).
//
public class WindowWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);

  // wordMap : word => frequency
  protected Map<String, WCPair> wordMap = new HashMap<>();

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;
        return;
      }

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);
    }
  };

  // output port which emits the list of word frequencies for current window
  // fileName => list of (word, freq) pairs
  //
  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();

  @Override
  public void endWindow()
  {
    LOG.info("WindowWordCount: endWindow");

    // got EOF; if no words found, do nothing
    if (wordMap.isEmpty()) return;

    // have some words; emit single map and reset for next file
    final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
    output.emit(list);
    list.clear();
    wordMap.clear();
  }

}

メンバー変数

  • LOG: ロガー
  • wordMap: 単語と出現頻度のペアをマップにまとめたもの
  • input: インプットポート
  • output: アウトプットポート

wordMap 単語と頻度のペアは WCPair として WCPair.java で定義されている。

package com.example.myapexapp;

// a single (word, frequency) pair
public class WCPair {
  public String word;
  public int freq;

  public WCPair() {}

  public WCPair(String w, int f) {
    word = w;
    freq = f;
  }
  
  @Override
  public String toString() {
    return String.format("(%s, %d)", word, freq);
  }
}

input の中身を見ていく。 WordReader の出力は1単語なので、入力は単語がひとつになる。入力された単語を wordMap から探索し、存在していれば頻度に1を加算し、存在していなければ新たに WCPair を作成してマップに追加する。

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;
        return;
      }

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);
    }
  };

メソッド

メソッドは endWindow のみです。このメソッドは、ウィンドウが終了したときに呼び出される。マップをリストにしてアウトプットポートに流している。ウィンドウの設定をしていないようなので、デフォルト値があると思われるが、デフォルト値がなんなのかよくわからない。わかったら追記しておきます。

  public void endWindow()
  {
    LOG.info("WindowWordCount: endWindow");

    // got EOF; if no words found, do nothing
    if (wordMap.isEmpty()) return;

    // have some words; emit single map and reset for next file
    final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
    output.emit(list);
    list.clear();
    wordMap.clear();
  }

FileWordCount

FileWordCount はファイル全体から単語の出現頻度を集計する。

public class FileWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);

  // set to true when we get an EOF control tuple
  protected boolean eof = false;

  // last component of path (i.e. only file name)
  // incoming value from control tuple
  protected String fileName;

  // wordMapFile   : {word => frequency} map, current file, all words
  protected Map<String, WCPair> wordMapFile = new HashMap<>();

  // singleton map of fileName to sorted list of (word, frequency) pairs
  protected transient Map<String, Object> resultFileFinal;

  // final sorted list of (word,frequency) pairs
  protected transient List<WCPair> fileFinalList;

  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
  {
    @Override
    public void process(List<WCPair> list)
    {
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        final String word = pair.word;
        WCPair filePair = wordMapFile.get(word);
        if (null != filePair) {    // word seen previously in current file
          filePair.freq += pair.freq;
          continue;
        }

        // new word in current file
        filePair = new WCPair(word, pair.freq);
        wordMapFile.put(word, filePair);
      }
    }
  };

  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
  {
    @Override
    public void process(String msg)
    {
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      }
      LOG.info("FileWordCount: EOF for {}", msg);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.
    }
  };

  // fileOutput -- tuple is singleton map {<fileName> => fileFinalList}; emitted on EOF
  public final transient DefaultOutputPort<Map<String, Object>>
    fileOutput = new DefaultOutputPort<>();

  @Override
  public void setup(OperatorContext context)
  {
    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
    resultFileFinal = new HashMap<>(1);
    fileFinalList = new ArrayList<>();
  }

  @Override
  public void endWindow()
  {
    LOG.info("FileWordCount: endWindow for {}", fileName);

    if (wordMapFile.isEmpty()) {    // no words found
      if (eof) {                    // write empty list to fileOutput port
        // got EOF, so output empty list to output file
        fileFinalList.clear();
        resultFileFinal.put(fileName, fileFinalList);
        fileOutput.emit(resultFileFinal);

        // reset for next file
        eof = false;
        fileName = null;
        resultFileFinal.clear();
      }
      LOG.info("FileWordCount: endWindow for {}, no words", fileName);
      return;
    }

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
             fileName, wordMapFile.size(), eof);

    if (eof) {                     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");
      }

      // sort list from wordMapFile into fileFinalList and emit it
      getList(wordMapFile);
      resultFileFinal.put(fileName, fileFinalList);
      fileOutput.emit(resultFileFinal);

      // reset for next file
      eof = false;
      fileName = null;
      wordMapFile.clear();
      resultFileFinal.clear();
    }
  }

  // populate fileFinalList with topN frequencies from argument
  // This list is suitable input to WordCountWriter which writes it to a file
  // MUST have map.size() > 0 here
  //
  private void getList(final Map<String, WCPair> map)
  {
    fileFinalList.clear();
    fileFinalList.addAll(map.values());

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, new Comparator<WCPair>() {
        @Override
        public int compare(WCPair o1, WCPair o2) {
          return (int)(o2.freq - o1.freq);
        }
    });
  
    LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
  }
}

メンバー変数

  • LOG: ロガー
  • eof: ファイルの読み込み終わりフラグ
  • fileName: ファイル名
  • wordMapFile: ファイル単位での単語の出現頻度
  • resultFileFinal: 集計結果を格納するマップ
  • fileFinalList: 集計結果を格納するリスト
  • input: インプットポート
  • control: インプットポート、ファイル名
  • fileOutput: アウトプットポート

input を見ていく。 WindowWordCount とほぼ同じで、単語の出現頻度を集計してマップに入れている。

  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
  {
    @Override
    public void process(List<WCPair> list)
    {
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        final String word = pair.word;
        WCPair filePair = wordMapFile.get(word);
        if (null != filePair) {    // word seen previously in current file
          filePair.freq += pair.freq;
          continue;
        }

        // new word in current file
        filePair = new WCPair(word, pair.freq);
        wordMapFile.put(word, filePair);
      }
    }
  };

次に controllineReader からファイル名が流れてくるので、それを保存して eof フラグを ON に。

  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
  {
    @Override
    public void process(String msg)
    {
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      }
      LOG.info("FileWordCount: EOF for {}", msg);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.
    }
  };

メソッド

setup はマップとリストの初期化を行っているだけなので省略。

endWindow を見ていく。少々長いが、やっていることは単純で、ウィンドウが終わったタイミングで eof が ON になっている(つまりファイルの読み込みが終わっている)場合、集計結果をアウトプットポートに流す。流すときに getList でソートなどの加工を行っている。

  public void endWindow()
  {
    LOG.info("FileWordCount: endWindow for {}", fileName);

    if (wordMapFile.isEmpty()) {    // no words found
      if (eof) {                    // write empty list to fileOutput port
        // got EOF, so output empty list to output file
        fileFinalList.clear();
        resultFileFinal.put(fileName, fileFinalList);
        fileOutput.emit(resultFileFinal);

        // reset for next file
        eof = false;
        fileName = null;
        resultFileFinal.clear();
      }
      LOG.info("FileWordCount: endWindow for {}, no words", fileName);
      return;
    }

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
             fileName, wordMapFile.size(), eof);

    if (eof) {                     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");
      }

      // sort list from wordMapFile into fileFinalList and emit it
      getList(wordMapFile);
      resultFileFinal.put(fileName, fileFinalList);
      fileOutput.emit(resultFileFinal);

      // reset for next file
      eof = false;
      fileName = null;
      wordMapFile.clear();
      resultFileFinal.clear();
    }
  }

  private void getList(final Map<String, WCPair> map)
  {
    fileFinalList.clear();
    fileFinalList.addAll(map.values());

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, new Comparator<WCPair>() {
        @Override
        public int compare(WCPair o1, WCPair o2) {
          return (int)(o2.freq - o1.freq);
        }
    });
  
    LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
  }

WordCountWriter

最後は WordCountWriter 。集計結果をファイルに出力するオペレーター。こちらは今までとは異なり、 AbstractFileOutputOperator を継承しているので、少々実装が違っている。

public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
{
  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
  private static final String charsetName = "UTF-8";
  private static final String nl = System.lineSeparator();

  private String fileName;    // current file name
  private transient final StringBuilder sb = new StringBuilder();

  @Override
  public void endWindow()
  {
    if (null != fileName) {
      requestFinalize(fileName);
    }
    super.endWindow();
  }

  // input is a singleton list [M] where M is a singleton map {fileName => L} where L is a
  // list of pairs: (word, frequency)
  //
  @Override
  protected String getFileName(Map<String, Object> tuple)
  {
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    fileName = entry.getKey();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;
  }

  @Override
  protected byte[] getBytesForTuple(Map<String, Object> tuple)
  {
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // get first and only pair; key is the fileName and is ignored here
    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    final List<WCPair> list = (List<WCPair>) entry.getValue();

    if (sb.length() > 0) {        // clear buffer
      sb.delete(0, sb.length());
    }

    for ( WCPair pair : list ) {
      sb.append(pair.word); sb.append(" : ");
      sb.append(pair.freq); sb.append(nl);
    }

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);
    try {
      final byte[] result = data.getBytes(charsetName);
      return result;
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);
    }
  }

}

メンバー変数

  • LOG: ロガー
  • charsetName: エンコード
  • nl: 改行コード
  • fileName: ファイル名
  • sb: StringBuilder

メソッド

endWindow では、 requestFinalize を呼び出してファイル処理を終了する。

  public void endWindow()
  {
    if (null != fileName) {
      requestFinalize(fileName);
    }
    super.endWindow();
  }

getFileName には、ファイル名の取得方法を記述。

  protected String getFileName(Map<String, Object> tuple)
  {
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    fileName = entry.getKey();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;
  }

getBytesForTuple では、タプルからファイルに出力する内容を記述する。ここではリストを整形して一つの文字列にして返している。

  protected byte[] getBytesForTuple(Map<String, Object> tuple)
  {
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // get first and only pair; key is the fileName and is ignored here
    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    final List<WCPair> list = (List<WCPair>) entry.getValue();

    if (sb.length() > 0) {        // clear buffer
      sb.delete(0, sb.length());
    }

    for ( WCPair pair : list ) {
      sb.append(pair.word); sb.append(" : ");
      sb.append(pair.freq); sb.append(nl);
    }

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);
    try {
      final byte[] result = data.getBytes(charsetName);
      return result;
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);
    }
  }

まとめ

Apex ではまずアプリケーション全体をオペレーターの DAG で設計する。 DAG の構成が決まったら各オペレーターの処理を記述するだけ。他のストリーム処理エンジンに詳しくないので、比較はできないが、分散処理をあまり意識しなくていいので書きやすそう。また、よく使われそうな処理は Malhar にライブラリとして準備されているため、簡単に書くことができる。

Pythonではじめる機械学習 ―scikit-learnで学ぶ特徴量エンジニアリングと機械学習の基礎

Pythonではじめる機械学習 ―scikit-learnで学ぶ特徴量エンジニアリングと機械学習の基礎

広告を非表示にする

ストリーム処理で文章内にある単語の出現頻度をカウントする(Apache Apex)

Stream

ストリーム処理エンジンである Apache Apex でストリーム処理を行うアプリケーションを作成したので、まとめておく。Apex についてはこちら:

koheikimura.hatenablog.com

元ネタ

Webinar: Building Your First Apache Apex Application - YouTube

必要なもの

環境

OS

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.1 LTS"

Java

$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)
$ javac -version
javac 1.8.0_131

git

$ git version
git version 2.7.4

Maven

$ mvn --version
Apache Maven 3.3.9
Maven home: /usr/share/maven
Java version: 1.8.0_131, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: ISO-8859-1
OS name: "linux", version: "4.4.0-57-generic", arch: "amd64", family: "unix"

Hadoop

$ hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /usr/local/hadoop/share/hadoop/common/hadoop-common-2.8.0.jar

git clone

まずは必要なリポジトリをクローンする。使用するのは apex-core , apex-malhar , DataTorrent/examples の3つ。ホームに apex ディレクトリを作ってそこで作業する。

$ mkdir apex
$ cd apex
$ git clone git@github.com:apache/apex-core.git
$ git clone git@github.com:apache/apex-malhar.git
$ git clone git@github.com:DataTorrent/examples.git

Apex インストー

mavenapex-coreapex-malhar をビルド・インストールします。 -DskipTests でテストの実行をスキップすれば(少し)早く完了する。時間がかかるので、しばらく待つ :coffee:

$ ls              
apex-core   apex-malhar examples
$ cd apex-core
$ mvn clean install -DskipTests
$ cd apex-malhar
$ mvn clean install -DskipTests

便利ツール

ビルドの完了を待っている間、ツールを準備しておく。 DataTorrent/examples の中に aliases というスクリプトがあるので、それを読み込んで使えるようにする。

$ source examples/tutorials/topnwords/scripts/aliases

aliases の中身は以下の通り。

# bash aliases and functions useful for working on input and out directories
#

# input and output directories
in=/tmp/test/input-dir out=/tmp/test/output-dir

# list files in input directory
alias ls-input="hdfs dfs -ls $in"

# list files in output directory
alias ls-output="hdfs dfs -ls $out"

# clean input directory
alias clean-input="hdfs dfs -rm $in/*"

# clean output directory
alias clean-output="hdfs dfs -rm $out/*"

# convenient alias to run dtcli from code repository
alias dtcli3="$HOME/src/incubator-apex-core/engine/src/main/scripts/dtcli"

# copy local file (argument) to input directory
function put-file ( ) {
    hdfs dfs -put "$1" "$in"
}

# make local copy of output file (argument) from output directory
function get-file ( ) {
    hdfs dfs -get "$out/$1" "$1".out
}

newapp~/apex にコピー呼び出しやすくしておく。

$ cp examples/tutorials/topnwords/scripts/newapp .

中身はただの mvn コマンドで、必要に応じて myapexapp のところを書き換えて使うことができる。

#!/bin/bash
# script to create a new project
 
# change project name and archetype version as needed
name=myapexapp
version=3.3.0-incubating

mvn -B archetype:generate \
  -DarchetypeGroupId=org.apache.apex \
  -DarchetypeArtifactId=apex-app-archetype \
  -DarchetypeVersion=$version  \
  -DgroupId=com.example \
  -Dpackage=com.example.$name \
  -DartifactId=$name \
  -Dversion=1.0-SNAPSHOT

プロジェクト作成

newapp を実行してプロジェクトを作成。 Y: : が出てきたら、そこでエンター(ッターン!)。

$ bash newapp
[INFO] Scanning for projects...
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO] 
...
$ ls
apex-core   apex-malhar examples myapexapp newapp
$ cd myapexapp

プロジェクトの作成が完了したら、ソースコードを確認する。

$ find src -name "*.java"
src/main/java/com/example/myapexapp/Application.java
src/main/java/com/example/myapexapp/RandomNumberGenerator.java
src/test/java/com/example/myapexapp/ApplicationTest.java

今回ユニットテストは不要なのでポイしておく。こうすることでビルド時に -DskipTests を忘れても、テストが実行されない。今回は /tmp に退避させたが、もちろん削除してもいい。

$ mv src/test/java/com/example/myapexapp/ApplicationTest.java /tmp

サンプルコードを導入

DataTorrent/examples のコードを導入する。まずはソースのディレクトリに移動する。

$ cd src/main/java/com/example/myapexapp

tutorial/topnwordsソースコードを持ってくる。入力された文章内にある各単語の出現頻度を出力する。詳細は割愛。

$ ls   
Application.java           RandomNumberGenerator.java
$ ls ~/apex/examples/tutorials/topnwords/webinar/
ApplicationWordCount.java      WCPair.java                    WordReader.java
FileWordCount.java             WindowWordCount.java           properties-SortedWordCount.xml
LineReader.java                WordCountWriter.java
$ cp ~/apex/examples/tutorials/topnwords/webinar/*.java .
$ ls
Application.java           LineReader.java            WindowWordCount.java
ApplicationWordCount.java  RandomNumberGenerator.java WordCountWriter.java
FileWordCount.java         WCPair.java                WordReader.java

プロパティファイルも必要なので、それも resources にコピーする。

$ cd ../../../../resources/META-INF 
$ cp ~/apex/examples/tutorials/topnwords/webinar/properties-SortedWordCount.xml .
$ ls
properties-SortedWordCount.xml properties.xml

アプリケーションのビルド

アプリケーションのトップディレクトリに移動してビルドする。ビルド後、 target ディレクトリに myapexapp-1.0-SNAPSHOT.apa が作成される。

$ cd ~/apex/myapexapp
$ mvn clean package -DskipTests
$ ls target 
antrun                     generated-resources        maven-archiver             site
archive-tmp                generated-sources          maven-status               test-classes
classes                    generated-test-sources     myapexapp-1.0-SNAPSHOT.apa
deps                       javadoc-bundle-options     myapexapp-1.0-SNAPSHOT.jar

アプリケーションの実行

ビルドしたアプリケーションの実行には Apex の CLI ツール を使う。 apex-core の下のあるので、 myapexapp ディレクトリ内で実行する。

$ ../apex-core/engine/src/main/scripts/apex 
Apex CLI 3.7.0-SNAPSHOT 02.06.2017 @ 09:02:30 JST rev: 22feeed branch: master
apex> 

そこで launch target/myapexapp-1.0-SNAPSHOT.apa と打つと実行される。 MyFirstApplicationSortedWordCount のどちらを実行するか聞かれるので、2を選ぶとアプリケーションが実行され、 appId が発行される。

apex> launch target/myapexapp-1.0-SNAPSHOT.apa
  1. MyFirstApplication
  2. SortedWordCount
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) > 

動作確認

まず hdfs dfs で入出力ディレクトリを作成する。

$ hdfs dfs -mkdir -p /tmp/test/input-dir
$ hdfs dfs -mkdir -p /tmp/test/output-dir

この時点ではまだなにもない。

$ ls-input
... 何も表示されない
$ ls-output
... 何も表示されない

aliaces で定義されている put-file を使って入力ディレクトリにテキストファイルを put する。今回は https://www.dummytextgenerator.com で英文を作成して dummy.txt として保存した。

$ ls -lh ~/*.txt
/home/user/dummy.txt
$ put-file ~/dummy.txt

ls-input すると、先ほどのテキストファイルと同じものがみつかる。

$ ls-input
Found 1 items
-rw-r--r--   1 vagrant supergroup      15167 2017-06-06 04:34 /tmp/test/input-dir/dummy.txt

ls-output すると、出力ファイルができているので、中身を確認する。

$ ls-output
Found 1 items
-rwxrwxrwx   1 vagrant supergroup       3028 2017-06-06 04:34 /tmp/test/output-dir/dummy.txt
$ hdfs dfs -cat /tmp/test/output-dir/dummy.txt | head
t : 75
you : 43
re : 29
sixth : 22
seasons : 22
creature : 22
every : 22
moved : 20
together : 20
subdue : 20

ちゃんと単語の出現頻度が出力されました!

Hadoop 第3版

Hadoop 第3版

広告を非表示にする

ストリーム処理基盤 Apache Apex で Hello World

ストリーム処理基盤のひとつである Apache Apex の開発環境を構築します。

参考

Apache Apex Documentation

Apache Apex とは

Apex is a Hadoop YARN native platform that unifies stream and batch processing.

Apache Apex

Apache Apex はストリーム処理とバッチ処理を統合する Hadoop YARN ネイティブの基盤で、

  • 高いスケーラビリティとパフォーマンス
  • フォールトトレランスと状態管理
  • Hadoop YARN & HDFS
  • シンプルな API

などが特徴(らしい)。 Operator と呼ばれる処理単位を作って DAG (有向非巡回グラフ) を組んでストリーム処理アプリケーションを作成する。各 Operator は状態を持つことができる。データフローやコネクティビティ、フォールトトレランスは Apex 側が面倒みてくれるので、開発者は各 Operator が入力タプルをどう扱うか、出力をいつ(そしてどのポートに)送るかだけわかっていれば良い。

Operators - Apache Apex Documentation

Apex Malhar

apache/apex-malhar: Mirror of Apache Apex malhar

Operator を作成するためのライブラリ。 JSONCSV 操作、 Kafka 連携、 S3への出力などができる(らしい)。

必要なもの

開発に必要なものは下記の通り。

詳細は: Development Setup - Apache Apex Documentation

Apache Maven のインストー

Maven はなかったので Homebrew で雑にインストールした。

$ brew search maven
maven               maven-completion    maven-shell         maven@3.1           maven@3.2           maven@3.3
$ brew install maven
...
🍺  /usr/local/Cellar/maven/3.5.0: 106 files, 9.8MB, built in 17 seconds

バージョン

Java

$ java -version
java version "1.8.0_102"
Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
$ javac -version
javac 1.8.0_102

git

$ git version
git version 2.10.1 (Apple Git-78)

Maven

$ mvn --version
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-04T04:39:06+09:00)
Maven home: /usr/local/Cellar/maven/3.5.0/libexec
Java version: 1.8.0_102, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "mac os x", version: "10.11.6", arch: "x86_64", family: "mac"

プロジェクトの作成

下記のコマンドでプロジェクトが作成される。

$ mvn archetype:generate \
 -DarchetypeGroupId=org.apache.apex \
 -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 \
 -DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp \
 -Dversion=1.0-SNAPSHOT

しばらく待てば完了するので、ビルド。

$ cd myapexapp
$ mvn clean package -DskipTests

Unit Test

mvn test すればOK。おわり。

$ mvn test
...
hello world: 0.19778332097175633
hello world: 0.3872563781951901
hello world: 0.6822472419746941
hello world: 0.705621338023663
hello world: 0.11655955618022251
hello world: 0.7625487759662669
hello world: 0.029114499662175386
...

チュートリアル

Hello World の次は公式ドキュメントを読むと良い。

サンプルコード

apache/apex-malhar リポジトリexamplesDataTorrent/examples が参考になる。

Kafka: The Definitive Guide: Real-time Data and Stream Processing at Scale

Kafka: The Definitive Guide: Real-time Data and Stream Processing at Scale

koheikimura.hatenablog.com

koheikimura.hatenablog.com

広告を非表示にする

kafka-docker で作った kafka クラスタに Elixir クライアントから接続する

kafka-docker でローカルに kafka クラスタを構築する - 駄文型 の続き。kafkaex/kafka_ex という Kafka クライアントで Producer を作ってみます。

ライブラリの取得

mix.exs を編集して mix deps.get するだけ。

参考: Introduction to Mix - Elixir

defmodule ProducerSampleEx.Mixfile do
  # ...

  def application do
    [
      applications: [
        :kafka_ex,
        :snappy
      ]
    ]
  end

  defp deps do
    [
      {:kafka_ex, "~> 0.6.5"},
      {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"}
    ]
  end
end
$ mix deps.get

設定

まず、kafka-docker でローカルに kafka クラスタを構築する - 駄文型 で作った Kafka コンテナのポートを確認。今回は 32776 32778

$ docker-compose ps
                Name                              Command               State                          Ports
-----------------------------------------------------------------------------------------------------------------------------------
66f23109d78a_kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32777->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
kafkadocker_kafka_1                    start-kafka.sh                   Up      0.0.0.0:32778->9092/tcp
kafkadocker_kafka_2                    start-kafka.sh                   Up      0.0.0.0:32776->9092/tcp

次にKafkaEx.Config – kafka_ex を参考に config.exs を書く。以下は最低限の設定。

config :kafka_ex,
  brokers: [
    {"192.168.145.65", 32776}, # {"hostname", port}
    {"192.168.145.65", 32778}
  ],
  consumer_group: :no_consumer_group,
  use_ssl: false

iex で確認

iex -S mix で立ち上げる。設定がおかしいとここでエラーがでる。

$ iex -S mix
Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]


11:53:23.055 [debug] Succesfully connected to broker "192.168.145.65":32776

11:53:23.056 [debug] Succesfully connected to broker "192.168.145.65":32778

11:53:23.075 [debug] Establishing connection to broker 1009: "192.168.145.65" on port 32778

11:53:23.076 [debug] Succesfully connected to broker "192.168.145.65":32778

11:53:23.076 [debug] Establishing connection to broker 1010: "192.168.145.65" on port 32776

11:53:23.077 [debug] Succesfully connected to broker "192.168.145.65":32776
Interactive Elixir (1.4.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>

つながった! metadata/1 で確認できる。

iex(1)> KafkaEx.metadata(topic: "topic")
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65",
   node_id: 1009, port: 32778, socket: nil},
  %KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65", node_id: 1010,
   port: 32776, socket: nil}],
 topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :leader_not_available,
     isrs: [], leader: -1, partition_id: 0, replicas: []},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1010], leader: 1010, partition_id: 3, replicas: [1010]},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1010], leader: 1010, partition_id: 1, replicas: [1010]},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1009], leader: 1009, partition_id: 2, replicas: [1009]}],
   topic: "topic"}]}

あとは produce/4 で送るだけ!トピック名、パーティション番号、メッセージを渡す。

iex(2)> KafkaEx.produce("topic", 0, "msg") # opt は省略
:leader_not_available

11:56:41.715 [error] Leader for topic topic is not available

あれ?どうやら、トピック名が topic だとだめそう。別のトピックを指定すると通る。

iex(3)> KafkaEx.produce("new_topic", 0, "msg")
:ok

(追記) 使えなかったのはトピックではなくパーティションだった。リーダーが使用不可になっている。それがなぜかは不明だが。。。

kafka-console-consumer.sh で確認

前回同様、 Kafka Shell を起動して

$ start-kafka-shell.sh 192.168.145.65 192.168.145.65:32777 # Kafka Shell を起動

kafka-console-consumer.sh を叩く。 KafkaEx.produce/4 でもう一度送り、メッセージが表示されればOK!!!

$ kafka-console-consumer.sh --topic new_topic --zookeeper $ZK
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by pas
sing [bootstrap-server] instead of [zookeeper].
msg

ハマったところ

  • config.exs の設定
    • brokers{"192.168.145.65", "32776"} と設定してしまう凡ミスを犯していた。
  • トピック名が topic だとだめそう。
  • mix.exs の設定
    • kafka_ex の README.md には mod: {MyApp, []}, という行があったので入れていた。
    • これはモジュールのコールバックの設定を行うためのもの。
    • 入れておくと MyApp.start/2 (今回の場合 ProducerSampleEx.start/2 )を実行しようとしてしまう。
    • 今回は不要なので削除した。

Apache Kafka入門

Apache Kafka入門

広告を非表示にする

kafka-docker でローカルに kafka クラスタを構築する

参考

環境

  • OS X El Captain 10.11.6
  • docker 17.03.1-ce
  • docker-compose 1.11.2

手順

1. Download kafka-docker

$ git clone git@github.com:wurstmeister/kafka-docker.git

2. Update docker-compose.yml

docker-compose.yml を編集して zookeeper の Port と KAFKA_ADVERTISED_HOST_NAME を変更します。

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181" # "2181"に変更する
  kafka:
    build: .
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100 # ifconfig で調べる
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

3. Start the cluster

起動します。

$ docker-compose up -d

4. Scale Kafka instances

Kafkaのインスタンスを2つにしてみます。

$ docker-compose scale kafka=2

5. Note ZK_PORT

Kafka Shell に入る前に zookeeper コンテナのポートを確認しておきます。

                Name                              Command               State                          Ports
-----------------------------------------------------------------------------------------------------------------------------------
66f23109d78a_kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32774->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
kafkadocker_kafka_1                    start-kafka.sh                   Up      0.0.0.0:32775->9092/tcp
kafkadocker_kafka_2                    start-kafka.sh                   Up      0.0.0.0:32773->9092/tcp

この場合、 ZK_PORT32774 でした。

6. Start Kafka Shell

Kafka Shell を起動して Kafka の設定を行います。

$ ./start-kafka-shell.sh $DOCKER_HOST_IP $ZK_HOST:$ZK_PORT
  • $DOCKER_HOST_IP: DockerホストのIP。 dockcer-compose.ymlKAFKA_ADVERTISED_HOST_NAME に設定した値。
  • $ZK_HOST: この場合は $DOCKER_HOST_IP と同じ。
  • $ZK_PORT: 5.で調べた値。

7. Create Topic

トピック名 topicパーティション数4でトピックを作成します。トピックについては こちらkafka-topics.sh の詳しい使い方は こちら

$ $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic \
--partitions 4 --zookeeper $ZK --replication-factor 1 # `$KAFKA_HOME/bin/` は省略できました。

確認

1. Describe topic

トピック名、パーティション数などを確認します。

$ ./start-kafka-shell.sh $DOCKER_HOST_IP $ZK_HOST:$ZK_PORT
$ $KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic --zookeeper $ZK
Topic:topic     PartitionCount:4        ReplicationFactor:1     Configs:
        Topic: topic    Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
        Topic: topic    Partition: 1    Leader: 1010    Replicas: 1010  Isr: 1010
        Topic: topic    Partition: 2    Leader: 1009    Replicas: 1009  Isr: 1009
        Topic: topic    Partition: 3    Leader: 1010    Replicas: 1010  Isr: 1010 

2. Start a producer

kafka-console-producer.shkafka-console-consumer.sh を使ってみます。まずは producer を起動します。

$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic \
--broker-list=`broker-list.sh`

3. Start a consumer

次に、別のシェルから consumer を起動します。 producer 側のシェルに入力した文字が表示されれば成功です!

$ ./start-kafka-shell.sh $DOCKER_HOST_IP $ZK_HOST:$ZK_PORT
$ $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic --zookeeper=$ZK

おわり

コンテナを止めます。

$ docker-compose stop

ハマったところ

  1. zookeeper コンテナのポート設定
    • 2181:2181 から 2181 に変更する必要がありました。
  2. KAFKA_ADVERTISED_HOST_NAME の設定
    • チュートリアルに「Macの場合は docker-machine ip で確認した値を入れる」とあったので、docker-machine上でコンテナを立ち上げないとだめなのか?といろいろ右往左往してしましました。
  3. start-kafka-shell.sh に渡す zookeeper のポート
    • docker-compose ps あるいは docker ps で確認する必要があったのですが、はじめは間違った値を入れていました。
  4. トピックの設定
    • パーティション数を4にしたかったのですが、うまくいかずに kafka-topics.sh --delete したり kafka-topics.sh --alter したりしてました。

続編: kafka-docker で作った kafka クラスタに Elixir クライアントから接続する - 駄文型

Kafka: The Definitive Guide: Real-time Data and Stream Processing at Scale

Kafka: The Definitive Guide: Real-time Data and Stream Processing at Scale

Apache Kafka入門

Apache Kafka入門

広告を非表示にする

ITエンジニアとデザイナーが英語を勉強するなら「ITの英語」がおすすめ

medium.com

このゴールデンウィークで「ITの英語」を読んだ。機械学習やVR、デザインやマネジメントなど、おもしろいトピックが多いので、英語を勉強したいと思っているプログラマやデザイナーに強くおすすめしたい。以下のような人には特に強くおすすめする。

  • IT、特にシリコンバレーやベイエリアの会社のテクノロジーに興味がある。
  • だが、英語のブログやニュース記事を読むのは少しむずかしいと感じている。
  • 英語力はTOEICで言うと500超えくらい。
  • 英文を読みながら語彙力を強化したい。

僕も英語のブログやニュースを読むのだが、場合によっては内容を理解するのが難しいことがある。例えばdocs.ruby-lang.orgのような公式ドキュメント以外を読む場合は普通事前にその記事の概要を知ることができない。言い換えれば、「何が書かれているか」は公式ドキュメントの場合は読む前にわかっていることが多いが、ブログやニュースの記事は中身を読むまで、その記事が言いたいことがわからない。そのため、「文中の各単語はわかるが、その文全体の意味がよくわからない」ということがよく起こる。多分読解力や熟語の知識の不足が原因なんだろうと思う。

「ITの英語」には日本語訳が付いているので、そういう事態は避けられる。英文を読みながら、日本語訳で自分の解釈が正しいかどうか確認できる。さらに、この本はインタビューやプレゼンテーションで構成されているので、ネイティブのリアルで自然な英語から、彼らがよく使う単語、熟語、フレーズを学ぶことができる。音声データはダウンロード可能。

ひとまず一通り読み終えたので、 DUO 3.0 と並行して音読とリスニングを継続して単語の定着を図る予定。

DUO 3.0

DUO 3.0

ITプロジェクトの英語(無料MP3音声付き) (ビジネスエキスパートEnglish)

ITプロジェクトの英語(無料MP3音声付き) (ビジネスエキスパートEnglish)

広告を非表示にする

Rによるデータサイエンス 読書メモ その3(所感あり)

Rによるデータサイエンス データ解析の基礎から最新手法まで

Rによるデータサイエンス データ解析の基礎から最新手法まで

目次

koheikimura.hatenablog.com

の続き。

16章 集団学習

  • 複数のモデルを組合せて精度の高いモデルを構築する手法
  • バギング libraly(adabag) bagging
    • ブートストラップというリサンプリング法で複数の学習データセットを作成
  • ブースティング boosting
    • 逐次重みの調整を繰り返す
  • ランダムフォレスト libraly(randomForest) randamForest
    • ランダムサンプリングしたデータに対してバギングを適用
    • 精度および計算リソースの節約の面でバギングとブースティングより ◯

17章 カーネル法サポートベクターマシン

  • カーネル法: カーネル関数 にりデータを別空間に射影
    • 別空間に射影すれば非線形データを線形モデルで分類できる可能性がある
  • カーネル主成分分析 libraly(kernlab) kpca
  • サポートベクタマシン ksvm

18章 ニューラルネットワーク

  • 神経細胞ニューロン)が多数並列に接続されたシステムを数理的にモデル化したもの
  • 出力結果が目標値に近づくように重みを変える計算を繰り返す
  • パターン認識,分類,ノイズが混在しているデータの処理が得意
  • モデルの分類
    • 教師あり ⇔ 教師なし
    • 階層型ネットワーク ⇔ 非階層型ネットワーク
  • 中間層ありの NN パッケージ nnet

深層学習

  • 階層型ネットワークに属す
  • 中間層を多く用いるため計算量が多い
  • 画像認識,音声認識 → 畳み込みNN (CNN)
  • 時系列データ → 再帰型 NN (RNN)
  • パッケージ ho2 darch mxnet deepnet など
    • この本では ho2 を例に
  • アルゴリズム,隠れ層の数,各層のユニット数,学習の回数をデータ構造に基いて決める必要がある
    • 関連知識と経験が必要

19章 ネットワーク分析

  • 何らかの関係の有無や度合いを分析
  • グラフ理論に基礎をおいていいるためグラフ分析とも
  • 隣接行列の作成 libraly(igraph) graph.adjacency
  • データフレームからグラフオブジェクトに変換 graph.data.frame
  • ネットワークの統計量
    • ノード数 vcount
    • エッジ数 ecount
    • 隣接ノード neighbors
    • 次数(自由度) degree
    • 密度 graph.density
    • 中心性 定義によっていろいろ(次数中心性,接近中心性など)
    • クラスターの係数 transitivity 隣り合うノードの間に三角形グループがいくつあるか
    • ニューマンの次数の相関係数 assortativity.degree 2つのノード間の次数の関連性
    • 最短パス値 shortest.paths
    • 平均パス average.path.length
  • 次数の平均,密度,中心性,クラスター係数などから複雑さを比較可能
  • グラフ間の差分 graph.difference
  • コミュニティ抽出 p244 表19.6
  • オーバーラッピングを許すコミュニティ抽出 linkcomm
  • ベイジアンネットワーク: 因果関係を推測

20章 アソシエーション分析

  • マーケット・バスケット・トランザクションデータの分析
  • libraly(arules)
  • 相関ルール apriori 商品の組合せの規則を抽出
  • 頻出アイテムの抽出 eclat
  • 抽出結果の補助分析 dissimilarity

21章 時系列分析

  • 時系列データ: 一定の時間間隔で測定・観測したデータ
  • libraly(stats)
  • 時系列データの図示 ts.plot
  • 自己共分散と自己相関
  • スペクトル分析 spec.pgram 周期性を解析(トレンドやノイズを除去するようなイメージ)
  • 単位今検定 libraly(tseris) adf.text データがランダムウォークかどうか
  • AR(自己回帰)モデル ar
  • ARMA/ARIMAモデル arima ARモデルに誤差の移動平均を加えたモデル
  • 成分の分解 stl トレンド,周期変動,残差に分解

22章 生存分析

  • イベントが起きるまでの時間とイベントとの関係に焦点
  • 機械システムの故障や患者の疾患,死亡を対象
  • libraly(survival)
  • ノンパラメトリックモデル survfit
  • セミパラメトリックモデル coxph
  • パラメトリックモデル survreg
  • 共変量(時間以外の説明変数)の有無,分布を仮定するか否かに違い

全体の感想

網羅的に手法をカバーしているので課題に対してどの手法を使うべきか?の検討に使うことができる。網羅的な分,説明が短いので入門書として使うのは厳しい。Rについても統計についても,すでにある程度知っている,あるいは昔学習したが忘れているくらいの知識レベルだとちょうどいい印象。Rの関数の使い方だけでなく,結果データの見方や手法の選び方も解説しているので実用性は高い。一度目を通しておけばリファレンスとして使えそう。さらに深く知りたければ参考文献に載っている各手法の専門書を読むとよさそう。

Rによるデータサイエンス データ解析の基礎から最新手法まで

Rによるデータサイエンス データ解析の基礎から最新手法まで

広告を非表示にする