From 5c20a0eb321df9843685e29e5c03af386d99d2fa Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 14 Apr 2026 12:48:36 +0200 Subject: [PATCH 01/12] feat(celery): Support span streaming --- sentry_sdk/integrations/celery/__init__.py | 204 ++++++++++++++------- 1 file changed, 140 insertions(+), 64 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index b8b1691979..abc707ed82 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -14,11 +14,11 @@ ) from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, Span, TransactionSource -from sentry_sdk.tracing_utils import Baggage +from sentry_sdk.tracing_utils import Baggage, has_span_streaming_enabled from sentry_sdk.utils import ( capture_internal_exceptions, - ensure_integration_enabled, event_from_exception, reraise, ) @@ -162,7 +162,9 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]": def _update_celery_task_headers( - original_headers: "dict[str, Any]", span: "Optional[Span]", monitor_beat_tasks: bool + original_headers: "dict[str, Any]", + span: "Optional[Union[StreamedSpan, Span]]", + monitor_beat_tasks: bool, ) -> "dict[str, Any]": """ Updates the headers of the Celery task with the tracing information @@ -255,7 +257,8 @@ def _wrap_task_run(f: "F") -> "F": def apply_async(*args: "Any", **kwargs: "Any") -> "Any": # Note: kwargs can contain headers=None, so no setdefault! # Unsure which backend though. - integration = sentry_sdk.get_client().get_integration(CeleryIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(CeleryIntegration) if integration is None: return f(*args, **kwargs) @@ -274,17 +277,28 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any": else: task_name = "" + span_streaming = has_span_streaming_enabled(client.options) + task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat" - span_mgr: "Union[Span, NoOpMgr]" = ( - sentry_sdk.start_span( - op=OP.QUEUE_SUBMIT_CELERY, - name=task_name, - origin=CeleryIntegration.origin, - ) - if not task_started_from_beat - else NoOpMgr() - ) + span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr() + if span_streaming: + if not task_started_from_beat: + span_mgr = sentry_sdk.traces.start_span( + name=task_name, + attributes={ + "sentry.op": OP.QUEUE_SUBMIT_CELERY, + "sentry.origin": CeleryIntegration.origin, + }, + ) + + else: + if not task_started_from_beat: + span_mgr = sentry_sdk.start_span( + op=OP.QUEUE_SUBMIT_CELERY, + name=task_name, + origin=CeleryIntegration.origin, + ) with span_mgr as span: kwargs["headers"] = _update_celery_task_headers( @@ -303,50 +317,73 @@ def _wrap_tracer(task: "Any", f: "F") -> "F": # Also because in Celery 3, signal dispatch returns early if one handler # crashes. @wraps(f) - @ensure_integration_enabled(CeleryIntegration, f) def _inner(*args: "Any", **kwargs: "Any") -> "Any": + client = sentry_sdk.get_client() + if client.get_integration(CeleryIntegration) is None: + return f(*args, **kwargs) + + span_streaming = has_span_streaming_enabled(client.options) + with isolation_scope() as scope: scope._name = "celery" scope.clear_breadcrumbs() scope.add_event_processor(_make_event_processor(task, *args, **kwargs)) - transaction = None + transaction: "Optional[Union[Span, StreamedSpan]]" = None + span_ctx: "Optional[Union[Span, StreamedSpan]]" = None # Celery task objects are not a thing to be trusted. Even # something such as attribute access can fail. with capture_internal_exceptions(): headers = args[3].get("headers") or {} - transaction = continue_trace( - headers, - op=OP.QUEUE_TASK_CELERY, - name="unknown celery task", - source=TransactionSource.TASK, - origin=CeleryIntegration.origin, - ) - transaction.name = task.name - transaction.set_status(SPANSTATUS.OK) + if span_streaming: + sentry_sdk.traces.continue_trace(headers) + transaction = sentry_sdk.traces.start_span( + name=task.name, + attributes={ + "sentry.origin": CeleryIntegration.origin, + "sentry.span.source": TransactionSource.TASK.value, + "sentry.op": OP.QUEUE_TASK_CELERY, + }, + ) - if transaction is None: + span_ctx = transaction + + else: + transaction = continue_trace( + headers, + op=OP.QUEUE_TASK_CELERY, + name=task.name, + source=TransactionSource.TASK, + origin=CeleryIntegration.origin, + ) + transaction.set_status(SPANSTATUS.OK) + + span_ctx = sentry_sdk.start_transaction( + transaction, + custom_sampling_context={ + "celery_job": { + "task": task.name, + # for some reason, args[1] is a list if non-empty but a + # tuple if empty + "args": list(args[1]), + "kwargs": args[2], + } + }, + ) + + if transaction is None or span_ctx is None: return f(*args, **kwargs) - with sentry_sdk.start_transaction( - transaction, - custom_sampling_context={ - "celery_job": { - "task": task.name, - # for some reason, args[1] is a list if non-empty but a - # tuple if empty - "args": list(args[1]), - "kwargs": args[2], - } - }, - ): + with span_ctx: return f(*args, **kwargs) return _inner # type: ignore -def _set_messaging_destination_name(task: "Any", span: "Span") -> None: +def _set_messaging_destination_name( + task: "Any", span: "Union[StreamedSpan, Span]" +) -> None: """Set "messaging.destination.name" tag for span""" with capture_internal_exceptions(): delivery_info = task.request.delivery_info @@ -355,26 +392,43 @@ def _set_messaging_destination_name(task: "Any", span: "Span") -> None: if delivery_info.get("exchange") == "" and routing_key is not None: # Empty exchange indicates the default exchange, meaning the tasks # are sent to the queue with the same name as the routing key. - span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + if isinstance(span, StreamedSpan): + span.set_attribute(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + else: + span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) def _wrap_task_call(task: "Any", f: "F") -> "F": # Need to wrap task call because the exception is caught before we get to # see it. Also celery's reported stacktrace is untrustworthy. - # functools.wraps is important here because celery-once looks at this - # method's name. @ensure_integration_enabled internally calls functools.wraps, - # but if we ever remove the @ensure_integration_enabled decorator, we need - # to add @functools.wraps(f) here. - # https://github.com/getsentry/sentry-python/issues/421 - @ensure_integration_enabled(CeleryIntegration, f) + @wraps(f) def _inner(*args: "Any", **kwargs: "Any") -> "Any": + client = sentry_sdk.get_client() + if client.get_integration(CeleryIntegration) is None: + return f(*args, **kwargs) + + span_streaming = has_span_streaming_enabled(client.options) + try: - with sentry_sdk.start_span( - op=OP.QUEUE_PROCESS, - name=task.name, - origin=CeleryIntegration.origin, - ) as span: + span: "Union[Span, StreamedSpan]" + if span_streaming: + span = sentry_sdk.traces.start_span(name=task.name) + span.set_attribute("sentry.op", OP.QUEUE_PROCESS) + span.set_attribute("sentry.origin", CeleryIntegration.origin) + else: + span = sentry_sdk.start_span( + op=OP.QUEUE_PROCESS, + name=task.name, + origin=CeleryIntegration.origin, + ) + + with span: + if isinstance(span, StreamedSpan): + set_on_span = span.set_attribute + else: + set_on_span = span.set_data + _set_messaging_destination_name(task, span) latency = None @@ -389,19 +443,19 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": if latency is not None: latency *= 1000 # milliseconds - span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) + set_on_span(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) with capture_internal_exceptions(): - span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id) + set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id) with capture_internal_exceptions(): - span.set_data( + set_on_span( SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries ) with capture_internal_exceptions(): with task.app.connection() as conn: - span.set_data( + set_on_span( SPANDATA.MESSAGING_SYSTEM, conn.transport.driver_type, ) @@ -476,8 +530,13 @@ def sentry_workloop(*args: "Any", **kwargs: "Any") -> "Any": def _patch_producer_publish() -> None: original_publish = Producer.publish - @ensure_integration_enabled(CeleryIntegration, original_publish) def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": + client = sentry_sdk.get_client() + if client.get_integration(CeleryIntegration) is None: + return original_publish(self, *args, **kwargs) + + span_streaming = has_span_streaming_enabled(client.options) + kwargs_headers = kwargs.get("headers", {}) if not isinstance(kwargs_headers, Mapping): # Ensure kwargs_headers is a Mapping, so we can safely call get(). @@ -487,6 +546,10 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": # method will still work. kwargs_headers = {} + if "task" not in kwargs_headers: + # filter out heartbeat and other internal Celery events + return original_publish(self, *args, **kwargs) + task_name = kwargs_headers.get("task") task_id = kwargs_headers.get("id") retries = kwargs_headers.get("retries") @@ -494,24 +557,37 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": routing_key = kwargs.get("routing_key") exchange = kwargs.get("exchange") - with sentry_sdk.start_span( - op=OP.QUEUE_PUBLISH, - name=task_name, - origin=CeleryIntegration.origin, - ) as span: + span: "Union[StreamedSpan, Span]" + if span_streaming: + span = sentry_sdk.traces.start_span(name=task_name) + span.set_attribute("sentry.op", OP.QUEUE_PUBLISH) + span.set_attribute("sentry.origin", CeleryIntegration.origin) + else: + span = sentry_sdk.start_span( + op=OP.QUEUE_PUBLISH, + name=task_name, + origin=CeleryIntegration.origin, + ) + + with span: + if isinstance(span, StreamedSpan): + set_on_span = span.set_attribute + else: + set_on_span = span.set_data + if task_id is not None: - span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id) + set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task_id) if exchange == "" and routing_key is not None: # Empty exchange indicates the default exchange, meaning messages are # routed to the queue with the same name as the routing key. - span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + set_on_span(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) if retries is not None: - span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) + set_on_span(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) with capture_internal_exceptions(): - span.set_data( + set_on_span( SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type ) From 943c12b2c3954aeca6fbeaa35c0d3e784848a952 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 10:36:27 +0200 Subject: [PATCH 02/12] tests --- sentry_sdk/integrations/celery/__init__.py | 51 +- tests/integrations/celery/test_celery.py | 549 +++++++++++++++------ 2 files changed, 424 insertions(+), 176 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index abc707ed82..c076ace720 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -283,7 +283,7 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any": span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr() if span_streaming: - if not task_started_from_beat: + if not task_started_from_beat and sentry_sdk.get_current_span() is not None: span_mgr = sentry_sdk.traces.start_span( name=task_name, attributes={ @@ -329,16 +329,29 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": scope.clear_breadcrumbs() scope.add_event_processor(_make_event_processor(task, *args, **kwargs)) - transaction: "Optional[Union[Span, StreamedSpan]]" = None + span: "Optional[Union[Span, StreamedSpan]]" = None span_ctx: "Optional[Union[Span, StreamedSpan]]" = None + custom_sampling_context = { + "celery_job": { + "task": task.name, + # for some reason, args[1] is a list if non-empty but a + # tuple if empty + "args": list(args[1]), + "kwargs": args[2], + } + } + + scope.set_custom_sampling_context(custom_sampling_context) + scope.set_transaction_name(task.name, source=TransactionSource.TASK) + # Celery task objects are not a thing to be trusted. Even # something such as attribute access can fail. with capture_internal_exceptions(): headers = args[3].get("headers") or {} if span_streaming: sentry_sdk.traces.continue_trace(headers) - transaction = sentry_sdk.traces.start_span( + span = sentry_sdk.traces.start_span( name=task.name, attributes={ "sentry.origin": CeleryIntegration.origin, @@ -347,32 +360,24 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": }, ) - span_ctx = transaction + span_ctx = span else: - transaction = continue_trace( + span = continue_trace( headers, op=OP.QUEUE_TASK_CELERY, name=task.name, source=TransactionSource.TASK, origin=CeleryIntegration.origin, ) - transaction.set_status(SPANSTATUS.OK) + span.set_status(SPANSTATUS.OK) span_ctx = sentry_sdk.start_transaction( - transaction, - custom_sampling_context={ - "celery_job": { - "task": task.name, - # for some reason, args[1] is a list if non-empty but a - # tuple if empty - "args": list(args[1]), - "kwargs": args[2], - } - }, + span, + custom_sampling_context=custom_sampling_context, ) - if transaction is None or span_ctx is None: + if span is None or span_ctx is None: return f(*args, **kwargs) with span_ctx: @@ -557,11 +562,12 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": routing_key = kwargs.get("routing_key") exchange = kwargs.get("exchange") - span: "Union[StreamedSpan, Span]" + span: "Union[StreamedSpan, Span, None]" = None if span_streaming: - span = sentry_sdk.traces.start_span(name=task_name) - span.set_attribute("sentry.op", OP.QUEUE_PUBLISH) - span.set_attribute("sentry.origin", CeleryIntegration.origin) + if sentry_sdk.get_current_span() is not None: + span = sentry_sdk.traces.start_span(name=task_name) + span.set_attribute("sentry.op", OP.QUEUE_PUBLISH) + span.set_attribute("sentry.origin", CeleryIntegration.origin) else: span = sentry_sdk.start_span( op=OP.QUEUE_PUBLISH, @@ -569,6 +575,9 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": origin=CeleryIntegration.origin, ) + if span is None: + return original_publish(self, *args, **kwargs) + with span: if isinstance(span, StreamedSpan): set_on_span = span.set_attribute diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 42ae6ea14f..97a0077ece 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -117,23 +117,45 @@ def celery_invocation(request): return request.param -def test_simple_with_performance(capture_events, init_celery, celery_invocation): - celery = init_celery(traces_sample_rate=1.0) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_simple_with_performance( + capture_events, capture_items, init_celery, celery_invocation, span_streaming +): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task(name="dummy_task") def dummy_task(x, y): foo = 42 # noqa return x / y - with start_transaction(op="unit test transaction") as transaction: - celery_invocation(dummy_task, 1, 2) - _, expected_context = celery_invocation(dummy_task, 1, 0) + if span_streaming: + items = capture_items("event", "span") + + with sentry_sdk.traces.start_span(name="span") as span: + celery_invocation(dummy_task, 1, 2) + _, expected_context = celery_invocation(dummy_task, 1, 0) + + sentry_sdk.flush() + + error_event = next(item.payload for item in items if item.type == "event") + + assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id + assert error_event["contexts"]["trace"]["span_id"] != span.span_id + else: + events = capture_events() - (_, error_event, _, _) = events + with start_transaction(op="unit test transaction") as transaction: + celery_invocation(dummy_task, 1, 2) + _, expected_context = celery_invocation(dummy_task, 1, 0) + + (_, error_event, _, _) = events + + assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert error_event["contexts"]["trace"]["span_id"] != transaction.span_id - assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert error_event["contexts"]["trace"]["span_id"] != transaction.span_id assert error_event["transaction"] == "dummy_task" assert "celery_task_id" in error_event["tags"] assert error_event["extra"]["celery-job"] == dict( @@ -182,9 +204,20 @@ def dummy_task(x, y): assert exception["stacktrace"]["frames"][0]["vars"]["foo"] == "42" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"]) -def test_transaction_events(capture_events, init_celery, celery_invocation, task_fails): - celery = init_celery(traces_sample_rate=1.0) +def test_transaction_events( + capture_events, + capture_items, + init_celery, + celery_invocation, + task_fails, + span_streaming, +): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task(name="dummy_task") def dummy_task(x, y): @@ -192,58 +225,94 @@ def dummy_task(x, y): # XXX: For some reason the first call does not get instrumented properly. celery_invocation(dummy_task, 1, 1) + sentry_sdk.flush() - events = capture_events() + if span_streaming: + items = capture_items("event", "span") - with start_transaction(name="submission") as transaction: - celery_invocation(dummy_task, 1, 0 if task_fails else 1) + with sentry_sdk.traces.start_span(name="submission") as span: + celery_invocation(dummy_task, 1, 0 if task_fails else 1) - if task_fails: - error_event = events.pop(0) - assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + sentry_sdk.flush() - execution_event, submission_event = events - assert execution_event["transaction"] == "dummy_task" - assert execution_event["transaction_info"] == {"source": "task"} + if task_fails: + error_event = items.pop(0).payload + assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert submission_event["transaction"] == "submission" - assert submission_event["transaction_info"] == {"source": "custom"} + span_items = [item.payload for item in items] + process_span, execution_span, submit_span, submission_span = span_items - assert execution_event["type"] == submission_event["type"] == "transaction" - assert execution_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert submission_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert execution_span["name"] == "dummy_task" + assert execution_span["attributes"]["sentry.span.source"] == "task" + assert execution_span["trace_id"] == span.trace_id + + assert submit_span["name"] == "dummy_task" + assert submit_span["trace_id"] == span.trace_id + assert submit_span["attributes"]["sentry.origin"] == "auto.queue.celery" + assert submit_span["parent_span_id"] == span.span_id + + if task_fails: + assert execution_span["status"] == "error" + else: + assert execution_span["status"] == "ok" + + assert process_span["name"] == "dummy_task" + assert process_span["trace_id"] == span.trace_id + assert process_span["attributes"]["sentry.op"] == "queue.process" - if task_fails: - assert execution_event["contexts"]["trace"]["status"] == "internal_error" else: - assert execution_event["contexts"]["trace"]["status"] == "ok" + events = capture_events() - assert len(execution_event["spans"]) == 1 - assert ( - execution_event["spans"][0].items() - >= { - "trace_id": str(transaction.trace_id), - "same_process_as_parent": True, - "op": "queue.process", - "description": "dummy_task", - "data": ApproxDict(), - }.items() - ) - assert submission_event["spans"] == [ - { - "data": ApproxDict(), - "description": "dummy_task", - "op": "queue.submit.celery", - "origin": "auto.queue.celery", - "parent_span_id": submission_event["contexts"]["trace"]["span_id"], - "same_process_as_parent": True, - "span_id": submission_event["spans"][0]["span_id"], - "start_timestamp": submission_event["spans"][0]["start_timestamp"], - "timestamp": submission_event["spans"][0]["timestamp"], - "trace_id": str(transaction.trace_id), - } - ] + with start_transaction(name="submission") as transaction: + celery_invocation(dummy_task, 1, 0 if task_fails else 1) + + if task_fails: + error_event = events.pop(0) + assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + + execution_event, submission_event = events + assert execution_event["transaction"] == "dummy_task" + assert execution_event["transaction_info"] == {"source": "task"} + + assert submission_event["transaction"] == "submission" + assert submission_event["transaction_info"] == {"source": "custom"} + + assert execution_event["type"] == submission_event["type"] == "transaction" + assert execution_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert submission_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + + if task_fails: + assert execution_event["contexts"]["trace"]["status"] == "internal_error" + else: + assert execution_event["contexts"]["trace"]["status"] == "ok" + + assert len(execution_event["spans"]) == 1 + assert ( + execution_event["spans"][0].items() + >= { + "trace_id": str(transaction.trace_id), + "same_process_as_parent": True, + "op": "queue.process", + "description": "dummy_task", + "data": ApproxDict(), + }.items() + ) + assert submission_event["spans"] == [ + { + "data": ApproxDict(), + "description": "dummy_task", + "op": "queue.submit.celery", + "origin": "auto.queue.celery", + "parent_span_id": submission_event["contexts"]["trace"]["span_id"], + "same_process_as_parent": True, + "span_id": submission_event["spans"][0]["span_id"], + "start_timestamp": submission_event["spans"][0]["start_timestamp"], + "timestamp": submission_event["spans"][0]["timestamp"], + "trace_id": str(transaction.trace_id), + } + ] def test_no_double_patching(celery): @@ -601,28 +670,47 @@ def example_task(): assert result.get() == "success" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("routing_key", ("celery", "custom")) @mock.patch("celery.app.task.Task.request") def test_messaging_destination_name_default_exchange( - mock_request, routing_key, init_celery, capture_events + mock_request, + routing_key, + span_streaming, + init_celery, + capture_events, + capture_items, ): - celery_app = init_celery(enable_tracing=True) - events = capture_events() + celery_app = init_celery( + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) mock_request.delivery_info = {"routing_key": routing_key, "exchange": ""} @celery_app.task() def task(): ... - task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert span["data"]["messaging.destination.name"] == routing_key + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert ( + process_span.payload["attributes"]["messaging.destination.name"] + == routing_key + ) + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.destination.name"] == routing_key +@pytest.mark.parametrize("span_streaming", [True, False]) @mock.patch("celery.app.task.Task.request") def test_messaging_destination_name_nondefault_exchange( - mock_request, init_celery, capture_events + mock_request, span_streaming, init_celery, capture_events, capture_items ): """ Currently, we only capture the routing key as the messaging.destination.name when @@ -630,69 +718,115 @@ def test_messaging_destination_name_nondefault_exchange( that the routing key is the queue name. Other exchanges may not guarantee this behavior. """ - celery_app = init_celery(enable_tracing=True) - events = capture_events() + celery_app = init_celery( + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) mock_request.delivery_info = {"routing_key": "celery", "exchange": "custom"} @celery_app.task() def task(): ... - task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert "messaging.destination.name" not in span["data"] + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert "messaging.destination.name" not in process_span.payload["attributes"] + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert "messaging.destination.name" not in span["data"] -def test_messaging_id(init_celery, capture_events): - celery = init_celery(enable_tracing=True) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_messaging_id(span_streaming, init_celery, capture_events, capture_items): + celery = init_celery( + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task def example_task(): ... - example_task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert "messaging.message.id" in span["data"] + if span_streaming: + items = capture_items("span") + example_task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert "messaging.message.id" in process_span.payload["attributes"] + else: + events = capture_events() + example_task.apply_async() + (event,) = events + (span,) = event["spans"] + assert "messaging.message.id" in span["data"] -def test_retry_count_zero(init_celery, capture_events): - celery = init_celery(enable_tracing=True) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_retry_count_zero(span_streaming, init_celery, capture_events, capture_items): + celery = init_celery( + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task() def task(): ... - task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert span["data"]["messaging.message.retry.count"] == 0 + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert process_span.payload["attributes"]["messaging.message.retry.count"] == 0 + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.message.retry.count"] == 0 +@pytest.mark.parametrize("span_streaming", [True, False]) @mock.patch("celery.app.task.Task.request") -def test_retry_count_nonzero(mock_request, init_celery, capture_events): +def test_retry_count_nonzero( + mock_request, span_streaming, init_celery, capture_events, capture_items +): mock_request.retries = 3 - celery = init_celery(enable_tracing=True) - events = capture_events() + celery = init_celery( + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task() def task(): ... - task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert span["data"]["messaging.message.retry.count"] == 3 + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert process_span.payload["attributes"]["messaging.message.retry.count"] == 3 + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.message.retry.count"] == 3 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("system", ("redis", "amqp")) -def test_messaging_system(system, init_celery, capture_events): - celery = init_celery(enable_tracing=True) - events = capture_events() +def test_messaging_system( + system, span_streaming, init_celery, capture_events, capture_items +): + celery = init_celery( + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) # Does not need to be a real URL, since we use always eager celery.conf.broker_url = f"{system}://example.com" # noqa: E231 @@ -700,15 +834,25 @@ def test_messaging_system(system, init_celery, capture_events): @celery.task() def task(): ... - task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert span["data"]["messaging.system"] == system + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert process_span.payload["attributes"]["messaging.system"] == system + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.system"] == system +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("system", ("amqp", "redis")) -def test_producer_span_data(system, monkeypatch, sentry_init, capture_events): +def test_producer_span_data( + system, span_streaming, monkeypatch, sentry_init, capture_events, capture_items +): old_publish = kombu.messaging.Producer._publish def publish(*args, **kwargs): @@ -716,61 +860,114 @@ def publish(*args, **kwargs): monkeypatch.setattr(kombu.messaging.Producer, "_publish", publish) - sentry_init(integrations=[CeleryIntegration()], enable_tracing=True) + sentry_init( + integrations=[CeleryIntegration()], + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) celery = Celery(__name__, broker=f"{system}://example.com") # noqa: E231 - events = capture_events() @celery.task() def task(): ... - with start_transaction(): - task.apply_async() + if span_streaming: + items = capture_items("span") - (event,) = events - span = next(span for span in event["spans"] if span["op"] == "queue.publish") + with sentry_sdk.traces.start_span(name="producer test"): + task.apply_async() - assert span["data"]["messaging.system"] == system + sentry_sdk.flush() - assert span["data"]["messaging.destination.name"] == "celery" - assert "messaging.message.id" in span["data"] - assert span["data"]["messaging.message.retry.count"] == 0 + span_items = [item.payload for item in items] + publish_span = next( + s for s in span_items if s["attributes"].get("sentry.op") == "queue.publish" + ) + + assert publish_span["attributes"]["messaging.system"] == system + assert publish_span["attributes"]["messaging.destination.name"] == "celery" + assert "messaging.message.id" in publish_span["attributes"] + assert publish_span["attributes"]["messaging.message.retry.count"] == 0 + else: + events = capture_events() + + with start_transaction(): + task.apply_async() + + (event,) = events + span = next(span for span in event["spans"] if span["op"] == "queue.publish") + + assert span["data"]["messaging.system"] == system + assert span["data"]["messaging.destination.name"] == "celery" + assert "messaging.message.id" in span["data"] + assert span["data"]["messaging.message.retry.count"] == 0 monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish) -def test_receive_latency(init_celery, capture_events): - celery = init_celery(traces_sample_rate=1.0) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_receive_latency(span_streaming, init_celery, capture_events, capture_items): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task() def task(): ... - task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert "messaging.message.receive.latency" in span["data"] - assert span["data"]["messaging.message.receive.latency"] > 0 + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert "messaging.message.receive.latency" in process_span.payload["attributes"] + assert ( + process_span.payload["attributes"]["messaging.message.receive.latency"] > 0 + ) + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert "messaging.message.receive.latency" in span["data"] + assert span["data"]["messaging.message.receive.latency"] > 0 -def tests_span_origin_consumer(init_celery, capture_events): - celery = init_celery(enable_tracing=True) +@pytest.mark.parametrize("span_streaming", [True, False]) +def tests_span_origin_consumer( + span_streaming, init_celery, capture_events, capture_items +): + celery = init_celery( + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) celery.conf.broker_url = "redis://example.com" # noqa: E231 - events = capture_events() - @celery.task() def task(): ... - task.apply_async() - - (event,) = events - - assert event["contexts"]["trace"]["origin"] == "auto.queue.celery" - assert event["spans"][0]["origin"] == "auto.queue.celery" + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, execution_span = items + assert ( + execution_span.payload["attributes"]["sentry.origin"] == "auto.queue.celery" + ) + assert ( + process_span.payload["attributes"]["sentry.origin"] == "auto.queue.celery" + ) + else: + events = capture_events() + task.apply_async() + (event,) = events + assert event["contexts"]["trace"]["origin"] == "auto.queue.celery" + assert event["spans"][0]["origin"] == "auto.queue.celery" -def tests_span_origin_producer(monkeypatch, sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def tests_span_origin_producer( + span_streaming, monkeypatch, sentry_init, capture_events, capture_items +): old_publish = kombu.messaging.Producer._publish def publish(*args, **kwargs): @@ -778,42 +975,76 @@ def publish(*args, **kwargs): monkeypatch.setattr(kombu.messaging.Producer, "_publish", publish) - sentry_init(integrations=[CeleryIntegration()], enable_tracing=True) + sentry_init( + integrations=[CeleryIntegration()], + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) celery = Celery(__name__, broker="redis://example.com") # noqa: E231 - events = capture_events() - @celery.task() def task(): ... - with start_transaction(name="custom_transaction"): - task.apply_async() + if span_streaming: + items = capture_items("span") - (event,) = events + with sentry_sdk.traces.start_span(name="custom parent"): + task.apply_async() + + sentry_sdk.flush() + + for item in items: + if item.payload["name"] != "custom parent": + assert ( + item.payload["attributes"]["sentry.origin"] == "auto.queue.celery" + ) + else: + events = capture_events() - assert event["contexts"]["trace"]["origin"] == "manual" + with start_transaction(name="custom_transaction"): + task.apply_async() - for span in event["spans"]: - assert span["origin"] == "auto.queue.celery" + (event,) = events + + assert event["contexts"]["trace"]["origin"] == "manual" + + for span in event["spans"]: + assert span["origin"] == "auto.queue.celery" monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.forked @mock.patch("celery.Celery.send_task") def test_send_task_wrapped( patched_send_task, + span_streaming, sentry_init, capture_events, + capture_items, reset_integrations, ): - sentry_init(integrations=[CeleryIntegration()], enable_tracing=True) + sentry_init( + integrations=[CeleryIntegration()], + enable_tracing=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) celery = Celery(__name__, broker="redis://example.com") # noqa: E231 - events = capture_events() - - with sentry_sdk.start_transaction(name="custom_transaction"): - celery.send_task("very_creative_task_name", args=(1, 2), kwargs={"foo": "bar"}) + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent") as outer_span: + celery.send_task( + "very_creative_task_name", args=(1, 2), kwargs={"foo": "bar"} + ) + sentry_sdk.flush() + else: + events = capture_events() + with sentry_sdk.start_transaction(name="custom_transaction"): + celery.send_task( + "very_creative_task_name", args=(1, 2), kwargs={"foo": "bar"} + ) (call,) = patched_send_task.call_args_list # We should have exactly one call (args, kwargs) = call @@ -837,14 +1068,22 @@ def test_send_task_wrapped( == kwargs["headers"]["headers"]["sentry-trace"] ) - (event,) = events # We should have exactly one event (the transaction) - assert event["type"] == "transaction" - assert event["transaction"] == "custom_transaction" - - (span,) = event["spans"] # We should have exactly one span - assert span["description"] == "very_creative_task_name" - assert span["op"] == "queue.submit.celery" - assert span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0] + if span_streaming: + submit_span, outer = [item.payload for item in items] + assert outer["name"] == "custom parent" + assert outer["is_segment"] is True + assert submit_span["name"] == "very_creative_task_name" + assert submit_span["attributes"]["sentry.op"] == "queue.submit.celery" + assert submit_span["trace_id"] == outer_span.trace_id + else: + (event,) = events + assert event["type"] == "transaction" + assert event["transaction"] == "custom_transaction" + + (span,) = event["spans"] + assert span["description"] == "very_creative_task_name" + assert span["op"] == "queue.submit.celery" + assert span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0] def test_user_custom_headers_accessible_in_task(init_celery): From 2a127151fe3eea321efd73baf3aba1fd2585358d Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 11:55:55 +0200 Subject: [PATCH 03/12] reformat --- sentry_sdk/integrations/celery/__init__.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index c076ace720..244681da4f 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -418,9 +418,13 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": try: span: "Union[Span, StreamedSpan]" if span_streaming: - span = sentry_sdk.traces.start_span(name=task.name) - span.set_attribute("sentry.op", OP.QUEUE_PROCESS) - span.set_attribute("sentry.origin", CeleryIntegration.origin) + span = sentry_sdk.traces.start_span( + name=task.name, + attributes={ + "sentry.op": OP.QUEUE_PROCESS, + "sentry.origin": CeleryIntegration.origin, + }, + ) else: span = sentry_sdk.start_span( op=OP.QUEUE_PROCESS, @@ -565,9 +569,13 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": span: "Union[StreamedSpan, Span, None]" = None if span_streaming: if sentry_sdk.get_current_span() is not None: - span = sentry_sdk.traces.start_span(name=task_name) - span.set_attribute("sentry.op", OP.QUEUE_PUBLISH) - span.set_attribute("sentry.origin", CeleryIntegration.origin) + span = sentry_sdk.traces.start_span( + name=task_name, + attributes={ + "sentry.op": OP.QUEUE_PUBLISH, + "sentry.origin": CeleryIntegration.origin, + }, + ) else: span = sentry_sdk.start_span( op=OP.QUEUE_PUBLISH, From c2d1ced70ae93ff310b67f2e9e224a0fa9a69ae6 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 11:57:29 +0200 Subject: [PATCH 04/12] . --- sentry_sdk/integrations/celery/__init__.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 244681da4f..22be9fcf14 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -12,7 +12,7 @@ _patch_redbeat_apply_async, _setup_celery_beat_signals, ) -from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch +from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch, NoOpMgr from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, Span, TransactionSource @@ -244,14 +244,6 @@ def _update_celery_task_headers( return updated_headers -class NoOpMgr: - def __enter__(self) -> None: - return None - - def __exit__(self, exc_type: "Any", exc_value: "Any", traceback: "Any") -> None: - return None - - def _wrap_task_run(f: "F") -> "F": @wraps(f) def apply_async(*args: "Any", **kwargs: "Any") -> "Any": From b8fefb23e49f0e14f498d88744874a41ea96ea41 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 14:06:36 +0200 Subject: [PATCH 05/12] small cleanup --- sentry_sdk/integrations/celery/__init__.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 244681da4f..8e4d29df0e 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -329,9 +329,6 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": scope.clear_breadcrumbs() scope.add_event_processor(_make_event_processor(task, *args, **kwargs)) - span: "Optional[Union[Span, StreamedSpan]]" = None - span_ctx: "Optional[Union[Span, StreamedSpan]]" = None - custom_sampling_context = { "celery_job": { "task": task.name, @@ -345,6 +342,9 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": scope.set_custom_sampling_context(custom_sampling_context) scope.set_transaction_name(task.name, source=TransactionSource.TASK) + span: "Union[Span, StreamedSpan]" + span_ctx: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr() + # Celery task objects are not a thing to be trusted. Even # something such as attribute access can fail. with capture_internal_exceptions(): @@ -377,9 +377,6 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": custom_sampling_context=custom_sampling_context, ) - if span is None or span_ctx is None: - return f(*args, **kwargs) - with span_ctx: return f(*args, **kwargs) From 0eb003be663ef124180cb3e2154b9eed06c6c388 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 14:47:41 +0200 Subject: [PATCH 06/12] . --- sentry_sdk/integrations/celery/__init__.py | 8 +++++--- tests/integrations/celery/test_celery.py | 24 ++++++++++++++-------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 8e4d29df0e..8f691d4f81 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -353,6 +353,7 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": sentry_sdk.traces.continue_trace(headers) span = sentry_sdk.traces.start_span( name=task.name, + parent_span=None, # make this a segment attributes={ "sentry.origin": CeleryIntegration.origin, "sentry.span.source": TransactionSource.TASK.value, @@ -552,9 +553,10 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": # method will still work. kwargs_headers = {} - if "task" not in kwargs_headers: - # filter out heartbeat and other internal Celery events - return original_publish(self, *args, **kwargs) + # XXX[ivana]: check whether this is needed with the parent checks + # if "task" not in kwargs_headers: + # # filter out heartbeat and other internal Celery events + # return original_publish(self, *args, **kwargs) task_name = kwargs_headers.get("task") task_id = kwargs_headers.get("id") diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index b19bb08b7c..53feb9d0fc 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -240,18 +240,14 @@ def dummy_task(x, y): assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - span_items = [item.payload for item in items] - process_span, execution_span, submit_span, submission_span = span_items + process_span, execution_span, submit_span, submission_span = [ + item.payload for item in items + ] assert execution_span["name"] == "dummy_task" + assert execution_span["is_segment"] is True assert execution_span["attributes"]["sentry.span.source"] == "task" assert execution_span["trace_id"] == span.trace_id - - assert submit_span["name"] == "dummy_task" - assert submit_span["trace_id"] == span.trace_id - assert submit_span["attributes"]["sentry.origin"] == "auto.queue.celery" - assert submit_span["parent_span_id"] == span.span_id - if task_fails: assert execution_span["status"] == "error" else: @@ -260,6 +256,18 @@ def dummy_task(x, y): assert process_span["name"] == "dummy_task" assert process_span["trace_id"] == span.trace_id assert process_span["attributes"]["sentry.op"] == "queue.process" + assert process_span["parent_span_id"] == execution_span["span_id"] + + assert submission_span["name"] == "submission" + assert submission_span["is_segment"] is True + + assert submit_span["name"] == "dummy_task" + assert submit_span["attributes"]["sentry.op"] == "queue.submit.celery" + assert submit_span["attributes"]["sentry.origin"] == "auto.queue.celery" + assert ( + submit_span["parent_span_id"] == submission_span["span_id"] == span.span_id + ) + assert submit_span["trace_id"] == span.trace_id else: events = capture_events() From 593871c6e140c8688cd1fef94abe8da132a31884 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 15:24:06 +0200 Subject: [PATCH 07/12] . --- sentry_sdk/integrations/celery/__init__.py | 7 +- sentry_sdk/traces.py | 4 +- tests/integrations/celery/test_celery.py | 89 ++++++++++++++++------ 3 files changed, 69 insertions(+), 31 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 8f691d4f81..bc16ceb912 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -339,7 +339,6 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": } } - scope.set_custom_sampling_context(custom_sampling_context) scope.set_transaction_name(task.name, source=TransactionSource.TASK) span: "Union[Span, StreamedSpan]" @@ -351,6 +350,7 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": headers = args[3].get("headers") or {} if span_streaming: sentry_sdk.traces.continue_trace(headers) + scope.set_custom_sampling_context(custom_sampling_context) span = sentry_sdk.traces.start_span( name=task.name, parent_span=None, # make this a segment @@ -553,11 +553,6 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": # method will still work. kwargs_headers = {} - # XXX[ivana]: check whether this is needed with the parent checks - # if "task" not in kwargs_headers: - # # filter out heartbeat and other internal Celery events - # return original_publish(self, *args, **kwargs) - task_name = kwargs_headers.get("task") task_id = kwargs_headers.get("id") retries = kwargs_headers.get("retries") diff --git a/sentry_sdk/traces.py b/sentry_sdk/traces.py index f44ef71f5b..a9b0070034 100644 --- a/sentry_sdk/traces.py +++ b/sentry_sdk/traces.py @@ -258,7 +258,9 @@ def __init__( ): self._name: str = name self._active: bool = active - self._attributes: "Attributes" = {} + self._attributes: "Attributes" = { + "sentry.origin": "manual", + } if attributes: for attribute, value in attributes.items(): diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 53feb9d0fc..150c551003 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -515,13 +515,18 @@ def dummy_task(self, x, y): assert celery_invocation(dummy_task, 1, 1)[0].wait() == 1 +@pytest.mark.parametrize("span_streaming", [True, False]) def test_traces_sampler_gets_task_info_in_sampling_context( + span_streaming, init_celery, celery_invocation, DictionaryContaining, # noqa:N803 ): traces_sampler = mock.Mock() - celery = init_celery(traces_sampler=traces_sampler) + celery = init_celery( + traces_sampler=traces_sampler, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task(name="dog_walk") def walk_dogs(x, y): @@ -618,13 +623,17 @@ def dummy_task(self, x, y): ) -def test_sentry_propagate_traces_override(init_celery): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_sentry_propagate_traces_override(span_streaming, init_celery): """ Test if the `sentry-propagate-traces` header given to `apply_async` overrides the `propagate_traces` parameter in the integration constructor. """ celery = init_celery( - propagate_traces=True, traces_sample_rate=1.0, release="abcdef" + propagate_traces=True, + traces_sample_rate=1.0, + release="abcdef", + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) @celery.task(name="dummy_task", bind=True) @@ -632,21 +641,38 @@ def dummy_task(self, message): trace_id = get_current_span().trace_id return trace_id - with start_transaction() as transaction: - transaction_trace_id = transaction.trace_id + if span_streaming: + with sentry_sdk.traces.start_span(name="parent") as span: + parent_trace_id = span.trace_id - # should propagate trace - task_transaction_id = dummy_task.apply_async( - args=("some message",), - ).get() - assert transaction_trace_id == task_transaction_id + # should propagate trace + task_trace_id = dummy_task.apply_async( + args=("some message",), + ).get() + assert parent_trace_id == task_trace_id - # should NOT propagate trace (overrides `propagate_traces` parameter in integration constructor) - task_transaction_id = dummy_task.apply_async( - args=("another message",), - headers={"sentry-propagate-traces": False}, - ).get() - assert transaction_trace_id != task_transaction_id + # should NOT propagate trace + task_trace_id = dummy_task.apply_async( + args=("another message",), + headers={"sentry-propagate-traces": False}, + ).get() + assert parent_trace_id != task_trace_id + else: + with start_transaction() as transaction: + transaction_trace_id = transaction.trace_id + + # should propagate trace + task_trace_id = dummy_task.apply_async( + args=("some message",), + ).get() + assert transaction_trace_id == task_trace_id + + # should NOT propagate trace + task_trace_id = dummy_task.apply_async( + args=("another message",), + headers={"sentry-propagate-traces": False}, + ).get() + assert transaction_trace_id != task_trace_id def test_apply_async_manually_span(sentry_init): @@ -1001,11 +1027,12 @@ def task(): ... sentry_sdk.flush() + parent = items.pop(-1).payload + assert parent["name"] == "custom parent" + assert parent["attributes"]["sentry.origin"] == "manual" + for item in items: - if item.payload["name"] != "custom parent": - assert ( - item.payload["attributes"]["sentry.origin"] == "auto.queue.celery" - ) + assert item.payload["attributes"]["sentry.origin"] == "auto.queue.celery" else: events = capture_events() @@ -1078,11 +1105,17 @@ def test_send_task_wrapped( if span_streaming: submit_span, outer = [item.payload for item in items] + assert outer["name"] == "custom parent" assert outer["is_segment"] is True + assert submit_span["name"] == "very_creative_task_name" assert submit_span["attributes"]["sentry.op"] == "queue.submit.celery" assert submit_span["trace_id"] == outer_span.trace_id + assert ( + submit_span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0] + ) + else: (event,) = events assert event["type"] == "transaction" @@ -1094,14 +1127,18 @@ def test_send_task_wrapped( assert span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0] -def test_user_custom_headers_accessible_in_task(init_celery): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_user_custom_headers_accessible_in_task(span_streaming, init_celery): """ Regression test for https://github.com/getsentry/sentry-python/issues/5566 User-provided custom headers passed to apply_async() must be accessible via task.request.headers on the worker side. """ - celery = init_celery(traces_sample_rate=1.0) + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task(name="custom_headers_task", bind=True) def custom_headers_task(self): @@ -1113,8 +1150,12 @@ def custom_headers_task(self): "tenant_id": "tenant-42", } - with start_transaction(name="test"): - result = custom_headers_task.apply_async(headers=custom_headers) + if span_streaming: + with sentry_sdk.traces.start_span(name="test"): + result = custom_headers_task.apply_async(headers=custom_headers) + else: + with start_transaction(name="test"): + result = custom_headers_task.apply_async(headers=custom_headers) received_headers = result.get() for key, value in custom_headers.items(): From 35fa49c61faec18d4edf8c1cdb8081c4dd6a0689 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 15:28:21 +0200 Subject: [PATCH 08/12] fallback task name --- sentry_sdk/integrations/celery/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index bc16ceb912..9e81254839 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -553,7 +553,7 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": # method will still work. kwargs_headers = {} - task_name = kwargs_headers.get("task") + task_name = kwargs_headers.get("task") or "" task_id = kwargs_headers.get("id") retries = kwargs_headers.get("retries") From ba5a9885df60447306f2d7f9fc3a7b2483e774cd Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 15:29:15 +0200 Subject: [PATCH 09/12] . --- sentry_sdk/_span_batcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 426d5d9629..fc707dfe23 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -109,7 +109,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any": res: "dict[str, Any]" = { "trace_id": item.trace_id, "span_id": item.span_id, - "name": item._name, + "name": item._name if item._name is not None else "", "status": item._status, "is_segment": item._is_segment(), "start_timestamp": item._start_timestamp.timestamp(), From 71a6728161972b52c1d498155dff4da862268de3 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 15:41:14 +0200 Subject: [PATCH 10/12] fix source and txn name --- sentry_sdk/integrations/celery/__init__.py | 2 -- sentry_sdk/scope.py | 13 +++++++++++++ tests/integrations/celery/test_celery.py | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 9e81254839..25fac605b8 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -339,8 +339,6 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": } } - scope.set_transaction_name(task.name, source=TransactionSource.TASK) - span: "Union[Span, StreamedSpan]" span_ctx: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr() diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index 750e602127..ba387da7ff 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -893,6 +893,19 @@ def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None: if transaction.source: self._transaction_info["source"] = transaction.source + # Also set _transaction and _transaction_info in streaming mode as this + # is used for populating events and linking them to segments + if ( + isinstance(span, StreamedSpan) + and not isinstance(span, NoOpStreamedSpan) + and span._is_segment + ): + self._transaction = span.name + if span._attributes.get("sentry.span.source"): + self._transaction_info["source"] = span._attributes[ + "sentry.span.source" + ] + @property def profile(self) -> "Optional[Profile]": return self._profile diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 150c551003..9d88733b2b 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -522,7 +522,7 @@ def test_traces_sampler_gets_task_info_in_sampling_context( celery_invocation, DictionaryContaining, # noqa:N803 ): - traces_sampler = mock.Mock() + traces_sampler = mock.Mock(return_value=1.0) celery = init_celery( traces_sampler=traces_sampler, _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, From 80a585afb7ce4f6c595d7b7fe254892d3bac48f7 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 15:45:02 +0200 Subject: [PATCH 11/12] . --- sentry_sdk/scope.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index ba387da7ff..f24edcf137 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -898,13 +898,13 @@ def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None: if ( isinstance(span, StreamedSpan) and not isinstance(span, NoOpStreamedSpan) - and span._is_segment + and span._is_segment() ): self._transaction = span.name if span._attributes.get("sentry.span.source"): - self._transaction_info["source"] = span._attributes[ - "sentry.span.source" - ] + self._transaction_info["source"] = str( + span._attributes["sentry.span.source"] + ) @property def profile(self) -> "Optional[Profile]": From ae964eb1b042c9f1996db38e3240a0e667bc648b Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 15 Apr 2026 15:53:36 +0200 Subject: [PATCH 12/12] add missing attr --- tests/tracing/test_span_streaming.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py index 44f504cc26..8859aa39c3 100644 --- a/tests/tracing/test_span_streaming.py +++ b/tests/tracing/test_span_streaming.py @@ -1549,6 +1549,7 @@ def test_transport_format(sentry_init, capture_envelopes): "server.address": {"value": "test-server", "type": "string"}, "sentry.environment": {"value": "production", "type": "string"}, "sentry.release": {"value": "1.0.0", "type": "string"}, + "sentry.origin": {"value": "manual", "type": "string"}, }, } ]