embulkでPostgreSQLにデータを投入する

こんにちはKUJIRAです。前回はemvulkのインストールとCSVの読み込みを行いました。

今回はこの読み込んだデータをPostgreSQLに投入するやり方を書きます。

embulkをPostgreSQLにアクセスさせる

まず、embulkでPostgreSQLにデータを投入するにはアウトプットの際 PostgreSQLに接続できるようにする必要があります。

そこで導入するのが、embulkのPostgreSQL用のプラグインです。

embulk-input-postgresql

embulk-output-postgresql

今回は出力側なので embulk-output-postgresql プラグインを使います。

プラグインをインストールする

embulkのプラグインはgemパッケージで配布されています。また、本体にgemパッケージの管理機能が内蔵されているため、インストールしたいプラグイン名を指定しサブコマンドを実行することで簡単にプラグインがインストールできます。

$ embulk gem install [plugin name]

今回は、embulk-output-postgresqlプラグインですので、以下のようになります。

$ embulk gem install embulk-output-postgresql

プラグインを使ってみる

プラグインがインストールできたら、設定を行っていきます。

設定記述は「out:」部分に行いますが、基本的には以下の情報を入力すれば最低限の動きをします。

設定キー 概要
host データベースのホスト名
port データベース接続時のポート
user データベースへのログインユーザ名
password データベースへのログインパスワード
database データ投入対象となるデータベース名
schema データ投入対象となるスキーマ名
table データ投入対象となるテーブル名
mode データ投入のモード

例えば、以下のようなデータベースがあったとき、

host localhsot
port 5432
user testuser
password testpassword
database test_db
schema test_schema
table test_table

前回の設定を修正し、上のデータベースにcsvファイルのデータを投入しようとすると以下のようになります。

実行結果は以下です。

$ embulk run config.yml
2018-04-17 00:09:20.779 +0900: Embulk v0.9.6
2018-04-17 00:09:22.544 +0900 [INFO] (main): Started Embulk v0.9.6
2018-04-17 00:09:25.306 +0900 [INFO] (0001:transaction): Gem's home and path are set by default: "/Users/test_user/.embulk/lib/gems"
2018-04-17 00:09:26.079 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.8.0)
2018-04-17 00:09:26.113 +0900 [INFO] (0001:transaction): Listing local files at directory '/tmp/example_path' filtering filename by prefix 'test.csv'
2018-04-17 00:09:26.114 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-04-17 00:09:26.117 +0900 [INFO] (0001:transaction): Loading files [/tmp/example_path/test.csv]
2018-04-17 00:09:26.155 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-04-17 00:09:26.177 +0900 [INFO] (0001:transaction): JDBC Driver = /Users/test_user/.embulk/lib/gems/gems/embulk-output-postgresql-0.8.0/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2018-04-17 00:09:26.184 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2018-04-17 00:09:26.266 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "test_schema"
2018-04-17 00:09:26.268 +0900 [INFO] (0001:transaction): > 0.00 seconds
2018-04-17 00:09:26.268 +0900 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2018-04-17 00:09:26.268 +0900 [INFO] (0001:transaction): Using truncate_insert mode
2018-04-17 00:09:26.439 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE "test_schema"."test_table_00000162cf00ea23_embulk000" ("id" TEXT, "testString" TEXT)
2018-04-17 00:09:26.516 +0900 [INFO] (0001:transaction): > 0.08 seconds
2018-04-17 00:09:26.522 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE "test_schema"."test_table_00000162cf00ea23_embulk001" ("id" TEXT, "testString" TEXT)
2018-04-17 00:09:26.530 +0900 [INFO] (0001:transaction): > 0.01 seconds
2018-04-17 00:09:26.533 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE "test_schema"."test_table_00000162cf00ea23_embulk002" ("id" TEXT, "testString" TEXT)
2018-04-17 00:09:26.545 +0900 [INFO] (0001:transaction): > 0.01 seconds
2018-04-17 00:09:26.550 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE "test_schema"."test_table_00000162cf00ea23_embulk003" ("id" TEXT, "testString" TEXT)
2018-04-17 00:09:26.557 +0900 [INFO] (0001:transaction): > 0.01 seconds
2018-04-17 00:09:26.690 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2018-04-17 00:09:26.717 +0900 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2018-04-17 00:09:26.732 +0900 [INFO] (0016:task-0000): SQL: SET search_path TO "test_schema"
2018-04-17 00:09:26.733 +0900 [INFO] (0016:task-0000): > 0.00 seconds
2018-04-17 00:09:26.734 +0900 [INFO] (0016:task-0000): Copy SQL: COPY "test_schema"."test_table_00000162cf00ea23_embulk000" ("id", "testString") FROM STDIN
2018-04-17 00:09:26.739 +0900 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2018-04-17 00:09:26.760 +0900 [INFO] (0016:task-0000): SQL: SET search_path TO "test_schema"
2018-04-17 00:09:26.761 +0900 [INFO] (0016:task-0000): > 0.00 seconds
2018-04-17 00:09:26.761 +0900 [INFO] (0016:task-0000): Copy SQL: COPY "test_schema"."test_table_00000162cf00ea23_embulk001" ("id", "testString") FROM STDIN
2018-04-17 00:09:26.764 +0900 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2018-04-17 00:09:26.780 +0900 [INFO] (0016:task-0000): SQL: SET search_path TO "test_schema"
2018-04-17 00:09:26.781 +0900 [INFO] (0016:task-0000): > 0.00 seconds
2018-04-17 00:09:26.782 +0900 [INFO] (0016:task-0000): Copy SQL: COPY "test_schema"."test_table_00000162cf00ea23_embulk002" ("id", "testString") FROM STDIN
2018-04-17 00:09:26.786 +0900 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2018-04-17 00:09:26.801 +0900 [INFO] (0016:task-0000): SQL: SET search_path TO "test_schema"
2018-04-17 00:09:26.802 +0900 [INFO] (0016:task-0000): > 0.00 seconds
2018-04-17 00:09:26.802 +0900 [INFO] (0016:task-0000): Copy SQL: COPY "test_schema"."test_table_00000162cf00ea23_embulk003" ("id", "testString") FROM STDIN
2018-04-17 00:09:26.856 +0900 [INFO] (0016:task-0000): Loading 5 rows (40 bytes)
2018-04-17 00:09:26.861 +0900 [INFO] (0016:task-0000): > 0.01 seconds (loaded 5 rows in total)
2018-04-17 00:09:26.864 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-04-17 00:09:26.865 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2018-04-17 00:09:26.877 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "test_schema"
2018-04-17 00:09:26.878 +0900 [INFO] (0001:transaction): > 0.00 seconds
2018-04-17 00:09:26.880 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "test_schema"."test_table" ("id" TEXT, "testString" TEXT)
2018-04-17 00:09:26.895 +0900 [INFO] (0001:transaction): > 0.02 seconds
2018-04-17 00:09:26.898 +0900 [INFO] (0001:transaction): SQL: DELETE FROM "test_schema"."test_table"
2018-04-17 00:09:26.903 +0900 [INFO] (0001:transaction): > 0.01 seconds
2018-04-17 00:09:26.904 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "test_schema"."test_table" ("id", "testString") SELECT "id", "testString" FROM "test_schema"."test_table_00000162cf00ea23_embulk000" UNION ALL SELECT "id", "testString" FROM "test_schema"."test_table_00000162cf00ea23_embulk001" UNION ALL SELECT "id", "testString" FROM "test_schema"."test_table_00000162cf00ea23_embulk002" UNION ALL SELECT "id", "testString" FROM "test_schema"."test_table_00000162cf00ea23_embulk003"
2018-04-17 00:09:26.908 +0900 [INFO] (0001:transaction): > 0.00 seconds (5 rows)
2018-04-17 00:09:26.915 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2018-04-17 00:09:26.926 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "test_schema"
2018-04-17 00:09:26.927 +0900 [INFO] (0001:transaction): > 0.00 seconds
2018-04-17 00:09:26.927 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "test_schema"."test_table_00000162cf00ea23_embulk000"
2018-04-17 00:09:26.948 +0900 [INFO] (0001:transaction): > 0.02 seconds
2018-04-17 00:09:26.949 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "test_schema"."test_table_00000162cf00ea23_embulk001"
2018-04-17 00:09:26.953 +0900 [INFO] (0001:transaction): > 0.00 seconds
2018-04-17 00:09:26.953 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "test_schema"."test_table_00000162cf00ea23_embulk002"
2018-04-17 00:09:26.958 +0900 [INFO] (0001:transaction): > 0.01 seconds
2018-04-17 00:09:26.959 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "test_schema"."test_table_00000162cf00ea23_embulk003"
2018-04-17 00:09:26.963 +0900 [INFO] (0001:transaction): > 0.00 seconds
2018-04-17 00:09:26.964 +0900 [INFO] (main): Committed.
2018-04-17 00:09:26.965 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/tmp/example_path/test.csv"},"out":{}}
$

データの投入結果は以下

$ psql test_db -U test_user -p 5432 -h localhost -c "SELECT * FROM test_schema.test_table;"
Password for user testuser:
 id | testString
----+------------
 1  | test1
 2  | test2
 3  | test3
 4  | test4
 5  | test5
(5 rows)

$

まとめ

今回はcsvファイルのデータをembulkを使用してPostgreSQLに投入しました。先にも記載しましたが、このプラグインにはインプット用のプラグインもあります。そのためPostgreSQLからデータを抽出し、別のPostgreSQLにデータを移すこともできます(もちろん同じPostgreSQLの別テーブルに移すことも可)。

なので、データの抽出・投入を行う際の引き出しとして知ってて損はないと思いました。

コメントを残す