embulkでCSVデータを読み込む

こんにちは、先々週ぐらいから大量データをマージしながらDBに投入する仕事をしていて、その時にembulkを使って便利だなぁと思ったのでブログを書きます。
embulkとは何か?
embulkはTreasure Dataが作成したデータ投入ツールです。対象からデータを取得し投入対象へインサートします。
JavaとRubyで作られており、プラグインなどはgemで配布されています。よくFluentdのバッチ版と言われますが、まさにその通りで、Fluentdみたいにリアルタイム実行の必要がないものについてはこちらで十分に代用ができます。
インストール
Mac
Macの場合はHome brewでインストールができます。
$ brew install embulk
Linux
Linuxの場合はjarを任意の場所にダウンロードして使います。(例は/opt配下にembulkというディレクトリを作成し、そこにembulkをダウンロードしてパスを通しています)
$ mkdir -p /opt/embulk
$ cd /opt/embulk
$ wget http://dl.embulk.org/embulk-latest.jar
$ echo pwd
>> ~/.bash_profile
$ . ~/.bash_profile
確認
インストールが終わったら以下のコマンドで確認します。
$ which embulk $ embulk -version embulk 0.9.5 $
CSVファイルを読み込む
今回は基本的なCSVのロードについて説明します。
embulkは設定ファイルを読み込み、その情報を元にデータ媒体からデータを抽出します。例えば以下のようなデータをもつ「test.csv」が「/tmp/example_path」配下にあった場合、
"1","test1" "2","test2" "3","test3" "4","test4" "5","test5"
このCSVファイルを読み込むためのconfigファイルはYAMLで以下のように記載します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
--- in: type: file path_prefix: /tmp/example_path/test.csv parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_hander_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: 'id', type: string} - {name: 'testString', type: string} out: type: stdout parser: type: csv |
この設定ファイルを作成したら以下のコマンドを実行して設定に不備がないかを確認します。
$ embulk preview config.yml
embulkは実行すると読み込んだYAMLファイルの内容にしたがってデータをロードし、出力します。
今回の場合は以下のような出力になります。
$ embulk preview config.yml 2018-04-09 15:35:07.440 +0900: Embulk v0.9.5 2018-04-09 15:35:08.339 +0900 [INFO] (main): Started Embulk v0.9.5 2018-04-09 15:35:08.405 +0900 [INFO] (0001:preview): Listing local files at directory '/tmp/example_path' filtering filename by prefix 'test.csv' 2018-04-09 15:35:08.407 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2018-04-09 15:35:08.415 +0900 [INFO] (0001:preview): Loading files [/tmp/example_path/test.csv] 2018-04-09 15:35:08.433 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source +-----------+-------------------+ | id:string | testString:string | +-----------+-------------------+ | 1 | test1 | | 2 | test2 | | 3 | test3 | | 4 | test4 | | 5 | test5 | +-----------+-------------------+ $
実行が成功したら以下のコマンドを実行します
$ embulk run config.yml
実行結果は以下になります。
$ embulk run config2.yml 2018-04-09 15:45:21.833 +0900: Embulk v0.9.5 2018-04-09 15:45:23.008 +0900 [INFO] (main): Started Embulk v0.9.5 2018-04-09 15:45:23.067 +0900 [INFO] (0001:transaction): Listing local files at directory '/tmp/example_path' filtering filename by prefix 'test.csv' 2018-04-09 15:45:23.068 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2018-04-09 15:45:23.075 +0900 [INFO] (0001:transaction): Loading files [/tmp/example_path/test.csv] 2018-04-09 15:45:23.130 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4 2018-04-09 15:45:23.140 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 1,test1 2,test2 3,test3 4,test4 5,test5 2018-04-09 15:45:23.240 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2018-04-09 15:45:23.245 +0900 [INFO] (main): Committed. 2018-04-09 15:45:23.245 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/tmp/example_path/test.csv"},"out":{}} $
preview と run
embulkにはpreviewモードと、runモードが存在します。
previewモードは主に設定ファイルの試験走行を目的として実施します。実際行なっているのは設定ファイルの「in:」の部分だけで、出力されるデータは表形式で出力されます。
それに対してrunモードは設定ファイルの「out:」の部分も含めての実行になります。このモードで実行した場合は、データの出力までされるため注意が必要です。
以上の違いがるため、先ほどのセクションで載せた例ではpreviewとrunでデータの出力が違う結果となった訳です。
embulkのconfig自動生成機能
embulkにはconfigを自動生成する機能が備わっています。例えば先ほどの「test.csv」で使用した「config.yml」を自動生成してみます。
自動生成に必要なファイルは、対象となるCSVファイル(先ほどのtest.csv)とその対象を読み込むための設定ファイル(今回は例としてseed.ymlを後述します)です。
対象を読み込むための設定は必要最低限の内容だけを記載します。
1 2 3 4 5 6 7 |
--- in: type: file path_prefix: /tmp/example_path/test.csv out: {stdout: null} |
設定ファイルを作成したら以下のコマンドを実行します。
$ embulk guess seed.yml -o config.yml
「-o」コマンドは結果をファイルに出力します。今回は「config.yml」です。
実行が終了するとconfig.ymlが作成されます。ここで注意が必要なのは、この状態で作成したものは出力側の設定がされていないというところです。
ここについては適切に設定をする必要があります。
before
in: type: file path_prefix: /tmp/example_path/test.csv parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 0 allow_extra_columns: false allow_optional_columns: false columns: - {name: c0, type: long} - {name: c1, type: string} out: {stdout: null}
after
in: type: file path_prefix: /tmp/example_path/test.csv parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 0 allow_extra_columns: false allow_optional_columns: false columns: - {name: c0, type: long} - {name: c1, type: string} out: type: stdout parser: type: csv
この修正を行った後、runを実行すると以下のような結果が得られます。
実行
$ embulk run config.yml
結果
$ embulk run config.yml 2018-04-09 16:11:47.743 +0900: Embulk v0.9.5 2018-04-09 16:11:48.966 +0900 [INFO] (main): Started Embulk v0.9.5 2018-04-09 16:11:49.045 +0900 [INFO] (0001:transaction): Listing local files at directory '/tmp/example_path' filtering filename by prefix 'test.csv' 2018-04-09 16:11:49.046 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2018-04-09 16:11:49.056 +0900 [INFO] (0001:transaction): Loading files [/tmp/example_path/test.csv] 2018-04-09 16:11:49.141 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4 2018-04-09 16:11:49.158 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 1,test1 2,test2 3,test3 4,test4 5,test5 2018-04-09 16:11:49.272 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2018-04-09 16:11:49.279 +0900 [INFO] (main): Committed. 2018-04-09 16:11:49.279 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/tmp/example_path/test.csv"},"out":{}} $
ちなみに今回、guessコマンドを実行した時はそこまでカラムの多くないCSVだったのでたいした恩恵はありませんでしたが、カラムが100とかの単位になった時にこの機能はすごく役立ちます。
まとめ
今回はembulkのインストールと読み込みについてやりました。
embulkは強力なツールでかつ手軽に使えるものなので、データ投入に困ったらまず検討してみるといいかもしれません。
それでは今日はここまで。
“embulkでCSVデータを読み込む” への1件のフィードバック