embulkでPostgreSQLにデータを投入する

こんにちはKUJIRAです。前回はemvulkのインストールとCSVの読み込みを行いました。
今回はこの読み込んだデータをPostgreSQLに投入するやり方を書きます。
embulkをPostgreSQLにアクセスさせる
まず、embulkでPostgreSQLにデータを投入するにはアウトプットの際 PostgreSQLに接続できるようにする必要があります。
そこで導入するのが、embulkのPostgreSQL用のプラグインです。
embulk-input-postgresql
embulk-output-postgresql
今回は出力側なので embulk-output-postgresql プラグインを使います。
プラグインをインストールする
embulkのプラグインはgemパッケージで配布されています。また、本体にgemパッケージの管理機能が内蔵されているため、インストールしたいプラグイン名を指定しサブコマンドを実行することで簡単にプラグインがインストールできます。
$ embulk gem install [plugin name]
今回は、embulk-output-postgresqlプラグインですので、以下のようになります。
$ embulk gem install embulk-output-postgresql
プラグインを使ってみる
プラグインがインストールできたら、設定を行っていきます。
設定記述は「out:」部分に行いますが、基本的には以下の情報を入力すれば最低限の動きをします。
設定キー | 概要 |
---|---|
host | データベースのホスト名 |
port | データベース接続時のポート |
user | データベースへのログインユーザ名 |
password | データベースへのログインパスワード |
database | データ投入対象となるデータベース名 |
schema | データ投入対象となるスキーマ名 |
table | データ投入対象となるテーブル名 |
mode | データ投入のモード |
例えば、以下のようなデータベースがあったとき、
host | localhsot |
---|---|
port | 5432 |
user | testuser |
password | testpassword |
database | test_db |
schema | test_schema |
table | test_table |
前回の設定を修正し、上のデータベースにcsvファイルのデータを投入しようとすると以下のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
--- in: type: file path_prefix: /tmp/example_path/test.csv parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_hander_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: 'id', type: string} - {name: 'testString', type: string} out: type: postgresql mode: truncate_insert default_timezone: "Asia/Tokyo" host: localhost port: 5432 user: testuser password: "testpassword" database: test_db schema: test_schema table: test_table |
実行結果は以下です。
$ embulk run config.yml 2018-04-17 00:09:20.779 +0900: Embulk v0.9.6 2018-04-17 00:09:22.544 +0900 [INFO] (main): Started Embulk v0.9.6 2018-04-17 00:09:25.306 +0900 [INFO] (0001:transaction): Gem's home and path are set by default: "/Users/test_user/.embulk/lib/gems" 2018-04-17 00:09:26.079 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.8.0) 2018-04-17 00:09:26.113 +0900 [INFO] (0001:transaction): Listing local files at directory '/tmp/example_path' filtering filename by prefix 'test.csv' 2018-04-17 00:09:26.114 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2018-04-17 00:09:26.117 +0900 [INFO] (0001:transaction): Loading files [/tmp/example_path/test.csv] 2018-04-17 00:09:26.155 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4 2018-04-17 00:09:26.177 +0900 [INFO] (0001:transaction): JDBC Driver = /Users/test_user/.embulk/lib/gems/gems/embulk-output-postgresql-0.8.0/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar 2018-04-17 00:09:26.184 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2018-04-17 00:09:26.266 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "test_schema" 2018-04-17 00:09:26.268 +0900 [INFO] (0001:transaction): > 0.00 seconds 2018-04-17 00:09:26.268 +0900 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205) 2018-04-17 00:09:26.268 +0900 [INFO] (0001:transaction): Using truncate_insert mode 2018-04-17 00:09:26.439 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE "test_schema"."test_table_00000162cf00ea23_embulk000" ("id" TEXT, "testString" TEXT) 2018-04-17 00:09:26.516 +0900 [INFO] (0001:transaction): > 0.08 seconds 2018-04-17 00:09:26.522 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE "test_schema"."test_table_00000162cf00ea23_embulk001" ("id" TEXT, "testString" TEXT) 2018-04-17 00:09:26.530 +0900 [INFO] (0001:transaction): > 0.01 seconds 2018-04-17 00:09:26.533 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE "test_schema"."test_table_00000162cf00ea23_embulk002" ("id" TEXT, "testString" TEXT) 2018-04-17 00:09:26.545 +0900 [INFO] (0001:transaction): > 0.01 seconds 2018-04-17 00:09:26.550 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE "test_schema"."test_table_00000162cf00ea23_embulk003" ("id" TEXT, "testString" TEXT) 2018-04-17 00:09:26.557 +0900 [INFO] (0001:transaction): > 0.01 seconds 2018-04-17 00:09:26.690 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 2018-04-17 00:09:26.717 +0900 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2018-04-17 00:09:26.732 +0900 [INFO] (0016:task-0000): SQL: SET search_path TO "test_schema" 2018-04-17 00:09:26.733 +0900 [INFO] (0016:task-0000): > 0.00 seconds 2018-04-17 00:09:26.734 +0900 [INFO] (0016:task-0000): Copy SQL: COPY "test_schema"."test_table_00000162cf00ea23_embulk000" ("id", "testString") FROM STDIN 2018-04-17 00:09:26.739 +0900 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2018-04-17 00:09:26.760 +0900 [INFO] (0016:task-0000): SQL: SET search_path TO "test_schema" 2018-04-17 00:09:26.761 +0900 [INFO] (0016:task-0000): > 0.00 seconds 2018-04-17 00:09:26.761 +0900 [INFO] (0016:task-0000): Copy SQL: COPY "test_schema"."test_table_00000162cf00ea23_embulk001" ("id", "testString") FROM STDIN 2018-04-17 00:09:26.764 +0900 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2018-04-17 00:09:26.780 +0900 [INFO] (0016:task-0000): SQL: SET search_path TO "test_schema" 2018-04-17 00:09:26.781 +0900 [INFO] (0016:task-0000): > 0.00 seconds 2018-04-17 00:09:26.782 +0900 [INFO] (0016:task-0000): Copy SQL: COPY "test_schema"."test_table_00000162cf00ea23_embulk002" ("id", "testString") FROM STDIN 2018-04-17 00:09:26.786 +0900 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2018-04-17 00:09:26.801 +0900 [INFO] (0016:task-0000): SQL: SET search_path TO "test_schema" 2018-04-17 00:09:26.802 +0900 [INFO] (0016:task-0000): > 0.00 seconds 2018-04-17 00:09:26.802 +0900 [INFO] (0016:task-0000): Copy SQL: COPY "test_schema"."test_table_00000162cf00ea23_embulk003" ("id", "testString") FROM STDIN 2018-04-17 00:09:26.856 +0900 [INFO] (0016:task-0000): Loading 5 rows (40 bytes) 2018-04-17 00:09:26.861 +0900 [INFO] (0016:task-0000): > 0.01 seconds (loaded 5 rows in total) 2018-04-17 00:09:26.864 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2018-04-17 00:09:26.865 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800} 2018-04-17 00:09:26.877 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "test_schema" 2018-04-17 00:09:26.878 +0900 [INFO] (0001:transaction): > 0.00 seconds 2018-04-17 00:09:26.880 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "test_schema"."test_table" ("id" TEXT, "testString" TEXT) 2018-04-17 00:09:26.895 +0900 [INFO] (0001:transaction): > 0.02 seconds 2018-04-17 00:09:26.898 +0900 [INFO] (0001:transaction): SQL: DELETE FROM "test_schema"."test_table" 2018-04-17 00:09:26.903 +0900 [INFO] (0001:transaction): > 0.01 seconds 2018-04-17 00:09:26.904 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "test_schema"."test_table" ("id", "testString") SELECT "id", "testString" FROM "test_schema"."test_table_00000162cf00ea23_embulk000" UNION ALL SELECT "id", "testString" FROM "test_schema"."test_table_00000162cf00ea23_embulk001" UNION ALL SELECT "id", "testString" FROM "test_schema"."test_table_00000162cf00ea23_embulk002" UNION ALL SELECT "id", "testString" FROM "test_schema"."test_table_00000162cf00ea23_embulk003" 2018-04-17 00:09:26.908 +0900 [INFO] (0001:transaction): > 0.00 seconds (5 rows) 2018-04-17 00:09:26.915 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/test_db options {user=test_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2018-04-17 00:09:26.926 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "test_schema" 2018-04-17 00:09:26.927 +0900 [INFO] (0001:transaction): > 0.00 seconds 2018-04-17 00:09:26.927 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "test_schema"."test_table_00000162cf00ea23_embulk000" 2018-04-17 00:09:26.948 +0900 [INFO] (0001:transaction): > 0.02 seconds 2018-04-17 00:09:26.949 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "test_schema"."test_table_00000162cf00ea23_embulk001" 2018-04-17 00:09:26.953 +0900 [INFO] (0001:transaction): > 0.00 seconds 2018-04-17 00:09:26.953 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "test_schema"."test_table_00000162cf00ea23_embulk002" 2018-04-17 00:09:26.958 +0900 [INFO] (0001:transaction): > 0.01 seconds 2018-04-17 00:09:26.959 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "test_schema"."test_table_00000162cf00ea23_embulk003" 2018-04-17 00:09:26.963 +0900 [INFO] (0001:transaction): > 0.00 seconds 2018-04-17 00:09:26.964 +0900 [INFO] (main): Committed. 2018-04-17 00:09:26.965 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/tmp/example_path/test.csv"},"out":{}} $
データの投入結果は以下
$ psql test_db -U test_user -p 5432 -h localhost -c "SELECT * FROM test_schema.test_table;" Password for user testuser: id | testString ----+------------ 1 | test1 2 | test2 3 | test3 4 | test4 5 | test5 (5 rows) $
まとめ
今回はcsvファイルのデータをembulkを使用してPostgreSQLに投入しました。先にも記載しましたが、このプラグインにはインプット用のプラグインもあります。そのためPostgreSQLからデータを抽出し、別のPostgreSQLにデータを移すこともできます(もちろん同じPostgreSQLの別テーブルに移すことも可)。
なので、データの抽出・投入を行う際の引き出しとして知ってて損はないと思いました。