diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 426d5d9629..fc707dfe23 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -109,7 +109,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any": res: "dict[str, Any]" = { "trace_id": item.trace_id, "span_id": item.span_id, - "name": item._name, + "name": item._name if item._name is not None else "", "status": item._status, "is_segment": item._is_segment(), "start_timestamp": item._start_timestamp.timestamp(), diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index b8b1691979..25fac605b8 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -14,11 +14,11 @@ ) from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, Span, TransactionSource -from sentry_sdk.tracing_utils import Baggage +from sentry_sdk.tracing_utils import Baggage, has_span_streaming_enabled from sentry_sdk.utils import ( capture_internal_exceptions, - ensure_integration_enabled, event_from_exception, reraise, ) @@ -162,7 +162,9 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]": def _update_celery_task_headers( - original_headers: "dict[str, Any]", span: "Optional[Span]", monitor_beat_tasks: bool + original_headers: "dict[str, Any]", + span: "Optional[Union[StreamedSpan, Span]]", + monitor_beat_tasks: bool, ) -> "dict[str, Any]": """ Updates the headers of the Celery task with the tracing information @@ -255,7 +257,8 @@ def _wrap_task_run(f: "F") -> "F": def apply_async(*args: "Any", **kwargs: "Any") -> "Any": # Note: kwargs can contain headers=None, so no setdefault! # Unsure which backend though. - integration = sentry_sdk.get_client().get_integration(CeleryIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(CeleryIntegration) if integration is None: return f(*args, **kwargs) @@ -274,17 +277,28 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any": else: task_name = "" + span_streaming = has_span_streaming_enabled(client.options) + task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat" - span_mgr: "Union[Span, NoOpMgr]" = ( - sentry_sdk.start_span( - op=OP.QUEUE_SUBMIT_CELERY, - name=task_name, - origin=CeleryIntegration.origin, - ) - if not task_started_from_beat - else NoOpMgr() - ) + span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr() + if span_streaming: + if not task_started_from_beat and sentry_sdk.get_current_span() is not None: + span_mgr = sentry_sdk.traces.start_span( + name=task_name, + attributes={ + "sentry.op": OP.QUEUE_SUBMIT_CELERY, + "sentry.origin": CeleryIntegration.origin, + }, + ) + + else: + if not task_started_from_beat: + span_mgr = sentry_sdk.start_span( + op=OP.QUEUE_SUBMIT_CELERY, + name=task_name, + origin=CeleryIntegration.origin, + ) with span_mgr as span: kwargs["headers"] = _update_celery_task_headers( @@ -303,50 +317,74 @@ def _wrap_tracer(task: "Any", f: "F") -> "F": # Also because in Celery 3, signal dispatch returns early if one handler # crashes. 
@wraps(f) - @ensure_integration_enabled(CeleryIntegration, f) def _inner(*args: "Any", **kwargs: "Any") -> "Any": + client = sentry_sdk.get_client() + if client.get_integration(CeleryIntegration) is None: + return f(*args, **kwargs) + + span_streaming = has_span_streaming_enabled(client.options) + with isolation_scope() as scope: scope._name = "celery" scope.clear_breadcrumbs() scope.add_event_processor(_make_event_processor(task, *args, **kwargs)) - transaction = None + custom_sampling_context = { + "celery_job": { + "task": task.name, + # for some reason, args[1] is a list if non-empty but a + # tuple if empty + "args": list(args[1]), + "kwargs": args[2], + } + } + + span: "Union[Span, StreamedSpan]" + span_ctx: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr() # Celery task objects are not a thing to be trusted. Even # something such as attribute access can fail. with capture_internal_exceptions(): headers = args[3].get("headers") or {} - transaction = continue_trace( - headers, - op=OP.QUEUE_TASK_CELERY, - name="unknown celery task", - source=TransactionSource.TASK, - origin=CeleryIntegration.origin, - ) - transaction.name = task.name - transaction.set_status(SPANSTATUS.OK) + if span_streaming: + sentry_sdk.traces.continue_trace(headers) + scope.set_custom_sampling_context(custom_sampling_context) + span = sentry_sdk.traces.start_span( + name=task.name, + parent_span=None, # make this a segment + attributes={ + "sentry.origin": CeleryIntegration.origin, + "sentry.span.source": TransactionSource.TASK.value, + "sentry.op": OP.QUEUE_TASK_CELERY, + }, + ) - if transaction is None: - return f(*args, **kwargs) + span_ctx = span + + else: + span = continue_trace( + headers, + op=OP.QUEUE_TASK_CELERY, + name=task.name, + source=TransactionSource.TASK, + origin=CeleryIntegration.origin, + ) + span.set_status(SPANSTATUS.OK) + + span_ctx = sentry_sdk.start_transaction( + span, + custom_sampling_context=custom_sampling_context, + ) - with sentry_sdk.start_transaction( - transaction, - custom_sampling_context={ - "celery_job": { - "task": task.name, - # for some reason, args[1] is a list if non-empty but a - # tuple if empty - "args": list(args[1]), - "kwargs": args[2], - } - }, - ): + with span_ctx: return f(*args, **kwargs) return _inner # type: ignore -def _set_messaging_destination_name(task: "Any", span: "Span") -> None: +def _set_messaging_destination_name( + task: "Any", span: "Union[StreamedSpan, Span]" +) -> None: """Set "messaging.destination.name" tag for span""" with capture_internal_exceptions(): delivery_info = task.request.delivery_info @@ -355,26 +393,47 @@ def _set_messaging_destination_name(task: "Any", span: "Span") -> None: if delivery_info.get("exchange") == "" and routing_key is not None: # Empty exchange indicates the default exchange, meaning the tasks # are sent to the queue with the same name as the routing key. - span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + if isinstance(span, StreamedSpan): + span.set_attribute(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + else: + span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) def _wrap_task_call(task: "Any", f: "F") -> "F": # Need to wrap task call because the exception is caught before we get to # see it. Also celery's reported stacktrace is untrustworthy. - # functools.wraps is important here because celery-once looks at this - # method's name. 
@ensure_integration_enabled internally calls functools.wraps, - # but if we ever remove the @ensure_integration_enabled decorator, we need - # to add @functools.wraps(f) here. - # https://github.com/getsentry/sentry-python/issues/421 - @ensure_integration_enabled(CeleryIntegration, f) + @wraps(f) def _inner(*args: "Any", **kwargs: "Any") -> "Any": + client = sentry_sdk.get_client() + if client.get_integration(CeleryIntegration) is None: + return f(*args, **kwargs) + + span_streaming = has_span_streaming_enabled(client.options) + try: - with sentry_sdk.start_span( - op=OP.QUEUE_PROCESS, - name=task.name, - origin=CeleryIntegration.origin, - ) as span: + span: "Union[Span, StreamedSpan]" + if span_streaming: + span = sentry_sdk.traces.start_span( + name=task.name, + attributes={ + "sentry.op": OP.QUEUE_PROCESS, + "sentry.origin": CeleryIntegration.origin, + }, + ) + else: + span = sentry_sdk.start_span( + op=OP.QUEUE_PROCESS, + name=task.name, + origin=CeleryIntegration.origin, + ) + + with span: + if isinstance(span, StreamedSpan): + set_on_span = span.set_attribute + else: + set_on_span = span.set_data + _set_messaging_destination_name(task, span) latency = None @@ -389,19 +448,19 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": if latency is not None: latency *= 1000 # milliseconds - span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) + set_on_span(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) with capture_internal_exceptions(): - span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id) + set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id) with capture_internal_exceptions(): - span.set_data( + set_on_span( SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries ) with capture_internal_exceptions(): with task.app.connection() as conn: - span.set_data( + set_on_span( SPANDATA.MESSAGING_SYSTEM, conn.transport.driver_type, ) @@ -476,8 +535,13 @@ def sentry_workloop(*args: "Any", **kwargs: "Any") -> "Any": def _patch_producer_publish() -> None: original_publish = Producer.publish - @ensure_integration_enabled(CeleryIntegration, original_publish) def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": + client = sentry_sdk.get_client() + if client.get_integration(CeleryIntegration) is None: + return original_publish(self, *args, **kwargs) + + span_streaming = has_span_streaming_enabled(client.options) + kwargs_headers = kwargs.get("headers", {}) if not isinstance(kwargs_headers, Mapping): # Ensure kwargs_headers is a Mapping, so we can safely call get(). @@ -487,31 +551,52 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": # method will still work. 
kwargs_headers = {} - task_name = kwargs_headers.get("task") + task_name = kwargs_headers.get("task") or "" task_id = kwargs_headers.get("id") retries = kwargs_headers.get("retries") routing_key = kwargs.get("routing_key") exchange = kwargs.get("exchange") - with sentry_sdk.start_span( - op=OP.QUEUE_PUBLISH, - name=task_name, - origin=CeleryIntegration.origin, - ) as span: + span: "Union[StreamedSpan, Span, None]" = None + if span_streaming: + if sentry_sdk.get_current_span() is not None: + span = sentry_sdk.traces.start_span( + name=task_name, + attributes={ + "sentry.op": OP.QUEUE_PUBLISH, + "sentry.origin": CeleryIntegration.origin, + }, + ) + else: + span = sentry_sdk.start_span( + op=OP.QUEUE_PUBLISH, + name=task_name, + origin=CeleryIntegration.origin, + ) + + if span is None: + return original_publish(self, *args, **kwargs) + + with span: + if isinstance(span, StreamedSpan): + set_on_span = span.set_attribute + else: + set_on_span = span.set_data + if task_id is not None: - span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id) + set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task_id) if exchange == "" and routing_key is not None: # Empty exchange indicates the default exchange, meaning messages are # routed to the queue with the same name as the routing key. - span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + set_on_span(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) if retries is not None: - span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) + set_on_span(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) with capture_internal_exceptions(): - span.set_data( + set_on_span( SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type ) diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index 750e602127..f24edcf137 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -893,6 +893,19 @@ def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None: if transaction.source: self._transaction_info["source"] = transaction.source + # Also set _transaction and _transaction_info in streaming mode as this + # is used for populating events and linking them to segments + if ( + isinstance(span, StreamedSpan) + and not isinstance(span, NoOpStreamedSpan) + and span._is_segment() + ): + self._transaction = span.name + if span._attributes.get("sentry.span.source"): + self._transaction_info["source"] = str( + span._attributes["sentry.span.source"] + ) + @property def profile(self) -> "Optional[Profile]": return self._profile diff --git a/sentry_sdk/traces.py b/sentry_sdk/traces.py index f44ef71f5b..a9b0070034 100644 --- a/sentry_sdk/traces.py +++ b/sentry_sdk/traces.py @@ -258,7 +258,9 @@ def __init__( ): self._name: str = name self._active: bool = active - self._attributes: "Attributes" = {} + self._attributes: "Attributes" = { + "sentry.origin": "manual", + } if attributes: for attribute, value in attributes.items(): diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 5d2d19c06a..9d88733b2b 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -117,23 +117,45 @@ def celery_invocation(request): return request.param -def test_simple_with_performance(capture_events, init_celery, celery_invocation): - celery = init_celery(traces_sample_rate=1.0) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_simple_with_performance( + capture_events, capture_items, init_celery, celery_invocation, span_streaming +): + celery = 
init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task(name="dummy_task") def dummy_task(x, y): foo = 42 # noqa return x / y - with start_transaction(op="unit test transaction") as transaction: - celery_invocation(dummy_task, 1, 2) - _, expected_context = celery_invocation(dummy_task, 1, 0) + if span_streaming: + items = capture_items("event", "span") + + with sentry_sdk.traces.start_span(name="span") as span: + celery_invocation(dummy_task, 1, 2) + _, expected_context = celery_invocation(dummy_task, 1, 0) + + sentry_sdk.flush() - (_, error_event, _, _) = events + error_event = next(item.payload for item in items if item.type == "event") + + assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id + assert error_event["contexts"]["trace"]["span_id"] != span.span_id + else: + events = capture_events() + + with start_transaction(op="unit test transaction") as transaction: + celery_invocation(dummy_task, 1, 2) + _, expected_context = celery_invocation(dummy_task, 1, 0) + + (_, error_event, _, _) = events + + assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert error_event["contexts"]["trace"]["span_id"] != transaction.span_id - assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert error_event["contexts"]["trace"]["span_id"] != transaction.span_id assert error_event["transaction"] == "dummy_task" assert "celery_task_id" in error_event["tags"] assert error_event["extra"]["celery-job"] == dict( @@ -182,9 +204,20 @@ def dummy_task(x, y): assert exception["stacktrace"]["frames"][0]["vars"]["foo"] == "42" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"]) -def test_transaction_events(capture_events, init_celery, celery_invocation, task_fails): - celery = init_celery(traces_sample_rate=1.0) +def test_transaction_events( + capture_events, + capture_items, + init_celery, + celery_invocation, + task_fails, + span_streaming, +): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task(name="dummy_task") def dummy_task(x, y): @@ -192,58 +225,102 @@ def dummy_task(x, y): # XXX: For some reason the first call does not get instrumented properly. 
celery_invocation(dummy_task, 1, 1) + sentry_sdk.flush() - events = capture_events() + if span_streaming: + items = capture_items("event", "span") - with start_transaction(name="submission") as transaction: - celery_invocation(dummy_task, 1, 0 if task_fails else 1) + with sentry_sdk.traces.start_span(name="submission") as span: + celery_invocation(dummy_task, 1, 0 if task_fails else 1) - if task_fails: - error_event = events.pop(0) - assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + sentry_sdk.flush() - execution_event, submission_event = events - assert execution_event["transaction"] == "dummy_task" - assert execution_event["transaction_info"] == {"source": "task"} + if task_fails: + error_event = items.pop(0).payload + assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert submission_event["transaction"] == "submission" - assert submission_event["transaction_info"] == {"source": "custom"} + process_span, execution_span, submit_span, submission_span = [ + item.payload for item in items + ] - assert execution_event["type"] == submission_event["type"] == "transaction" - assert execution_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert submission_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert execution_span["name"] == "dummy_task" + assert execution_span["is_segment"] is True + assert execution_span["attributes"]["sentry.span.source"] == "task" + assert execution_span["trace_id"] == span.trace_id + if task_fails: + assert execution_span["status"] == "error" + else: + assert execution_span["status"] == "ok" + + assert process_span["name"] == "dummy_task" + assert process_span["trace_id"] == span.trace_id + assert process_span["attributes"]["sentry.op"] == "queue.process" + assert process_span["parent_span_id"] == execution_span["span_id"] + + assert submission_span["name"] == "submission" + assert submission_span["is_segment"] is True + + assert submit_span["name"] == "dummy_task" + assert submit_span["attributes"]["sentry.op"] == "queue.submit.celery" + assert submit_span["attributes"]["sentry.origin"] == "auto.queue.celery" + assert ( + submit_span["parent_span_id"] == submission_span["span_id"] == span.span_id + ) + assert submit_span["trace_id"] == span.trace_id - if task_fails: - assert execution_event["contexts"]["trace"]["status"] == "internal_error" else: - assert execution_event["contexts"]["trace"]["status"] == "ok" + events = capture_events() - assert len(execution_event["spans"]) == 1 - assert ( - execution_event["spans"][0].items() - >= { - "trace_id": str(transaction.trace_id), - "same_process_as_parent": True, - "op": "queue.process", - "description": "dummy_task", - "data": ApproxDict(), - }.items() - ) - assert submission_event["spans"] == [ - { - "data": ApproxDict(), - "description": "dummy_task", - "op": "queue.submit.celery", - "origin": "auto.queue.celery", - "parent_span_id": submission_event["contexts"]["trace"]["span_id"], - "same_process_as_parent": True, - "span_id": submission_event["spans"][0]["span_id"], - "start_timestamp": submission_event["spans"][0]["start_timestamp"], - "timestamp": submission_event["spans"][0]["timestamp"], - "trace_id": str(transaction.trace_id), - } - ] + with start_transaction(name="submission") as transaction: + celery_invocation(dummy_task, 1, 0 if task_fails else 1) + + if task_fails: 
+ error_event = events.pop(0) + assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + + execution_event, submission_event = events + assert execution_event["transaction"] == "dummy_task" + assert execution_event["transaction_info"] == {"source": "task"} + + assert submission_event["transaction"] == "submission" + assert submission_event["transaction_info"] == {"source": "custom"} + + assert execution_event["type"] == submission_event["type"] == "transaction" + assert execution_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert submission_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + + if task_fails: + assert execution_event["contexts"]["trace"]["status"] == "internal_error" + else: + assert execution_event["contexts"]["trace"]["status"] == "ok" + + assert len(execution_event["spans"]) == 1 + assert ( + execution_event["spans"][0].items() + >= { + "trace_id": str(transaction.trace_id), + "same_process_as_parent": True, + "op": "queue.process", + "description": "dummy_task", + "data": ApproxDict(), + }.items() + ) + assert submission_event["spans"] == [ + { + "data": ApproxDict(), + "description": "dummy_task", + "op": "queue.submit.celery", + "origin": "auto.queue.celery", + "parent_span_id": submission_event["contexts"]["trace"]["span_id"], + "same_process_as_parent": True, + "span_id": submission_event["spans"][0]["span_id"], + "start_timestamp": submission_event["spans"][0]["start_timestamp"], + "timestamp": submission_event["spans"][0]["timestamp"], + "trace_id": str(transaction.trace_id), + } + ] def test_no_double_patching(celery): @@ -438,13 +515,18 @@ def dummy_task(self, x, y): assert celery_invocation(dummy_task, 1, 1)[0].wait() == 1 +@pytest.mark.parametrize("span_streaming", [True, False]) def test_traces_sampler_gets_task_info_in_sampling_context( + span_streaming, init_celery, celery_invocation, DictionaryContaining, # noqa:N803 ): - traces_sampler = mock.Mock() - celery = init_celery(traces_sampler=traces_sampler) + traces_sampler = mock.Mock(return_value=1.0) + celery = init_celery( + traces_sampler=traces_sampler, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task(name="dog_walk") def walk_dogs(x, y): @@ -541,13 +623,17 @@ def dummy_task(self, x, y): ) -def test_sentry_propagate_traces_override(init_celery): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_sentry_propagate_traces_override(span_streaming, init_celery): """ Test if the `sentry-propagate-traces` header given to `apply_async` overrides the `propagate_traces` parameter in the integration constructor. 
""" celery = init_celery( - propagate_traces=True, traces_sample_rate=1.0, release="abcdef" + propagate_traces=True, + traces_sample_rate=1.0, + release="abcdef", + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) @celery.task(name="dummy_task", bind=True) @@ -555,21 +641,38 @@ def dummy_task(self, message): trace_id = get_current_span().trace_id return trace_id - with start_transaction() as transaction: - transaction_trace_id = transaction.trace_id + if span_streaming: + with sentry_sdk.traces.start_span(name="parent") as span: + parent_trace_id = span.trace_id + + # should propagate trace + task_trace_id = dummy_task.apply_async( + args=("some message",), + ).get() + assert parent_trace_id == task_trace_id - # should propagate trace - task_transaction_id = dummy_task.apply_async( - args=("some message",), - ).get() - assert transaction_trace_id == task_transaction_id + # should NOT propagate trace + task_trace_id = dummy_task.apply_async( + args=("another message",), + headers={"sentry-propagate-traces": False}, + ).get() + assert parent_trace_id != task_trace_id + else: + with start_transaction() as transaction: + transaction_trace_id = transaction.trace_id - # should NOT propagate trace (overrides `propagate_traces` parameter in integration constructor) - task_transaction_id = dummy_task.apply_async( - args=("another message",), - headers={"sentry-propagate-traces": False}, - ).get() - assert transaction_trace_id != task_transaction_id + # should propagate trace + task_trace_id = dummy_task.apply_async( + args=("some message",), + ).get() + assert transaction_trace_id == task_trace_id + + # should NOT propagate trace + task_trace_id = dummy_task.apply_async( + args=("another message",), + headers={"sentry-propagate-traces": False}, + ).get() + assert transaction_trace_id != task_trace_id def test_apply_async_manually_span(sentry_init): @@ -601,28 +704,47 @@ def example_task(): assert result.get() == "success" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("routing_key", ("celery", "custom")) @mock.patch("celery.app.task.Task.request") def test_messaging_destination_name_default_exchange( - mock_request, routing_key, init_celery, capture_events + mock_request, + routing_key, + span_streaming, + init_celery, + capture_events, + capture_items, ): - celery_app = init_celery(traces_sample_rate=1.0) - events = capture_events() + celery_app = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) mock_request.delivery_info = {"routing_key": routing_key, "exchange": ""} @celery_app.task() def task(): ... 
- task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert span["data"]["messaging.destination.name"] == routing_key + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert ( + process_span.payload["attributes"]["messaging.destination.name"] + == routing_key + ) + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.destination.name"] == routing_key +@pytest.mark.parametrize("span_streaming", [True, False]) @mock.patch("celery.app.task.Task.request") def test_messaging_destination_name_nondefault_exchange( - mock_request, init_celery, capture_events + mock_request, span_streaming, init_celery, capture_events, capture_items ): """ Currently, we only capture the routing key as the messaging.destination.name when @@ -630,69 +752,115 @@ def test_messaging_destination_name_nondefault_exchange( that the routing key is the queue name. Other exchanges may not guarantee this behavior. """ - celery_app = init_celery(traces_sample_rate=1.0) - events = capture_events() + celery_app = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) mock_request.delivery_info = {"routing_key": "celery", "exchange": "custom"} @celery_app.task() def task(): ... - task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert "messaging.destination.name" not in span["data"] + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert "messaging.destination.name" not in process_span.payload["attributes"] + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert "messaging.destination.name" not in span["data"] -def test_messaging_id(init_celery, capture_events): - celery = init_celery(traces_sample_rate=1.0) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_messaging_id(span_streaming, init_celery, capture_events, capture_items): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task def example_task(): ... - example_task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert "messaging.message.id" in span["data"] + if span_streaming: + items = capture_items("span") + example_task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert "messaging.message.id" in process_span.payload["attributes"] + else: + events = capture_events() + example_task.apply_async() + (event,) = events + (span,) = event["spans"] + assert "messaging.message.id" in span["data"] -def test_retry_count_zero(init_celery, capture_events): - celery = init_celery(traces_sample_rate=1.0) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_retry_count_zero(span_streaming, init_celery, capture_events, capture_items): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task() def task(): ... 
- task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert span["data"]["messaging.message.retry.count"] == 0 + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert process_span.payload["attributes"]["messaging.message.retry.count"] == 0 + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.message.retry.count"] == 0 +@pytest.mark.parametrize("span_streaming", [True, False]) @mock.patch("celery.app.task.Task.request") -def test_retry_count_nonzero(mock_request, init_celery, capture_events): +def test_retry_count_nonzero( + mock_request, span_streaming, init_celery, capture_events, capture_items +): mock_request.retries = 3 - celery = init_celery(traces_sample_rate=1.0) - events = capture_events() + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task() def task(): ... - task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert span["data"]["messaging.message.retry.count"] == 3 + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert process_span.payload["attributes"]["messaging.message.retry.count"] == 3 + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.message.retry.count"] == 3 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("system", ("redis", "amqp")) -def test_messaging_system(system, init_celery, capture_events): - celery = init_celery(traces_sample_rate=1.0) - events = capture_events() +def test_messaging_system( + system, span_streaming, init_celery, capture_events, capture_items +): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) # Does not need to be a real URL, since we use always eager celery.conf.broker_url = f"{system}://example.com" # noqa: E231 @@ -700,15 +868,25 @@ def test_messaging_system(system, init_celery, capture_events): @celery.task() def task(): ... 
- task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert span["data"]["messaging.system"] == system + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert process_span.payload["attributes"]["messaging.system"] == system + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.system"] == system +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("system", ("amqp", "redis")) -def test_producer_span_data(system, monkeypatch, sentry_init, capture_events): +def test_producer_span_data( + system, span_streaming, monkeypatch, sentry_init, capture_events, capture_items +): old_publish = kombu.messaging.Producer._publish def publish(*args, **kwargs): @@ -716,61 +894,114 @@ def publish(*args, **kwargs): monkeypatch.setattr(kombu.messaging.Producer, "_publish", publish) - sentry_init(integrations=[CeleryIntegration()], traces_sample_rate=1.0) + sentry_init( + integrations=[CeleryIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) celery = Celery(__name__, broker=f"{system}://example.com") # noqa: E231 - events = capture_events() @celery.task() def task(): ... - with start_transaction(): - task.apply_async() + if span_streaming: + items = capture_items("span") - (event,) = events - span = next(span for span in event["spans"] if span["op"] == "queue.publish") + with sentry_sdk.traces.start_span(name="producer test"): + task.apply_async() + + sentry_sdk.flush() + + span_items = [item.payload for item in items] + publish_span = next( + s for s in span_items if s["attributes"].get("sentry.op") == "queue.publish" + ) + + assert publish_span["attributes"]["messaging.system"] == system + assert publish_span["attributes"]["messaging.destination.name"] == "celery" + assert "messaging.message.id" in publish_span["attributes"] + assert publish_span["attributes"]["messaging.message.retry.count"] == 0 + else: + events = capture_events() + + with start_transaction(): + task.apply_async() - assert span["data"]["messaging.system"] == system + (event,) = events + span = next(span for span in event["spans"] if span["op"] == "queue.publish") - assert span["data"]["messaging.destination.name"] == "celery" - assert "messaging.message.id" in span["data"] - assert span["data"]["messaging.message.retry.count"] == 0 + assert span["data"]["messaging.system"] == system + assert span["data"]["messaging.destination.name"] == "celery" + assert "messaging.message.id" in span["data"] + assert span["data"]["messaging.message.retry.count"] == 0 monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish) -def test_receive_latency(init_celery, capture_events): - celery = init_celery(traces_sample_rate=1.0) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_receive_latency(span_streaming, init_celery, capture_events, capture_items): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task() def task(): ... 
- task.apply_async() - - (event,) = events - (span,) = event["spans"] - assert "messaging.message.receive.latency" in span["data"] - assert span["data"]["messaging.message.receive.latency"] > 0 + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, _execution_span = items + assert "messaging.message.receive.latency" in process_span.payload["attributes"] + assert ( + process_span.payload["attributes"]["messaging.message.receive.latency"] > 0 + ) + else: + events = capture_events() + task.apply_async() + (event,) = events + (span,) = event["spans"] + assert "messaging.message.receive.latency" in span["data"] + assert span["data"]["messaging.message.receive.latency"] > 0 -def tests_span_origin_consumer(init_celery, capture_events): - celery = init_celery(traces_sample_rate=1.0) +@pytest.mark.parametrize("span_streaming", [True, False]) +def tests_span_origin_consumer( + span_streaming, init_celery, capture_events, capture_items +): + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) celery.conf.broker_url = "redis://example.com" # noqa: E231 - events = capture_events() - @celery.task() def task(): ... - task.apply_async() - - (event,) = events - - assert event["contexts"]["trace"]["origin"] == "auto.queue.celery" - assert event["spans"][0]["origin"] == "auto.queue.celery" + if span_streaming: + items = capture_items("span") + task.apply_async() + sentry_sdk.flush() + process_span, execution_span = items + assert ( + execution_span.payload["attributes"]["sentry.origin"] == "auto.queue.celery" + ) + assert ( + process_span.payload["attributes"]["sentry.origin"] == "auto.queue.celery" + ) + else: + events = capture_events() + task.apply_async() + (event,) = events + assert event["contexts"]["trace"]["origin"] == "auto.queue.celery" + assert event["spans"][0]["origin"] == "auto.queue.celery" -def tests_span_origin_producer(monkeypatch, sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def tests_span_origin_producer( + span_streaming, monkeypatch, sentry_init, capture_events, capture_items +): old_publish = kombu.messaging.Producer._publish def publish(*args, **kwargs): @@ -778,42 +1009,77 @@ def publish(*args, **kwargs): monkeypatch.setattr(kombu.messaging.Producer, "_publish", publish) - sentry_init(integrations=[CeleryIntegration()], traces_sample_rate=1.0) + sentry_init( + integrations=[CeleryIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) celery = Celery(__name__, broker="redis://example.com") # noqa: E231 - events = capture_events() - @celery.task() def task(): ... 
- with start_transaction(name="custom_transaction"): - task.apply_async() + if span_streaming: + items = capture_items("span") - (event,) = events + with sentry_sdk.traces.start_span(name="custom parent"): + task.apply_async() + + sentry_sdk.flush() + + parent = items.pop(-1).payload + assert parent["name"] == "custom parent" + assert parent["attributes"]["sentry.origin"] == "manual" + + for item in items: + assert item.payload["attributes"]["sentry.origin"] == "auto.queue.celery" + else: + events = capture_events() + + with start_transaction(name="custom_transaction"): + task.apply_async() + + (event,) = events - assert event["contexts"]["trace"]["origin"] == "manual" + assert event["contexts"]["trace"]["origin"] == "manual" - for span in event["spans"]: - assert span["origin"] == "auto.queue.celery" + for span in event["spans"]: + assert span["origin"] == "auto.queue.celery" monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.forked @mock.patch("celery.Celery.send_task") def test_send_task_wrapped( patched_send_task, + span_streaming, sentry_init, capture_events, + capture_items, reset_integrations, ): - sentry_init(integrations=[CeleryIntegration()], traces_sample_rate=1.0) + sentry_init( + integrations=[CeleryIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) celery = Celery(__name__, broker="redis://example.com") # noqa: E231 - events = capture_events() - - with sentry_sdk.start_transaction(name="custom_transaction"): - celery.send_task("very_creative_task_name", args=(1, 2), kwargs={"foo": "bar"}) + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent") as outer_span: + celery.send_task( + "very_creative_task_name", args=(1, 2), kwargs={"foo": "bar"} + ) + sentry_sdk.flush() + else: + events = capture_events() + with sentry_sdk.start_transaction(name="custom_transaction"): + celery.send_task( + "very_creative_task_name", args=(1, 2), kwargs={"foo": "bar"} + ) (call,) = patched_send_task.call_args_list # We should have exactly one call (args, kwargs) = call @@ -837,24 +1103,42 @@ def test_send_task_wrapped( == kwargs["headers"]["headers"]["sentry-trace"] ) - (event,) = events # We should have exactly one event (the transaction) - assert event["type"] == "transaction" - assert event["transaction"] == "custom_transaction" + if span_streaming: + submit_span, outer = [item.payload for item in items] - (span,) = event["spans"] # We should have exactly one span - assert span["description"] == "very_creative_task_name" - assert span["op"] == "queue.submit.celery" - assert span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0] + assert outer["name"] == "custom parent" + assert outer["is_segment"] is True + + assert submit_span["name"] == "very_creative_task_name" + assert submit_span["attributes"]["sentry.op"] == "queue.submit.celery" + assert submit_span["trace_id"] == outer_span.trace_id + assert ( + submit_span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0] + ) + + else: + (event,) = events + assert event["type"] == "transaction" + assert event["transaction"] == "custom_transaction" + + (span,) = event["spans"] + assert span["description"] == "very_creative_task_name" + assert span["op"] == "queue.submit.celery" + assert span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0] -def 
test_user_custom_headers_accessible_in_task(init_celery): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_user_custom_headers_accessible_in_task(span_streaming, init_celery): """ Regression test for https://github.com/getsentry/sentry-python/issues/5566 User-provided custom headers passed to apply_async() must be accessible via task.request.headers on the worker side. """ - celery = init_celery(traces_sample_rate=1.0) + celery = init_celery( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) @celery.task(name="custom_headers_task", bind=True) def custom_headers_task(self): @@ -866,8 +1150,12 @@ def custom_headers_task(self): "tenant_id": "tenant-42", } - with start_transaction(name="test"): - result = custom_headers_task.apply_async(headers=custom_headers) + if span_streaming: + with sentry_sdk.traces.start_span(name="test"): + result = custom_headers_task.apply_async(headers=custom_headers) + else: + with start_transaction(name="test"): + result = custom_headers_task.apply_async(headers=custom_headers) received_headers = result.get() for key, value in custom_headers.items(): diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py index 44f504cc26..8859aa39c3 100644 --- a/tests/tracing/test_span_streaming.py +++ b/tests/tracing/test_span_streaming.py @@ -1549,6 +1549,7 @@ def test_transport_format(sentry_init, capture_envelopes): "server.address": {"value": "test-server", "type": "string"}, "sentry.environment": {"value": "production", "type": "string"}, "sentry.release": {"value": "1.0.0", "type": "string"}, + "sentry.origin": {"value": "manual", "type": "string"}, }, } ]
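Note (not part of the patch): the instrumentation hunks above all follow the same dispatch pattern. When the experimental streamed-span lifecycle is enabled, spans are started through sentry_sdk.traces.start_span() with "sentry.op"/"sentry.origin" attributes and data is recorded with set_attribute(); otherwise the legacy sentry_sdk.start_span()/set_data() path is kept. The sketch below condenses that pattern into a standalone helper. It is illustrative only and assumes the experimental sentry_sdk.traces, StreamedSpan, and has_span_streaming_enabled interfaces exactly as they are imported in the diff.

import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.traces import StreamedSpan  # experimental streamed-span type used by the patch
from sentry_sdk.tracing_utils import has_span_streaming_enabled  # experimental helper used by the patch


def start_queue_process_span(task_name):
    # Choose the span API based on the trace lifecycle setting, mirroring
    # what the patched _wrap_task_call() does.
    client = sentry_sdk.get_client()
    if has_span_streaming_enabled(client.options):
        return sentry_sdk.traces.start_span(
            name=task_name,
            attributes={
                "sentry.op": OP.QUEUE_PROCESS,
                "sentry.origin": "auto.queue.celery",
            },
        )
    return sentry_sdk.start_span(
        op=OP.QUEUE_PROCESS,
        name=task_name,
        origin="auto.queue.celery",
    )


def set_on_span(span, key, value):
    # StreamedSpan records data as attributes; the legacy Span keeps set_data().
    if isinstance(span, StreamedSpan):
        span.set_attribute(key, value)
    else:
        span.set_data(key, value)


# Usage (inside a wrapped task call), roughly what the patch does per SPANDATA field:
#     with start_queue_process_span("dummy_task") as span:
#         set_on_span(span, SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, 0)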
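Similarly, the parametrized tests above all share one streaming-mode recipe: enable the lifecycle via _experiments={"trace_lifecycle": "stream"}, capture streamed items, flush, then assert on each span payload's attributes. A condensed example of that recipe is sketched below; the init_celery and capture_items fixtures and the attribute keys are assumed to behave exactly as in the tests above.

import sentry_sdk


def test_process_span_attributes(init_celery, capture_items):
    # Enable the experimental streamed-span lifecycle.
    celery = init_celery(
        traces_sample_rate=1.0,
        _experiments={"trace_lifecycle": "stream"},
    )

    @celery.task(name="dummy_task")
    def dummy_task():
        return 42

    items = capture_items("span")
    dummy_task.apply_async()
    sentry_sdk.flush()  # streamed spans are only delivered after a flush

    # The worker side emits the queue.process child span and the task segment.
    process_span, execution_span = [item.payload for item in items]
    assert execution_span["is_segment"] is True
    assert execution_span["name"] == "dummy_task"
    assert process_span["attributes"]["sentry.op"] == "queue.process"
    assert process_span["parent_span_id"] == execution_span["span_id"]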