こんにちは、イタンジ株式会社でエンジニアをしている磯谷です。 私は現在、物件情報を取り込むワークフローシステムの開発に携わっています。 そのアーキテクチャの再設計を検討する中で、Rails 8.1で導入されたActive Job Continuationsについて調査したので書いていきます。
はじめに
これまでRailsにおいて、コンテナ環境でのデプロイと長時間ジョブの両立には、少なからず工夫が必要な場面がありました。コンテナのライフサイクルは短く、デプロイ時には古いコンテナが停止されます。長時間ジョブがその巻き添えになると、処理は強制終了され、最初からのやり直しが必要になります。
こうした課題に対して、これまではShopifyのjob-iterationのような外部gemを導入する必要がありました。
Rails 8.1 で導入された Active Job Continuations は、こうした機能をRails標準で提供し、外部gemなしに「Retry(やり直し)」ではなく「Resume(再開)」というアプローチを実現できるようにする注目の機能です。
公式ニュースの言葉を借りつつ説明すると、この機能は長時間ジョブを個別のステップに分割し、再起動後に最初からやり直すのではなく完了済みのステップをスキップして未完了のステップから実行を続行できるようにするものです。
この機能を利用する際の実装はシンプルで、以下のようにActiveJob::Continuableをincludeしstepメソッドを使ってステップを定義するだけです。
class SampleJob < ApplicationJob include ActiveJob::Continuable def perform step :first_step do ProcessA.run end # first_step完了後中断された場合、次はこのステップから再開される step :second_step do ProcessB.run end end end
機能の詳細は公式ガイド等に譲るとして、本記事では運用上特に気になった以下の2点を深く掘り下げます。
- 内部実装: ジョブはどのように進捗を保存しながら中断し、再開しているのか?
- 挙動検証: 再開時にジョブのstepが追加されていた場合はどうなるのか?
内部実装: 中断と再開の仕組み
Active Job Continuationの中断と再開は、以下のような仕組みによって実現されています。
- 中断: 現在の進捗をJobのインスタンスに保持し、SIGTERMを検知したらその情報もシリアライズして再エンキューする
- 再開: デシリアライズされた進捗情報をもとに、完了済みのステップをスキップする
ここからは、進捗の保存、そして中断と再開の流れについてのコードを順を追って見ていきたいと思います。
進捗の保存
まず進捗の保存についてですが、ActiveJob::Continuableをincludeすると、初期化時にActiveJob::Continuationクラスのオブジェクトがインスタンス変数として用意されます。
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuable.rb
# frozen_string_literal: true module ActiveJob # = Active Job Continuable # # The Continuable module provides the ability to track the progress of your # jobs, and continue from where they left off if interrupted. # # Mix ActiveJob::Continuable into your job to enable continuations. # # See {ActiveJob::Continuation}[rdoc-ref:ActiveJob::Continuation] for usage. # module Continuable extend ActiveSupport::Concern included do # ...省略... def initialize(...) super(...) self.resumptions = 0 self.continuation = Continuation.new(self, {}) end end # The number of times the job has been resumed. attr_accessor :resumptions attr_accessor :continuation # :nodoc: # ...省略... end # ...省略... end
このcontinuationオブジェクトが状態管理の要です。ジョブ内でstepメソッドが呼び出されると、continuationオブジェクトは現在の実行ステップや、完了したステップ名をメモリ上に記録します。
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuation.rb#L299-L313
def run_step_inline(name, start:, **options, &block) @running_step = true @current ||= new_step(name, start, resumed: false) instrumenting_step(current) do block.call(current) end @completed << current.name @current = nil @advanced = true ensure @running_step = false @advanced ||= current&.advanced? end
しかし、この時点ではまだ情報はメモリ上にしかありません。これらが永続化されるのは、中断が発生したタイミングです。
中断
ここではActive JobバックエンドとしてSolid Queueを利用する想定としますが、Solid QueueはSIGTERMを受け取ると、そのキューアダプタでstopping?がtrueを返すように実装されています。
# frozen_string_literal: true module ActiveJob module QueueAdapters # == Active Job SolidQueue adapter # # To use it set the queue_adapter config to +:solid_queue+. # # Rails.application.config.active_job.queue_adapter = :solid_queue class SolidQueueAdapter < (Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 1 ? Object : AbstractAdapter) class_attribute :stopping, default: false, instance_writer: false SolidQueue.on_worker_stop { self.stopping = true } # ...省略... end end end
一方、ジョブ側ではstepメソッドの実行直前にcheckpoint!が呼ばれ、バックエンドが終了しようとしているかを確認します。
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuable.rb#L36-L50
def step(step_name, start: nil, isolated: false, &block) # ...省略... checkpoint! if continuation.advanced? continuation.step(step_name, start: start, isolated: isolated, &block) end
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuable.rb#L62-L64
def checkpoint! # :nodoc: interrupt!(reason: :stopping) if queue_adapter.stopping? end
もしstopping?がtrueであれば、ここでContinuation::Interruptという例外が発生します。
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuable.rb#L66-L69
def interrupt!(reason:) # :nodoc: instrument :interrupt, reason: reason, **continuation.instrumentation raise Continuation::Interrupt, "Interrupted #{continuation.description} (#{reason})" end
この例外はaround_performに設定されているcontinueメソッド内でrescueされます。
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuable.rb#L72-L89
def continue(&block) if continuation.started? self.resumptions += 1 instrument :resume, **continuation.instrumentation end block.call rescue Continuation::Interrupt => e resume_job(e) # ...省略... end
resume_jobは最終的にジョブのエンキューを行います。エンキューの際にジョブはシリアライズされますが、ActiveJob::Continuableをincludeしたジョブではserializeがオーバーライドされており、進捗の情報を追加するようになっています。
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuable.rb#L52-L54
def serialize # :nodoc: super.merge("continuation" => continuation.to_h, "resumptions" => resumptions) end
このように、進捗の情報をジョブのシリアライズされたデータ(job_data)の一部として保存しており、途中からの再開ができる用意を整えています。
ただし、キューアダプタがstopping?に対応している必要がある点には注意が必要です。
主要なアダプタの最新バージョンでの対応状況は以下の通りです(2025年12月執筆時点)。
| アダプタ | 対応状況 |
|---|---|
| Solid Queue | ✅ 対応済み |
| Sidekiq | ✅ 対応済み |
| Resque | ❌ 未対応 |
| Delayed Job | ❌ 未対応 |
未対応のアダプタを使用している場合は、Active Job Continuationsを利用するために独自の実装が必要になります。
再開
最後に、再開の動きです。以下のようにdeserializeがオーバーライドされており、中断されたジョブが再度実行される際には進捗の情報をメモリ上に復元します。
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuable.rb#L56-L60
def deserialize(job_data) # :nodoc: super self.continuation = Continuation.new(self, job_data.fetch("continuation", {})) self.resumptions = job_data.fetch("resumptions", 0) end
この状態でstepメソッドに差し掛かると、以下のような判定により完了済みの処理はステップごとスキップされ、未実行のステップから処理が再開されます。
https://github.com/rails/rails/blob/v8.1.0/activejob/lib/active_job/continuation.rb#L224-L233
def step(name, **options, &block) # :nodoc: validate_step!(name) encountered << name if completed?(name) skip_step(name) else run_step(name, **options, &block) end end
挙動検証: step追加時の動作
ここまで、内部実装を追ってきて中断時に進捗の状態が永続化されていることがわかりました。 ここからは実際に手元で再開時の挙動を観察し、特に再開時にジョブのstepが追加されていた場合にどのような挙動となるのか確認してみます。
実際に中断し再開させてみる
まずは正常系の確認です。以下でもActive JobのバックエンドとしてSolid Queueを利用しています。
次のようなジョブを実行し、SECOND STEPが表示された直後にプロセスにSIGTERMを送ります。
class SampleJob < ApplicationJob include ActiveJob::Continuable def perform step :first_step do p "FIRST STEP" end step :second_step do p "SECOND STEP" sleep(5) end step :third_step do p "THIRD STEP" end end end
すると、当然ではありますが、Solid Queueのプロセス再開時にはTHIRD STEPから表示されることが確認できます。
solid_queue_jobsテーブルを見ても、進捗の情報をcontinuationキーに含みつつ再エンキューされたレコードを確認することができます。
mysql> select arguments from solid_queue_jobs order by id desc limit 1\G
*************************** 1. row ***************************
arguments: {
"job_class": "SampleJob",
"job_id": "7ec34b22-0092-472e-8d26-d4402a0d81b9",
"provider_job_id": 34,
"queue_name": "default",
"priority": null,
"arguments": [],
"executions": 1,
"exception_executions": { "Interrupted after 'second_step' (stopping)": 1 },
"locale": "en",
"timezone": "UTC",
"enqueued_at": "2025-12-16T03:33:33.142678502Z",
"scheduled_at": "2025-12-16T03:33:38.124035120Z",
"continuation": { "completed": ["first_step", "second_step"] },
"resumptions": 0
}
再開時にジョブのstepが追加されていた場合の挙動
ここからが本題です。再開時にジョブのstepが追加されていた場合です。
先ほどの正常系と同じジョブを同様に中断し再エンキューされている状態で、以下のようにステップを追加した場合はどうなるのでしょうか?
class SampleJob < ApplicationJob include ActiveJob::Continuable def perform step :first_step do p "FIRST STEP" end step :added_step do p "ADDED STEP" end step :second_step do p "SECOND STEP" sleep(5) end step :third_step do p "THIRD STEP" end end end
このように完了しているsecond_stepの上にadded_stepを追加した状態でジョブを再開しようとすると、ジョブはActiveJob::Continuation::InvalidStepErrorを発生させ失敗してしまいます。
{
"exception_class":"ActiveJob::Continuation::InvalidStepError",
"message":"Step 'added_step' found, expected to see 'second_step'",
...
}
これは、ステップ実行時にvalidate_step!によって「保存された完了ステップの履歴」と「現在のコード上のステップ定義」の整合性をチェックしているためです。
なお、完了しているsecond_stepの後にステップを追加する場合は、問題なく処理されます。
運用においてステップ構成を変更するデプロイを行う際は、この挙動に十分注意する必要がありそうです。
おわりに
本記事では、Rails 8.1 Active Job Continuations の内部実装と、再開時の挙動について調査・検証しました。
その中で見えてきたのは、この機能が例外処理を利用した再エンキューと、引数への進捗情報のシリアライズによって実現されているということです。 また、ステップ構造の変更の際には整合性に注意する必要がある点は、運用する上での重要な知見と言えるでしょう。
冒頭で触れた「コンテナのライフサイクルと長時間ジョブの両立」という課題に対して、この機能は1つの解決策を提示していると思います。 「Retry(やり直し)」から「Resume(再開)」への転換は、デプロイ戦略における制約を減らし、より柔軟な運用の可能性を広げてくれるかもしれないと思いました。
私たちが扱っている物件情報のCSV取り込みワークフローでも、データ量によっては処理に数十分以上かかることがあります。こうした長時間ジョブで、デプロイのたびに最初からやり直すのではなく途中から再開できるようになれば、運用上の大きなメリットになると感じています。
実際のプロジェクトでこの機能を活用する際は、まず使用しているキューアダプタの対応状況を確認することをお勧めします。
また、今回は触れられなかったループ処理におけるCursorsの活用など、より実践的なユースケースについても検証の余地があると思います。
最後までお読みいただき、ありがとうございました。