こんばんは、エンジニアの福崎です。 最近仕事でGCPを使ってます、AWSに比べて情報は少ないけど意外にドキュメント揃っているので ドキュメント読めば簡単なチュートリアルは動かせちゃいます。 まだネットに情報少ないので個人的には公式ドキュメントを読み漁るのが一番早かったです。 今日はrailsのactive jobでGAEのpub/subを使ってみたのでメモがてらブログに残します。
Pub/Subを使った背景
active job最初はいつもどおりsidekiqでやろうとしたんですがGCPにはフルマネージドなredisが無いみたいなので 自分でサーバー立てて管理するのはちょっと。。と思っていたらPub/Subに行き着きました。
Pub/Subとは
Cloud Pub/Sub はフルマネージドのリアルタイム メッセージング サービスで、個別のアプリケーション間でメッセージを送受信できます。
今回はこちらのメッセージングサービスにjobをキューイングしていきworkerでpullして処理します。
日本語チュートリアルもあるので読んでおくと良いと思います。
またこの内容はこちらの公式ドキュメントをベースにした内容になってます。
完成物
ここに置いてます。
前準備
pub/subを有効にする
GCPで新しいプロジェクトを作ってpub/subをapi managerから有効にしておいてください。(デフォが無効になってます)
rails環境の準備
今回はrails 5.1でやってます
認証ファイルの取得
GCPにログインして「API Manager」→「認証情報」→「認証情報を作成」→「サービスアカウントキー」→「GAEと選択」→「jsonを選択」→「作成」保存したjsonファイルを /config/pub-sub-sample-auth.json
に設置してください。
project idの記述
/config/settings.yml
を作りproject_idにGCPのproject idを入れてください。
development:
project_id: <your project_id>
auth_file: pub-sub-sample-auth.json
実装
gemを入れる
Gem Fileにpub/sub用のgemを定義してbundleで入れておきます。
gem 'google-cloud-pubsub'
APIドキュメントがあるので目を通しておくと良いです。
application.rbの設定
/config/application.rbに以下の定義を追記します
# 今回作るadapterを使うよう定義します
config.active_job.queue_adapter = :pub_sub_queue
# libの下にadapterとか置くのでautoloadの対象にしておきます
config.autoload_paths << Rails.root.join('lib')
# GCP周りの設定をsettingファイルに定義するので
config.x.settings = Rails.application.config_for(:settings)
# GCPにログ吐くようにします
if Dir.exist?('/var/log/app_engine/custom_logs')
config.logger = ActiveSupport::TaggedLogging.new Logger.new('/var/log/app_engine/custom_logs/application.log')
end
adapterの定義
今回の本丸です。 /lib/active_job/queue_adapters/pub_sub_queue_adapter.rbを以下の内容で作成します。
require 'json'
require 'google/cloud/pubsub'
module ActiveJob
module QueueAdapters
class PubSubQueueAdapter
def enqueue(job)
Rails.logger.info "[PubSubQueueAdapter] enqueue job #{job.inspect}"
topic = PubSubQueueAdapter.pubsub.topic(job.queue_name, autocreate: true)
topic.publish(job.class.name, arg: job.arguments)
end
class << self
def pubsub
@pubsub ||= begin
project_id = Rails.application.config.x.settings['project_id']
Google::Cloud::Pubsub.new(
project: project_id,
keyfile: "#{Rails.root.join('config')}/#{Rails.application.config.x.settings['auth_file']}"
)
end
end
def run_worker!(queue_name = 'default')
p 'Running worker'
topic = pubsub.topic(queue_name, autocreate: true)
subscription = topic.subscription("#{queue_name}_task")
topic.subscribe("#{queue_name}_task") if subscription.blank?
subscription.listen(autoack: true) do |message|
message.data.constantize.send(:perform_now, *JSON.parse(message.attributes['arg']))
end
end
end
end
end
end
workerタスクの作成
後はいつもどおりjobを定義します。 app/jobs/sample_job.rb
class SampleJob < ApplicationJob
queue_as :default
def perform(num)
p '==================='
p "#{num} sample job executed!"
p '==================='
end
end
そしてjobを発行するrakeタスクを用意 lib/tasks/sample_job.rake
desc 'issue job'
task issue_job: :environment do
(1..5).each do |num|
SampleJob.perform_later(num)
end
end
早速キューイングしてみましょう
bundle exec rake issue_job
最後にworkerを起動するとキューイングしたjobが処理されていくのを見ることが出来ます
bundle exec rake run_worker
まとめ
いかがでしたか? sidekiq程高機能ではないので時間を指定しての実行などは出来ませんが 単純にworkerの機能がほしいのであればこれでも十分です。 GCPを使う際はpub/subを是非使ってみて下さい!
今回はlocalで動かしたので次回はこれをGAEにdeployする方法を書きたいと思います