Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
521565c
feat(rails): add messaging span data to ActiveJob consumer transaction
solnic May 7, 2026
4201e41
feat(rails): emit producer span when enqueueing ActiveJob
solnic May 7, 2026
6baadf3
feat(rails): propagate trace context through ActiveJob payload
solnic May 7, 2026
c4d8085
fixup(rails): account for AJ producer span in active_storage subscrib…
solnic May 7, 2026
79a3418
feat(rails): propagate whitelisted user context through ActiveJob pay…
solnic May 7, 2026
6c1634f
feat(rails): isolate Sentry hub per worker thread for ActiveJob
solnic May 7, 2026
8994ac6
refactor(rails): bundle ActiveJob tracing examples into a distributed…
solnic May 7, 2026
529d724
fixup(rails): widen latency tolerance on Rails < 7 in messaging_span_…
solnic May 7, 2026
b610cb2
refactor(rails): introduce worker_thread harness hook for the hub-iso…
solnic May 7, 2026
d61680a
fix(rails): no with_usec in 7.0
solnic May 8, 2026
ffc2bc9
chore(rails): patch AJ test adapter for 5.2
solnic May 12, 2026
0d4d3c0
fix(active_job): always emit retry count when job is retryable
solnic May 12, 2026
a4e2010
feat(active_job): add active_job_propagate_traces config option
solnic May 12, 2026
3bc2cfa
feat(active_job): set scope tags and context on consumer like Sidekiq
solnic May 12, 2026
58f8913
fix(active_job): avoid shared queue race in jruby
solnic May 12, 2026
03ad132
fix(active_job): save and restore hub around job execution
solnic May 12, 2026
642fa49
fix(active_job): run isolation threads sequentially to avoid transpor…
solnic May 12, 2026
bc8bb65
fix(active_job): correct retry counter
solnic May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 124 additions & 6 deletions sentry-rails/lib/sentry/rails/active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,61 @@
module Sentry
module Rails
module ActiveJobExtensions
SENTRY_PAYLOAD_KEY = "_sentry"

USER_FIELDS_WHITELIST = %w[id email username].freeze

def perform_now
if !Sentry.initialized? || already_supported_by_sentry_integration?
super
else
SentryReporter.record(self) do
super
SentryReporter.record(
self,
trace_headers: @_sentry_trace_headers,
user: @_sentry_user
) { super }
end
end

def serialize
payload = super
return payload if !Sentry.initialized? || already_supported_by_sentry_integration?

begin
sentry_data = {}
if Sentry.configuration.rails.active_job_propagate_traces
headers = Sentry.get_trace_propagation_headers
sentry_data["trace_propagation_headers"] = headers if headers && !headers.empty?
end

if Sentry.configuration.send_default_pii
user = Sentry.get_current_scope.user || {}
whitelisted = user.each_with_object({}) do |(k, v), acc|
acc[k.to_s] = v if USER_FIELDS_WHITELIST.include?(k.to_s)
end
sentry_data["user"] = whitelisted unless whitelisted.empty?
end

payload[SENTRY_PAYLOAD_KEY] = sentry_data unless sentry_data.empty?
rescue StandardError => e
Sentry.sdk_logger&.error("sentry-rails: failed to inject _sentry payload: #{e}")
end

payload
end

def deserialize(job_data)
super
return if !Sentry.initialized? || already_supported_by_sentry_integration?

begin
sentry_data = job_data[SENTRY_PAYLOAD_KEY]
return unless sentry_data

@_sentry_trace_headers = sentry_data["trace_propagation_headers"]
@_sentry_user = sentry_data["user"]
rescue StandardError => e
Sentry.sdk_logger&.error("sentry-rails: failed to extract _sentry payload: #{e}")
end
end

Expand All @@ -28,19 +76,67 @@ class SentryReporter
}

class << self
def record(job, &block)
def producer_callback_registered?
@producer_callback_registered ||= false
end

def producer_callback_registered!
@producer_callback_registered = true
end

def record_producer_span(job)
return yield if !Sentry.initialized? || job.already_supported_by_sentry_integration?

Sentry.with_child_span(op: "queue.publish", description: job.class.name) do |span|
if span
span.set_origin(SPAN_ORIGIN)
span.set_data(Sentry::Span::DataConventions::MESSAGING_MESSAGE_ID, job.job_id)
span.set_data(Sentry::Span::DataConventions::MESSAGING_DESTINATION_NAME, job.queue_name)
end
yield
end
end

def record(job, trace_headers: nil, user: nil, &block)
# Always give this thread a fresh hub cloned from the main hub so
# the job's events are fully isolated. Save and restore whatever
# hub was on the thread before (e.g. the Rack request hub set by
# CaptureExceptions, or a stale hub left by a recycled thread-pool
# thread) so the outer context continues working correctly after
# the job finishes.
original_hub = Thread.current.thread_variable_get(Sentry::THREAD_LOCAL)
Sentry.clone_hub_to_current_thread

Sentry.with_scope do |scope|
begin
scope.set_user(user) if user && !user.empty?
scope.set_transaction_name(job.class.name, source: :task)
scope.set_tags(queue: job.queue_name)
scope.set_contexts(active_job: {
job_class: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
provider_job_id: job.provider_job_id
})

transaction = Sentry.start_transaction(
transaction_options = {
name: scope.transaction_name,
source: scope.transaction_source,
op: OP_NAME,
origin: SPAN_ORIGIN
)
}

scope.set_span(transaction) if transaction
transaction = if trace_headers && !trace_headers.empty?
continued = Sentry.continue_trace(trace_headers, **transaction_options)
Sentry.start_transaction(transaction: continued, **transaction_options)
else
Sentry.start_transaction(**transaction_options)
end

if transaction
set_messaging_data(transaction, job)
scope.set_span(transaction)
end

yield.tap do
finish_sentry_transaction(transaction, 200)
Expand All @@ -53,6 +149,28 @@ def record(job, &block)
raise
end
end
ensure
Thread.current.thread_variable_set(Sentry::THREAD_LOCAL, original_hub)
end

def set_messaging_data(transaction, job)
transaction.set_data(Sentry::Span::DataConventions::MESSAGING_MESSAGE_ID, job.job_id)
transaction.set_data(Sentry::Span::DataConventions::MESSAGING_DESTINATION_NAME, job.queue_name)

if job.class.rescue_handlers.any?
transaction.set_data(Sentry::Span::DataConventions::MESSAGING_MESSAGE_RETRY_COUNT, [job.executions.to_i - 1, 0].max)
end

if (latency = compute_latency(job))
transaction.set_data(Sentry::Span::DataConventions::MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
end
end

def compute_latency(job)
return unless job.respond_to?(:enqueued_at) && job.enqueued_at

enqueued_time = job.enqueued_at.is_a?(String) ? Time.parse(job.enqueued_at) : job.enqueued_at
((Time.now.to_f - enqueued_time.to_f) * 1000).round
end

def capture_exception(job, e)
Expand Down
6 changes: 6 additions & 0 deletions sentry-rails/lib/sentry/rails/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ class Configuration
# Set this option to true if you want Sentry to capture each retry failure
attr_accessor :active_job_report_on_retry_error

# Whether we should inject trace propagation headers into the serialized job
# payload in order to have a connected trace between producer and consumer.
# Defaults to true. Set to false to opt out.
attr_accessor :active_job_propagate_traces

# Configuration for structured logging feature
# @return [StructuredLoggingConfiguration]
attr_reader :structured_logging
Expand All @@ -193,6 +198,7 @@ def initialize
@db_query_source_threshold_ms = 100
@active_support_logger_subscription_items = Sentry::Rails::ACTIVE_SUPPORT_LOGGER_SUBSCRIPTION_ITEMS_DEFAULT.dup
@active_job_report_on_retry_error = false
@active_job_propagate_traces = true
@structured_logging = StructuredLoggingConfiguration.new
end
end
Expand Down
7 changes: 7 additions & 0 deletions sentry-rails/lib/sentry/rails/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ class Railtie < ::Rails::Railtie
ActiveSupport.on_load(:active_job) do
require "sentry/rails/active_job"
prepend Sentry::Rails::ActiveJobExtensions

unless Sentry::Rails::ActiveJobExtensions::SentryReporter.producer_callback_registered?
around_enqueue do |job, block|
Sentry::Rails::ActiveJobExtensions::SentryReporter.record_producer_span(job, &block)
end
Sentry::Rails::ActiveJobExtensions::SentryReporter.producer_callback_registered!
end
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,43 @@ def perform
expect(transaction.contexts.dig(:trace, :status)).to eq("ok")
end

it "sets queue scope tag on the consumer transaction" do
successful_job.set(queue: "important").perform_later
drain

transaction = sentry_events.find { |e| e.is_a?(Sentry::TransactionEvent) }
expect(transaction).not_to be_nil
expect(transaction.tags[:queue]).to eq("important")
end

it "sets active_job context on the consumer transaction" do
successful_job.perform_later
drain

transaction = sentry_events.find { |e| e.is_a?(Sentry::TransactionEvent) }
expect(transaction).not_to be_nil

ctx = transaction.contexts[:active_job]
expect(ctx).not_to be_nil
expect(ctx[:job_class]).to eq(successful_job.name)
expect(ctx[:job_id]).to be_a(String).and(satisfy { |v| !v.empty? })
expect(ctx[:queue]).to eq("default")
end

it "sets active_job context on the error event" do
expect do
failing_job.perform_later
drain
end.to raise_error(RuntimeError, /boom from tracing spec/)

error_event = sentry_events.find { |e| e.is_a?(Sentry::ErrorEvent) }
expect(error_event).not_to be_nil

ctx = error_event.contexts[:active_job]
expect(ctx).not_to be_nil
expect(ctx[:job_class]).to eq(failing_job.name)
end

it "records a db.sql.active_record child span when the job performs a query" do
query_job = job_fixture do
def perform
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

RSpec.shared_examples "an ActiveJob backend that supports distributed tracing" do
it_behaves_like "an ActiveJob backend that emits a producer span on enqueue"
it_behaves_like "an ActiveJob backend that propagates trace context through the job payload"
it_behaves_like "an ActiveJob backend that records messaging span data on the consumer transaction"
it_behaves_like "an ActiveJob backend that propagates Sentry user context through job payloads"
it_behaves_like "an ActiveJob backend that isolates Sentry context per worker thread"
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# frozen_string_literal: true

RSpec.shared_examples "an ActiveJob backend that records messaging span data on the consumer transaction" do
include ActiveSupport::Testing::TimeHelpers

let(:successful_job) do
job_fixture do
def perform; end
end
end

let(:configure_sentry) { proc { |config| config.traces_sample_rate = 1.0 } }

it "records messaging.message.id and messaging.destination.name on the consumer transaction" do
successful_job.set(queue: "critical").perform_later
drain

data = consumer_transaction.contexts.dig(:trace, :data)
expect(data["messaging.message.id"]).to be_a(String).and(satisfy { |v| !v.empty? })
expect(data["messaging.destination.name"]).to eq("critical")
end

it "omits messaging.message.retry.count for non-retryable jobs" do
successful_job.perform_later
drain

data = consumer_transaction.contexts.dig(:trace, :data)
expect(data).not_to have_key("messaging.message.retry.count")
end

context "when the job is retryable" do
let(:retryable_job) do
job_fixture do
retry_on StandardError, attempts: 3, wait: 0

def perform; end
end
end

it "records messaging.message.retry.count = 0 on the first execution" do
retryable_job.perform_later
drain

data = consumer_transaction.contexts.dig(:trace, :data)
expect(data["messaging.message.retry.count"]).to eq(0)
end

it "records messaging.message.retry.count across real retried executions", skip: RAILS_VERSION < 6.0 do
# Mirrors sentry-sidekiq's convention (see sentry-sidekiq's
# error_handler.rb): retry.count is the producer-side retry counter as
# observed when the consumer starts, NOT a "this is retry N" index.
# On attempt 1 ActiveJob has not yet incremented executions, so we
# report 0; on attempt 2 executions is 1 (set by the prior run), still
# max(1 - 1, 0) = 0; on attempt 3 executions is 2 → 1.
retried_job = job_fixture do
retry_on StandardError, attempts: 3, wait: 0

def perform
raise StandardError, "trigger retry" if executions < 3
end
end

retried_job.perform_later
drain

consumer_txns = transactions.select { |t| t.contexts.dig(:trace, :op) == "queue.active_job" }
retry_counts = consumer_txns.map { |t| t.contexts.dig(:trace, :data, "messaging.message.retry.count") }
expect(retry_counts).to eq([0, 0, 1])
end
end

it "records messaging.message.receive.latency in milliseconds", skip: RAILS_VERSION < 6.1 do
successful_job.perform_later

# Older Rails versions truncate Time.now to whole seconds inside `travel`
# (no `with_usec:` option until 7.0+), so the measured latency can be up
# to ~999ms below the travel delta. Widen the tolerance accordingly.
if RAILS_VERSION > 7.0
travel(5.seconds, with_usec: true) { drain }
tolerance = 50
else
travel(5.seconds) { drain }
tolerance = 1100
end

latency = consumer_transaction.contexts.dig(:trace, :data, "messaging.message.receive.latency")
expect(latency).to be_a(Integer)
expect(latency).to be_within(tolerance).of(5_000)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# frozen_string_literal: true

RSpec.shared_examples "an ActiveJob backend that emits a producer span on enqueue" do
let(:successful_job) do
job_fixture do
def perform; end
end
end

context "with traces_sample_rate = 1.0" do
let(:configure_sentry) { proc { |config| config.traces_sample_rate = 1.0 } }

it "adds a queue.publish child span to the active parent transaction" do
within_parent_transaction do
successful_job.set(queue: "events").perform_later
end

parent = transactions.find { |t| t.contexts.dig(:trace, :op) == "test" }
expect(parent).not_to be_nil

publish_span = parent.spans.find { |s| s[:op] == "queue.publish" }
expect(publish_span).not_to be_nil
expect(publish_span[:description]).to eq(successful_job.name)
expect(publish_span[:origin]).to eq("auto.queue.active_job")
expect(publish_span[:data]["messaging.message.id"]).to be_a(String).and(satisfy { |v| !v.empty? })
expect(publish_span[:data]["messaging.destination.name"]).to eq("events")
expect(publish_span[:timestamp]).not_to be_nil
end

it "does not raise or capture an orphan span when no parent transaction is active" do
expect { successful_job.perform_later }.not_to raise_error

orphan_publish = transactions.flat_map(&:spans).find { |s| s[:op] == "queue.publish" }
expect(orphan_publish).to be_nil
end
end

context "with traces_sample_rate = 0" do
let(:configure_sentry) { proc { |config| config.traces_sample_rate = 0 } }

it "does not capture a queue.publish span" do
within_parent_transaction do
successful_job.perform_later
end

publish_spans = transactions.flat_map(&:spans).select { |s| s[:op] == "queue.publish" }
expect(publish_spans).to be_empty
end
end
end
Loading
Loading