ITANDI TECH BLOG

イタンジのスタッフブログです。イベントや技術情報などを発信しています。

railsのactive jobでGoole Cloudのpub/subを使う

こんばんは、エンジニアの福崎です。 最近仕事でGCPを使ってます、AWSに比べて情報は少ないけど意外にドキュメント揃っているので ドキュメント読めば簡単なチュートリアルは動かせちゃいます。 まだネットに情報少ないので個人的には公式ドキュメントを読み漁るのが一番早かったです。 今日はrailsのactive jobでGAEのpub/subを使ってみたのでメモがてらブログに残します。

Pub/Subを使った背景

active job最初はいつもどおりsidekiqでやろうとしたんですがGCPにはフルマネージドなredisが無いみたいなので 自分でサーバー立てて管理するのはちょっと。。と思っていたらPub/Subに行き着きました。

Pub/Subとは

Cloud Pub/Sub はフルマネージドのリアルタイム メッセージング サービスで、個別のアプリケーション間でメッセージを送受信できます。

今回はこちらのメッセージングサービスにjobをキューイングしていきworkerでpullして処理します。

日本語チュートリアルもあるので読んでおくと良いと思います。

たこの内容はこちらの公式ドキュメントをベースにした内容になってます。

完成物

ここに置いてます。

前準備

pub/subを有効にする

GCPで新しいプロジェクトを作ってpub/subをapi managerから有効にしておいてください。(デフォが無効になってます)

rails環境の準備

今回はrails 5.1でやってます

認証ファイルの取得

GCPにログインして「API Manager」→「認証情報」→「認証情報を作成」→「サービスアカウントキー」→「GAEと選択」→「jsonを選択」→「作成」保存したjsonファイルを /config/pub-sub-sample-auth.jsonに設置してください。

project idの記述

/config/settings.ymlを作りproject_idにGCPのproject idを入れてください。

development:
  project_id: <your project_id>
  auth_file: pub-sub-sample-auth.json

実装

gemを入れる

Gem Fileにpub/sub用のgemを定義してbundleで入れておきます。

gem 'google-cloud-pubsub'

APIドキュメントがあるので目を通しておくと良いです。

application.rbの設定

/config/application.rbに以下の定義を追記します

# 今回作るadapterを使うよう定義します
config.active_job.queue_adapter = :pub_sub_queue
# libの下にadapterとか置くのでautoloadの対象にしておきます
config.autoload_paths << Rails.root.join('lib')
# GCP周りの設定をsettingファイルに定義するので
config.x.settings = Rails.application.config_for(:settings)
# GCPにログ吐くようにします
if Dir.exist?('/var/log/app_engine/custom_logs')
  config.logger = ActiveSupport::TaggedLogging.new Logger.new('/var/log/app_engine/custom_logs/application.log')
end

adapterの定義

今回の本丸です。 /lib/active_job/queue_adapters/pub_sub_queue_adapter.rbを以下の内容で作成します。

require 'json'
require 'google/cloud/pubsub'

module ActiveJob
  module QueueAdapters
    class PubSubQueueAdapter

      def enqueue(job)
        Rails.logger.info "[PubSubQueueAdapter] enqueue job #{job.inspect}"

        topic = PubSubQueueAdapter.pubsub.topic(job.queue_name, autocreate: true)

        topic.publish(job.class.name, arg: job.arguments)
      end

      class << self
        def pubsub
          @pubsub ||= begin
            project_id = Rails.application.config.x.settings['project_id']
            Google::Cloud::Pubsub.new(
              project: project_id,
              keyfile: "#{Rails.root.join('config')}/#{Rails.application.config.x.settings['auth_file']}"
            )
          end
        end

        def run_worker!(queue_name = 'default')
          p 'Running worker'

          topic        = pubsub.topic(queue_name, autocreate: true)
          subscription = topic.subscription("#{queue_name}_task")

          topic.subscribe("#{queue_name}_task") if subscription.blank?

          subscription.listen(autoack: true) do |message|
            message.data.constantize.send(:perform_now, *JSON.parse(message.attributes['arg']))
          end
        end
      end
    end
  end
end

workerタスクの作成

後はいつもどおりjobを定義します。 app/jobs/sample_job.rb

class SampleJob < ApplicationJob
  queue_as :default
  def perform(num)
    p '==================='
    p "#{num} sample job executed!"
    p '==================='
  end
end

そしてjobを発行するrakeタスクを用意 lib/tasks/sample_job.rake

desc 'issue job'
task issue_job: :environment do
  (1..5).each do |num|
    SampleJob.perform_later(num)
  end
end

早速キューイングしてみましょう

bundle exec rake issue_job

最後にworkerを起動するとキューイングしたjobが処理されていくのを見ることが出来ます

bundle exec rake run_worker

まとめ

いかがでしたか? sidekiq程高機能ではないので時間を指定しての実行などは出来ませんが 単純にworkerの機能がほしいのであればこれでも十分です。 GCPを使う際はpub/subを是非使ってみて下さい!

今回はlocalで動かしたので次回はこれをGAEにdeployする方法を書きたいと思います