どうもこんにちは、イタンジ開発のコヤマです。物件連動チームに所属しています。 テックブログデビュー戦です。一発どうぞよろしくお願いいたします🤜🏻🤛🏻
背景
物件連動チームにはEmbulkを利用したプロダクトがあります。 私がチーム異動したのが最近であること、元々Embulkを知らなかったことから設定ファイルを見たり処理の流れなどを追ったりするのに苦労しました。 苦労したんですがなんとなくそのままになってたので、この機会によく見てみようと思います! 特に初見でよくわからなかったFilterプラグインにフォーカスします。
Embulkとは
Embulkとはプラグインを利用して、データを効率的に転送・処理できるETL(Extract, Transform, Load)ツールの一種です。 ETL(Extract, Transform, Load)とは、データを異なるソースから抽出(Extract)、必要に応じて変換(Transform)、そして別のデータストレージに格納(Load)するプロセスのことです。
イタンジでは物件情報基盤に物件データを取り込む際にEmbulkを活用しています 。イタンジの物件連動システムについてはこちらにも記述がありますのでぜひ!
プラグインとは
Embulkでは、さまざまな形式のデータソースやフォーマットに対応するために、プラグインという拡張機能を利用します。 プラグインは、Embulkの機能を拡張するための追加コードであり、inputプラグイン、outputプラグイン、filterプラグインなどがあります。 これらを組み合わせることで、データの読み込み、変換、書き込みを効率的に行うことができます。
また、プラグインは拡張カスタマイズすることができます。
Filterプラグインのサンプルから処理を学ぼう
イタンジでも、Filterプラグインをカスタマイズして、取り込んだデータを整形しています。
このFilterプラグインの処理を追うのが初見では辛かったので、Filterプラグインのサンプルコードを見ながら説明します。
サンプルコードはEmbulkの環境が整っていれば以下のコマンドで生成できます。
$ embulk new ruby-filter sample_filter
ぱっと見だと、各メソッドの処理や引数の意味が分からなくてスクロールしたくなりますが人差し指をぐっと耐えてください。
module Embulk module Filter class SampleFilter < FilterPlugin Plugin.register_filter("sample_filter", self) def self.transaction(config, in_schema, &control) # configuration code: task = { "option1" => config.param("option1", :integer), # integer, required "option2" => config.param("option2", :string, default: "myvalue"), # string, optional "option3" => config.param("option3", :string, default: nil), # string, optional } columns = [ Column.new(nil, "example", :string), Column.new(nil, "column", :long), Column.new(nil, "value", :double), ] out_columns = in_schema + columns yield(task, out_columns) end def init # initialization code: @option1 = task["option1"] @option2 = task["option2"] @option3 = task["option3"] end def close end def add(page) # filtering code: add_columns = ["example",1,1.0] page.each do |record| page_builder.add(record + add_columns) end end def finish page_builder.finish end end end end
メソッド毎にみていきます。
transactionメソッド
def self.transaction(config, in_schema, &control) # confingで定義した値を読み込む task = { "option1" => config.param("option1", :integer), "option2" => config.param("option2", :string, default: "myvalue"), "option3" => config.param("option3", :string, default: nil), } # 新しいカラムを定義(任意) columns = [ Column.new(nil, "example", :string), Column.new(nil, "column", :long), Column.new(nil, "value", :double), ] # 出力スキーマを定義 out_columns = in_schema + columns # 処理を開始 yield(task, out_columns) end
EmbulkのFilterプラグインの初期設定を行うためのメソッドです。
Filterプラグインが実行される際に実行され、初期設定を行い、フィルタ処理に必要な情報をtaskに格納します。また、出力スキーマを定義し、次のプラグインに渡します。
引数
- config: 設定ファイルに記述したプラグインの設定が渡されます。
- in_schema: 入力データの構造(カラム名、データ型など)を定義したスキーマです。
- &control: コードブロックを受け取るための引数です。
- Filterプラグインの場合、&controlにはフィルタ処理を呼び出すためのブロックが渡されます。このブロックを実行することで、フィルタ処理が実行されます。
- プラグイン間でデータ受け渡しなどの制御ができます。
- &controlブロックは、yieldで簡単に実行できるため、通常はyieldを使用します。(Rubyの話)
# どちらも同じ # yieldを使用して&controlブロックを実行 yield(config, out_columns) # control.callを使用して&controlブロックを実行(同じ効果) control.call(config, out_columns)
initメソッド
def init # 初期化 @option1 = task["option1"] @option2 = task["option2"] @option3 = task["option3"] end
プラグインのインスタンスが生成される際に呼び出されて初期化します。プラグインが使用する変数やデータなどを定義します。
closeメソッド
フィルタリングの処理が完了し、addメソッドで最後のページが処理された後にリソースの解放などの後処理を行うために呼び出されます。記述を省略することもできます。
addメソッド
def add(page) # レコードごとに実行される処理 add_columns = ["example",1,1.0] page.each do |record| page_builder.add(record + add_columns) # 加工後のデータを格納 end end
pageの各レコードをフィルタリングし、加工後のデータをpage_builderに追加する処理を行います。
- page: フィルタリングを行う対象のデータが格納されたものです。CSVからデータを読み込んでいる場合、pageはCSVファイルの内容を指します。
finishメソッド
def finish # プラグインの処理を終了する page_builder.finish end
フィルタリングが完了したときに、最後に呼び出されるメソッドです。
まとめ
実際のコードだと、データの定義や加工処理などがあって混乱しますが、サンプルコードだとなんとかいけたんじゃないでしょうか!
同じピヨっこエンジニアの手助けになれたら幸いです。共に頑張りましょう。
また、初めて記事を書きましたが大変でした。いつも調べたら当たり前のようにある情報も当たり前ですが誰かが時間をかけて書いてくれてることを実感しました。 ありがとうございます。