データエンジニアリングで活用するPydantic

エンジニアの長部です。社内の分析用データ基盤構築の傍ら、物件連動基盤のリアーキテクチャや新規機能に関わる開発・設計に携わっています。

イタンジでは大量で多様な物件データを用いたETL処理を行っていますが、その信頼性向上のため行っている工夫の一つとして、Pydanticを利用したモデルベースのバリデーションについて紹介します。

イタンジにおける物件連動基盤とは

イタンジでは様々な経路で入力された物件情報を社内の物件情報基盤に取り込み、それを組み合わせたり加工した上で、社が運営するサービスで利用したり、必要に応じてさらに外部のシステムに出稿したりといったことを行っています。

イタンジ社内における物件連動システムの位置づけ

データ環境としての特色は、「社外から様々なフォーマットのデータが送られてくる」「物理的なファイル生成・取り込みが多い」ということです。

昨今のデータ基盤のトレンドに乗ってウェアハウス内で行う処理を厚くしてELTの形に寄せていきたいところですが、物件連動処理ではそもそもウェアハウスに流し込める形式にするところまでに様々な課題があります。

例えばログなどのデータの取り込みについて、社内に閉じたデータ基盤であれば標準的な形式を取り決めてそれを前提に実装すればよいのですが、イタンジの場合社外の様々な企業からデータを納品いただくことがあります。
CSVで来ることもJSONLで来ることもあり、エンコーディングや改行コードのようなファイルレベルの性質も考慮しなければなりません。カラム構成もばらばらな他、それぞれ仕様が変更されることもあります。
さらにファイルを得るまでの過程も様々で、(S)FTPやS3経由でデータを落としてくる、落としてきたデータはZIPで固められているのでまずは解凍、といったレベルの処理も考えなければいけません。

Transformしなければウェアハウスに容易にLoadできない状態でデータが来るという事情があり、必然的にETLの形で処理されるレイヤが分厚くなり、ウェアハウス外で取り組まなければならないデータ処理上の課題が多くなります。

リアーキテクチャ後の連動基盤では型付けやバリデーションを活用して、効率的にETL処理時の問題に対処できるような構成を目指しています。ここで役立っているのがPydanticというOSSです。

Pydanticについて

https://docs.pydantic.dev/

Pydanticは型注釈を利用してデータの検証を行うためのPython製のライブラリです。

フロントエンドに携わる方にはZodのようなライブラリであるというと分かりやすいかもしれません。予め定義したモデルの型や制約に当てはめる形で入力データをパースしようとし、解釈不能な部分があればバリデーションエラーを返してくれます。

FastAPIで採用されているためWebバックエンドでは広く活用されていますが、データエンジニアリングにおけるデータ処理の品質担保にも効果的に活用することができます。

例として、APIから以下のようなデータを取り出し、定期的にバッチ処理で加工するということを考えます。

{
    "rooms": [
        {
            "id": 1,
            "name": "イタンジマンション 101号室",
            "rent_price": 100000,
            "shikikin_price": 100000,
            "shikikin_type": "yen"
        },
        {
            "id": 2,
            "name": "イタンジマンション 102号室",
            "rent_price": 80000,
            "shikikin_price": 1,
            "shikikin_type": "month"
        },
        ...
    ]
}

そしてtypeに応じて分岐するなどして加工するわけですが、

def handle_shikikin(room: dict) -> str:
    shikikin_type = room["shikikin_type"]
    match shikikin_type:
        case "yen":
            # 円単位で来た際の処理
            return some_function(room.shikikin_price)
        case "month":
            # 賃料のnヶ月分という形式で来た際の処理
            return some_function(room.shikikin_price)

明確な仕様が不明でデータのみから仕様を推測する必要がある場合や、途中で仕様変更が発生した場合などに、後から想定外のデータが送られてくることがあります。

{
    "id": 3,
    "name": "イタンジマンション 103号室",
    "rent_price": 90000,
    "shikikin_price": 100,
    "shikikin_type": "percent"  # 未知の種別
}

こうした不安定な入力に対応するには、個別の処理で入力が想定する範囲に収まるかどうかのバリデーションを行っていく必要があります。

しかし、数百カラムから成るCSVを何段階も加工して取り込むような処理において、途中で経由する処理の入力が不明瞭であり、都度複雑なバリデーションや例外処理を行わなければならないとなると、コードのボリュームも複雑性も膨大になりメンテナンスが困難になってしまいます。

そこで、処理の頭で入力の曖昧さを消してあげることにします。データをAPIから取り出したタイミングでこのようにしてあげます。

class Room(BaseModel, extra=Extra.forbid):
    id: int
    name: str
    rent_price: int
    shikikin_price: int
    shikikin_type: Literal["yen", "month"]

class SomeApiResponse(BaseModel, extra=Extra.forbid):
    rooms: list[Room]

def handle_shikikin(room: Room) -> str:
    match room.shikikin_type:
        case "yen":
            # 円単位で来た際の処理
            return some_function(room.shikikin_price)
        case "month":
            # 賃料のnヶ月分という形式で来た際の処理
            return some_function(room.shikikin_price)

model = SomeApiResponse(**response)
for room in model.rooms:
    handle_shikikin(room)

こうすることで、まずレスポンスを想定する型に当てはめ、後段の処理ではここまでは検証済である前提で処理を書いていくことができます。

上記のように入力をモデルで縛った状態でshikikin_type="percent"のような想定外の入力が来た場合は、

1 validation error for SomeApiResponse
rooms -> 2 -> shikikin_type
  unexpected value; permitted: 'yen', 'month' (type=value_error.const; given=percent; permitted=('yen', 'month'))

モデルに当てはめた段階でこのようなエラーが生じることになります。

後段の処理ではモデルの定義を参照し、モデルの制約の内側にある値のみが来ることを想定すれば漏れのない網羅的な実装が可能になります。
型で縛って渡してあげることにより、後段の処理では補完が効くようになるというのも開発効率上見逃せないところです。

便利な使い方

詳細な制約を加えた型

id: PositiveInt
rent_price: NonNegativeInt
shikikin_price: NonNegativeInt

Pydanticでは組み込み型により詳細な制約を加えた型が用意されています。

https://docs.pydantic.dev/usage/types/

idがRDBから取得するauto incrementなid値なのであれば、最小値は1なので PositiveIntが適切です。価格のように0円であるケースも想定されるならNonNegativeIntとなります。例えばidに0、rent_priceに-1を指定すると以下のようなエラーとなります。

rooms -> 2 -> id
  ensure this value is greater than 0 (type=value_error.number.not_gt; limit_value=0)
rooms -> 2 -> rent_price
  ensure this value is greater than or equal to 0 (type=value_error.number.not_ge; limit_value=0)

その他、受け渡されるデータにJSON文字列やメールアドレス、URLなどが含まれることもありますが、これらに対してvalidな文字列かどうかを検証できる型も用意されています。

class Example(BaseModel):
    foo: Json
    bar: EmailStr
    baz: AnyUrl

Example(foo="aaa", bar="bbb", baz="ccc")
foo
  Invalid JSON (type=value_error.json)
bar
  value is not a valid email address (type=value_error.email)
baz
  invalid or missing URL scheme (type=value_error.url.scheme)

enumを利用したモデリング

価格種別を示すカテゴリ値を上記の例ではliteralで定義し、パターン外の値を不正なものとして弾いていました。同様のバリデーションをenumを経由して行うこともできます。

class MoneyType(str, Enum):
    YEN = "yen"
    MONTH = "month"
    PERCENT = "percent"

こうするとデータ処理時の分岐も型で守って網羅的に書くことができます。

def handle_shikikin(room: Room) -> str:
    match room.shikikin_type:
        case MoneyType.YEN:
            # 円単位で来た際の処理
            return some_function(room.shikikin_price)
        case MoneyType.MONTH:
            # 賃料のnヶ月分という形式で来た際の処理
            return some_function(room.shikikin_price)
        case MoneyType.PERCENT:
            # 賃料のn%という形式で来た際の処理
            return some_function(room.shikikin_price)

このようにするとcase文に指定する分岐に対するtypoなどもlinterで防げるほか、後からenum上のパターンが減ったにも関わらず分岐が残っているといった場合もlinterがエラーにしてくれます。

また、Pyrightなどの分岐網羅に対応した型チェッカを使っている場合は、新たにモデル定義にパターンが増えた場合もこのmatch文が網羅的でなくなるためエラーとして検出できます。例えば MoneyType.YEN に関する分岐を外すとPyrightでは以下のエラーが出ます。

Function with declared type of "str" must return value on all code paths
    Type "None" cannot be assigned to type "str"

入力の仕様が変更されてPydantic側のモデルを変更した際、そのモデルを参照する処理の追従が漏れると問題ですが、このようにすることでモデル追従時のロジックの修正漏れも防げることになります。

おわりに

イタンジでは不動産取引をなめらかにすべく、業界の負の解消にテクノロジーの力で取り組んでいます。
なめらかな不動産取引の実現には、なめらかなデータの流れが不可欠です。
物件連動基盤の構築に共に取り組んでいけるデータエンジニアを募集しています。以下からぜひお声がけください。