digdag+embulkを使ってテーブル毎にカラムを設定してBigQueryに流し込んでみた

横澤です、平素より格別のご高配を賜り、厚く御礼申し上げます。

先日DWH系のネタについてAthenafluentdの二つを書いたのですが、結論としてdigdag+embulkという構成に落ち着いてしまったのでここにご報告させて頂きます。

ここに至った細かい経緯は本記事の最後にまとめておきますので、まずは設定ファイル例を公開しようと思います。なお、digdagやembulkの細かい使い方についてはWEB上に良記事があるので割愛します。

まずメインの呼び出し先となる.digファイルです、定期実行タイミングをscheduleで登録し、!includeを使って登録DBの接続情報を外に出してgitignoreで管理しています。digdagにはsecret機能もあるのですが、server利用でないと扱えないようなので取り敢えずこのような形でgithubに上がらないように逃しています。

timezone: UTC

schedule:
  daily>: 10:00:00

+main:
  !include : my-db.dig

step1では全カラムを転送するテーブルを並べています、embulk用のymlファイルについては後ほど解説します。

  +step1:
    for_each>:
      table: [
        users
        , products
      ]
    _do:
      embulk>: mysql_to_bigquery.yml

step2では全カラムではなく、特定のカラム指定や特定の行指定を行いたいテーブルをembulk側のqueryで設定しています。データ量が大して大きくないテーブルならば特に考えずに全カラム、全行を投げても良いのですが、データ量がそこそこ大きいテーブルで全部やってしまうと稼働するインスタンスのコストが増加してしまいます。重厚長大なテーブルについてはMECEにデータを指定する事で転送用のインスタンスコストを抑えつつ転送速度を確保させています。

  +step2:
    embulk>: mysql_to_bigquery_orders.yml

続いては接続設定用の.digファイルです、これはdigdagワークフロー内で変数を展開するexport機能を使って指定するだけです。

_export:
  host: 'localhost'
  user: 'me'
  password: 'my-pass'
  database: 'my-database'
  project_id: 'my-projectid'
  dataset: 'my-dataset'

最後にembulk用のymlファイルです、digdag側から変数設定されて呼び出すのでシンプルにワンファイルで住むのが魅力的ですね。またbigqueryの転送にはembulk-output-bigqueryを使っているのですが、テンポラリテーブルを作ってスキーマに合わせたloadをしてくれるので楽且つ安全で嬉しい感じです。

in:
  type: mysql
  host: ${host}
  user: ${user}
  password: '${password}'
  database: ${database}
  table: ${table}
out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: ./my-bigquery.json
  project: ${project_id}
  dataset: ${dataset}
  auto_create_table: true
  table: ${table}
  allow_quoted_newlines: true

また.digで設定したstep2用のembulk設定ファイルを用意します。こちらでは前述したデータ範囲を指定する為にembulk-input-mysqlのqueryキーワードを使って取得カラムや範囲を指定しています。

in:
  type: mysql
  host: ${host}
  user: ${user}
  password: '${password}'
  database: ${database}
  query: |
    select id,user_id,product_id from orders
out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: ./my-bigquery.json
  project: ${project_id}
  dataset: ${dataset}
  auto_create_table: true
  table: orders
  allow_quoted_newlines: true

embulk-input-mysqlにはselectやwhereといった設定キーワードもあるのでもっと綺麗な感じに出来ないか試行錯誤していたのですが、取り敢えず動かす事を目標に泥臭く書いています。embulkプラグインのオプションは豊富に用意されているのでよく読むと様々な使い方を発見できそうです。

そもそもAthenaは「使ってみた」という感じなのでDWHとしての本番採用は最初から考えておらず、この業界特有の雑多なCSVを良い感じにリレーショナルDB的に扱えるかどうか試してみる、という目的で使っていました。 fluentdについてはしばらく運用していたのですが、バッファリングサーバーの可用性や拡張性を維持するのが結構大変で、途中で運用を止めたりデータを減らしたりと思うような運用が出来ておりませんでした。 しかし、よくよく考えると今回のDWHはアドホックな分析に使われる目的のみを期待されているので、リアルタイムにデータ連携される必要はなく、fluentdによるストリーミング処理は廃止してembulkによるバッチ処理に切り替える事を決めました。 embulkをshellなりコマンドで呼んでも良いのですが、複数DBやテーブル毎に微妙にカラム構造を変えたい、という要件があったのでワークフローエンジンを使って上手い事出来ないかを調べたらdigdagがどうやら良さそうだったので、取り敢えずdigdag+embulkで構成を作ってみました。 現在のところはこの構成で上手く回っており、redashからもGBQに接続出来るようにしたので今まではエンジニアでなければ出来なかった(エンジニアでもかなり面倒でした)サービスを横断したデータ取得が出来るようになりQualityOfDataLifeが向上したような予感がしています。

イタンジでは多くのデータを活用してサービスのグロースにコミットするプロダクトエンジニアと、探索的にデータを活用して全く別の兆候を発見する機械学習エンジニアを募集しております!