# Copyright 2021 The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cloud Trace Span Exporter for OpenTelemetry. Uses the Cloud Trace client
library (over a gRPC transport unless a client is supplied) to export traces
and spans for viewing in Cloud Trace.

Usage
-----

.. code-block:: python

    from opentelemetry import trace
    from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
    from opentelemetry.sdk.trace import TracerProvider

    # For debugging
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
    # Otherwise
    from opentelemetry.sdk.trace.export import BatchSpanProcessor

    trace.set_tracer_provider(TracerProvider())

    cloud_trace_exporter = CloudTraceSpanExporter()
    trace.get_tracer_provider().add_span_processor(
        BatchSpanProcessor(cloud_trace_exporter)
    )
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("foo"):
        print("Hello world!")

When not debugging, make sure to use
:class:`opentelemetry.sdk.trace.export.BatchSpanProcessor` with the
default parameters for performance reasons.

Auto-instrumentation
--------------------

This exporter can also be used with `OpenTelemetry auto-instrumentation
<https://opentelemetry.io/docs/instrumentation/python/automatic/>`_:

.. code-block:: sh

    opentelemetry-instrument --traces_exporter gcp_trace <command> <args>

Configuration is supported through environment variables
(:mod:`opentelemetry.exporter.cloud_trace.environment_variables`) or the
corresponding command line arguments to ``opentelemetry-instrument``:

.. code-block:: sh

    opentelemetry-instrument --traces_exporter gcp_trace \\
        --exporter_gcp_trace_project_id my-project \\
        <command> <args>

See ``opentelemetry-instrument --help`` for all configuration options.

API
---
"""
import logging
import re
from collections.abc import Sequence as SequenceABC
from os import environ
from typing import (
Any,
Dict,
List,
Optional,
Pattern,
Sequence,
Tuple,
overload,
)
import google.auth
import opentelemetry.trace as trace_api
import pkg_resources
from google.cloud.trace_v2 import BatchWriteSpansRequest, TraceServiceClient
from google.cloud.trace_v2 import types as trace_types
from google.cloud.trace_v2.services.trace_service.transports import (
TraceServiceGrpcTransport,
)
from google.protobuf.timestamp_pb2 import ( # pylint: disable=no-name-in-module
Timestamp,
)
from google.rpc import code_pb2, status_pb2
from opentelemetry.exporter.cloud_trace.environment_variables import (
OTEL_EXPORTER_GCP_TRACE_PROJECT_ID,
OTEL_EXPORTER_GCP_TRACE_RESOURCE_REGEX,
)
from opentelemetry.exporter.cloud_trace.version import __version__
from opentelemetry.resourcedetector.gcp_resource_detector import (
_constants as _resource_constants,
)
from opentelemetry.resourcedetector.gcp_resource_detector._mapping import (
get_monitored_resource,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Event
from opentelemetry.sdk.trace.export import (
ReadableSpan,
SpanExporter,
SpanExportResult,
)
from opentelemetry.sdk.util import BoundedDict
from opentelemetry.trace import format_span_id, format_trace_id
from opentelemetry.trace.status import StatusCode
from opentelemetry.util import types
logger = logging.getLogger(__name__)

# Version strings reported to the backend through the gRPC user agent.
_OTEL_SDK_VERSION = pkg_resources.get_distribution("opentelemetry-sdk").version
_USER_AGENT = "opentelemetry-python {}; google-cloud-trace-exporter {}".format(
    _OTEL_SDK_VERSION, __version__
)

# Channel options for the default gRPC transport. The user agent is passed as
# channel metadata (see https://github.com/grpc/grpc/issues/23644); the two
# message-length entries mirror the defaults of the generated transport:
# https://github.com/googleapis/python-trace/blob/v1.7.3/google/cloud/trace_v1/services/trace_service/transports/grpc.py#L177-L180
_OPTIONS = [
    ("grpc.max_send_message_length", -1),
    ("grpc.max_receive_message_length", -1),
    ("grpc.primary_user_agent", _USER_AGENT),
]
# Limits applied while translating spans into the Cloud Trace format.
MAX_NUM_LINKS = 128  # links exported per span
MAX_NUM_EVENTS = 32  # events (annotations) exported per span
MAX_EVENT_ATTRS = 4  # attributes kept per event
MAX_LINK_ATTRS = 32  # attributes kept per link
MAX_SPAN_ATTRS = 32  # attributes (labels) kept per span
MAX_ATTR_KEY_BYTES = 128  # UTF-8 bytes kept per attribute key
MAX_ATTR_VAL_BYTES = 16_384  # UTF-8 bytes kept per string value (16 KiB)
class CloudTraceSpanExporter(SpanExporter):
    """Cloud Trace span exporter for OpenTelemetry.

    Args:
        project_id: GCP project ID for the project to send spans to. When not
            given, :envvar:`OTEL_EXPORTER_GCP_TRACE_PROJECT_ID` is used, and
            failing that the project reported by ``google.auth.default()``.
        client: Cloud Trace client. When not given, a ``TraceServiceClient``
            is built on a gRPC channel created with this module's channel
            options (user agent, message-size limits), using gcloud default
            credentials.
        resource_regex: Resource attributes whose keys match this regex are
            added to exported spans as labels (default: None). When not given,
            :envvar:`OTEL_EXPORTER_GCP_TRACE_RESOURCE_REGEX` is used.
    """

    def __init__(
        self,
        project_id=None,
        client=None,
        resource_regex=None,
    ):
        if not client:
            client = TraceServiceClient(
                transport=TraceServiceGrpcTransport(
                    channel=TraceServiceGrpcTransport.create_channel(
                        options=_OPTIONS
                    )
                )
            )
        self.client: TraceServiceClient = client

        # Explicit argument, then environment variable, then the project of
        # the default credentials.
        project_id = project_id or environ.get(
            OTEL_EXPORTER_GCP_TRACE_PROJECT_ID
        )
        if not project_id:
            _, project_id = google.auth.default()
        self.project_id = project_id

        resource_regex = resource_regex or environ.get(
            OTEL_EXPORTER_GCP_TRACE_RESOURCE_REGEX
        )
        self.resource_regex = (
            re.compile(resource_regex) if resource_regex else None
        )

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Send ``spans`` to Cloud Trace in a single batch-write request.

        See: https://cloud.google.com/trace/docs/reference/v2/rest/v2/projects.traces/batchWrite

        Args:
            spans: Sequence of spans to export

        Returns:
            ``SUCCESS`` when the request completes; ``FAILURE`` (after logging
            the exception) when translation or the RPC raises.
        """
        try:
            request = BatchWriteSpansRequest(
                name="projects/{}".format(self.project_id),
                spans=self._translate_to_cloud_trace(spans),
            )
            self.client.batch_write_spans(request=request)
        # pylint: disable=broad-except
        except Exception as ex:
            logger.error("Error while writing to Cloud Trace", exc_info=ex)
            return SpanExportResult.FAILURE
        return SpanExportResult.SUCCESS

    def _translate_to_cloud_trace(
        self, spans: Sequence[ReadableSpan]
    ) -> List[trace_types.Span]:
        """Convert OpenTelemetry spans into Cloud Trace ``Span`` messages.

        Args:
            spans: Sequence of spans to convert

        Returns:
            One ``trace_types.Span`` per input span, in input order.
        """
        converted: List[trace_types.Span] = []
        for otel_span in spans:
            span_ctx = otel_span.get_span_context()
            hex_trace_id = format_trace_id(span_ctx.trace_id)
            hex_span_id = format_span_id(span_ctx.span_id)
            resource_name = "projects/{}/traces/{}/spans/{}".format(
                self.project_id, hex_trace_id, hex_span_id
            )
            parent_span_id = (
                format_span_id(otel_span.parent.span_id)
                if otel_span.parent
                else None
            )
            start = _get_time_from_ns(otel_span.start_time)
            end = _get_time_from_ns(otel_span.end_time)

            own_attributes = otel_span.attributes
            if own_attributes and len(own_attributes) > MAX_SPAN_ATTRS:
                logger.warning(
                    "Span has more then %s attributes, some will be truncated",
                    MAX_SPAN_ATTRS,
                )

            # Cloud Trace spans cannot carry a MonitoredResource, so resource
            # information travels as span labels. Resource-derived entries are
            # unpacked last and therefore win on key clashes.
            labels = {
                **(own_attributes or {}),
                **_extract_resources(otel_span.resource, self.resource_regex),
            }

            converted.append(
                trace_types.Span(
                    name=resource_name,
                    span_id=hex_span_id,
                    display_name=_get_truncatable_str_object(
                        otel_span.name, 128
                    ),
                    start_time=start,
                    end_time=end,
                    parent_span_id=parent_span_id,
                    attributes=_extract_attributes(
                        labels,
                        MAX_SPAN_ATTRS,
                        add_agent_attr=True,
                    ),
                    links=_extract_links(otel_span.links),
                    status=_extract_status(otel_span.status),
                    time_events=_extract_events(otel_span.events),
                    span_kind=_extract_span_kind(otel_span.kind),
                )
            )
        # TODO: Leverage more of the Cloud Trace API, e.g.
        # same_process_as_parent_span and child_span_count
        return converted

    def shutdown(self):
        """Do nothing on shutdown; the Cloud Trace client is left as is."""
def _get_time_from_ns(nanoseconds: Optional[int]) -> Optional[Timestamp]:
    """Convert epoch nanoseconds into a protobuf ``Timestamp``.

    Args:
        nanoseconds: Nanoseconds since the Unix epoch, or ``None``.

    Returns:
        The equivalent ``Timestamp``, or ``None`` when ``nanoseconds`` is
        falsy. Note that ``0`` is therefore treated like "no timestamp"
        rather than as the epoch itself.
    """
    if not nanoseconds:
        return None
    timestamp = Timestamp()
    # pylint: disable=no-member
    timestamp.FromNanoseconds(nanoseconds)
    return timestamp
def _get_truncatable_str_object(str_to_convert: str, max_length: int):
    """Wrap a string in a Cloud Trace ``TruncatableString``.

    The string is cut to at most ``max_length`` UTF-8 bytes, and the number
    of bytes removed is stored in ``truncated_byte_count``.
    """
    value, dropped_bytes = _truncate_str(str_to_convert, max_length)
    return trace_types.TruncatableString(
        value=value,
        truncated_byte_count=dropped_bytes,
    )
def _truncate_str(str_to_check: str, limit: int) -> Tuple[str, int]:
"""Check the length of a string. If exceeds limit, then truncate it."""
encoded = str_to_check.encode("utf-8")
truncated_str = encoded[:limit].decode("utf-8", errors="ignore")
return truncated_str, len(encoded) - len(truncated_str.encode("utf-8"))
def _extract_status(status: trace_api.Status) -> Optional[status_pb2.Status]:
    """Map an OpenTelemetry ``Status`` onto a ``google.rpc`` ``Status`` proto.

    UNSET yields ``None``, and OK yields ``code_pb2.OK``. ERROR yields
    ``code_pb2.UNKNOWN`` carrying the status description. Any other
    (future) status code is logged and then handled like ERROR.
    """
    code = status.status_code
    if code is StatusCode.UNSET:
        return None
    if code is StatusCode.OK:
        return status_pb2.Status(code=code_pb2.OK)
    if code is not StatusCode.ERROR:
        # future added value
        logger.info(
            "Couldn't handle OTel status code %s, assuming error",
            code,
        )
    return status_pb2.Status(
        code=code_pb2.UNKNOWN, message=status.description or ""
    )
def _extract_links(
    links: Sequence[trace_api.Link],
) -> Optional[trace_types.Span.Links]:
    """Convert ``span.links`` into a Cloud Trace ``Span.Links`` message.

    Only the first ``MAX_NUM_LINKS`` links are exported; the remainder is
    reported through ``dropped_links_count``. Each link keeps at most
    ``MAX_LINK_ATTRS`` attributes. Returns ``None`` when there are no links.
    """
    if not links:
        return None

    overflow = len(links) - MAX_NUM_LINKS
    if overflow > 0:
        logger.warning(
            "Exporting more then %s links, some will be truncated",
            MAX_NUM_LINKS,
        )
        links = links[:MAX_NUM_LINKS]
    dropped_links = max(overflow, 0)

    converted: List[trace_types.Span.Link] = []
    for link in links:
        attrs = link.attributes or {}
        if len(attrs) > MAX_LINK_ATTRS:
            logger.warning(
                "Link has more then %s attributes, some will be truncated",
                MAX_LINK_ATTRS,
            )
        converted.append(
            trace_types.Span.Link(
                trace_id=format_trace_id(link.context.trace_id),
                span_id=format_span_id(link.context.span_id),
                type="TYPE_UNSPECIFIED",
                attributes=_extract_attributes(attrs, MAX_LINK_ATTRS),
            )
        )

    return trace_types.Span.Links(
        link=converted, dropped_links_count=dropped_links
    )
def _extract_events(
    events: Sequence[Event],
) -> Optional[trace_types.Span.TimeEvents]:
    """Convert ``span.events`` into Cloud Trace ``Span.TimeEvents``.

    Each event becomes an annotation whose description is the event name
    (at most 256 bytes) and which keeps at most ``MAX_EVENT_ATTRS``
    attributes. Only the first ``MAX_NUM_EVENTS`` events are exported; the
    remainder is reported as ``dropped_annotations_count``. Returns ``None``
    when there are no events.
    """
    if not events:
        return None

    dropped_annotations = max(len(events) - MAX_NUM_EVENTS, 0)
    if dropped_annotations:
        logger.warning(
            "Exporting more then %s annotations, some will be truncated",
            MAX_NUM_EVENTS,
        )
        events = events[:MAX_NUM_EVENTS]

    time_events: List[trace_types.Span.TimeEvent] = []
    for event in events:
        event_attrs = event.attributes
        if event_attrs and len(event_attrs) > MAX_EVENT_ATTRS:
            logger.warning(
                "Event %s has more then %s attributes, some will be truncated",
                event.name,
                MAX_EVENT_ATTRS,
            )
        when = _get_time_from_ns(event.timestamp)
        annotation = trace_types.Span.TimeEvent.Annotation(
            description=_get_truncatable_str_object(event.name, 256),
            attributes=_extract_attributes(event_attrs, MAX_EVENT_ATTRS),
        )
        time_events.append(
            trace_types.Span.TimeEvent(time=when, annotation=annotation)
        )

    return trace_types.Span.TimeEvents(
        time_event=time_events,
        dropped_annotations_count=dropped_annotations,
        dropped_message_events_count=0,
    )
# Translation table from OpenTelemetry span kinds to Cloud Trace span kinds.
# pylint: disable=no-member
SPAN_KIND_MAPPING = {
    trace_api.SpanKind.INTERNAL: trace_types.Span.SpanKind.INTERNAL,
    trace_api.SpanKind.CLIENT: trace_types.Span.SpanKind.CLIENT,
    trace_api.SpanKind.SERVER: trace_types.Span.SpanKind.SERVER,
    trace_api.SpanKind.PRODUCER: trace_types.Span.SpanKind.PRODUCER,
    trace_api.SpanKind.CONSUMER: trace_types.Span.SpanKind.CONSUMER,
}


# pylint: disable=no-member
def _extract_span_kind(
    span_kind: trace_api.SpanKind,
) -> int:
    """Return the Cloud Trace span kind matching ``span_kind``.

    Kinds absent from ``SPAN_KIND_MAPPING`` map to ``SPAN_KIND_UNSPECIFIED``.
    """
    try:
        return SPAN_KIND_MAPPING[span_kind]
    except KeyError:
        return trace_types.Span.SpanKind.SPAN_KIND_UNSPECIFIED
def _strip_characters(ot_version):
return "".join(filter(lambda x: x.isdigit() or x == ".", ot_version))
def _extract_resources(
    resource: Resource, resource_regex: Optional[Pattern] = None
) -> Dict[str, str]:
    """Build span labels that describe ``resource``.

    Two sources contribute, in this order:

    * resource attributes whose key matches ``resource_regex`` (tested with
      ``Pattern.match``, so anchored at the start of the key), with the value
      converted via ``str``;
    * the labels of the GCP monitored resource derived from ``resource``,
      keyed as ``g.co/r/<type>/<label>``. These are skipped for the
      GENERIC_NODE and GENERIC_TASK monitored-resource types.

    Monitored-resource labels are written last, so they win on key clashes.
    """
    labels: Dict[str, str] = {}
    attributes = resource.attributes
    if resource_regex:
        for attr_key, attr_value in attributes.items():
            if resource_regex.match(attr_key):
                labels[attr_key] = str(attr_value)

    monitored = get_monitored_resource(resource)
    # Do not map generic_task and generic_node to g.co/r/... span labels.
    if monitored and monitored.type not in (
        _resource_constants.GENERIC_NODE,
        _resource_constants.GENERIC_TASK,
    ):
        for label_key, label_value in monitored.labels.items():
            labels["g.co/r/{}/{}".format(monitored.type, label_key)] = (
                label_value
            )
    return labels
# Renames OpenTelemetry HTTP semantic-convention attribute keys to the
# "/http/..." label keys used when exporting to Cloud Trace.
# Attribute keys follow:
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#common-attributes
LABELS_MAPPING = {
    # NOTE(review): unverified whether "http.flavor" would be the better
    # source for /http/client_protocol.
    "http.scheme": "/http/client_protocol",
    "http.host": "/http/host",
    "http.method": "/http/method",
    "http.request_content_length": "/http/request/size",
    "http.response_content_length": "/http/response/size",
    "http.route": "/http/route",
    "http.status_code": "/http/status_code",
    "http.url": "/http/url",
    "http.user_agent": "/http/user_agent",
}
def _extract_attributes(
    attrs: types.Attributes,
    num_attrs_limit: int,
    add_agent_attr: bool = False,
) -> trace_types.Span.Attributes:
    """Convert OpenTelemetry attributes into a Cloud Trace ``Span.Attributes``.

    Keys are truncated to ``MAX_ATTR_KEY_BYTES``, and well-known HTTP keys
    are renamed via ``LABELS_MAPPING``. Values that
    ``_format_attribute_value`` cannot represent are skipped. A
    ``BoundedDict`` enforces ``num_attrs_limit`` and counts what it discards.

    Args:
        attrs: The attributes to convert; may be ``None`` or empty.
        num_attrs_limit: Maximum number of attributes to keep.
        add_agent_attr: When true, also add a ``g.co/agent`` label naming the
            OpenTelemetry SDK and exporter versions.

    Returns:
        The attribute map, plus a ``dropped_attributes_count``. The count
        covers entries the ``BoundedDict`` dropped and entries whose values
        were unsupported.
    """
    attributes_dict: BoundedDict[
        str, trace_types.AttributeValue
    ] = BoundedDict(num_attrs_limit)
    invalid_value_dropped_count = 0
    for ot_key, ot_value in attrs.items() if attrs else []:
        key = _truncate_str(ot_key, MAX_ATTR_KEY_BYTES)[0]
        key = LABELS_MAPPING.get(key, key)
        value = _format_attribute_value(ot_value)
        if value is None:
            invalid_value_dropped_count += 1
            continue
        attributes_dict[key] = value
    if add_agent_attr:
        # The SDK version was already resolved once at import time
        # (_OTEL_SDK_VERSION). Querying pkg_resources again for every
        # exported span is needlessly expensive.
        attributes_dict["g.co/agent"] = _format_attribute_value(
            "opentelemetry-python {}; google-cloud-trace-exporter {}".format(
                _strip_characters(_OTEL_SDK_VERSION),
                _strip_characters(__version__),
            )
        )
    return trace_types.Span.Attributes(
        attribute_map=dict(attributes_dict),
        dropped_attributes_count=attributes_dict.dropped
        + invalid_value_dropped_count,
    )
@overload
def _format_attribute_value(
    value: types.AttributeValue,
) -> trace_types.AttributeValue:
    ...


@overload
def _format_attribute_value(
    value: Any,
) -> Optional[trace_types.AttributeValue]:
    ...


def _format_attribute_value(
    value,
) -> Optional[trace_types.AttributeValue]:
    """Convert an attribute value into a Cloud Trace ``AttributeValue``.

    Conversion rules:

    * ``bool`` becomes ``bool_value``.
    * ``int`` becomes ``int_value``. Cloud Trace's ``int_value`` is a signed
      64-bit field, so integers outside that range are sent as their decimal
      string; otherwise building the message would fail and take the whole
      export batch down with it.
    * ``str`` becomes a ``string_value``.
    * ``float`` becomes a ``string_value`` rendered with 4 decimal places.
    * Other sequences become a ``string_value`` of their items joined by
      ``,``.

    Every ``string_value`` is truncated to ``MAX_ATTR_VAL_BYTES``. Any other
    type is logged and ``None`` is returned.
    """
    if isinstance(value, bool):
        # Checked before int because bool is a subclass of int.
        value_type = "bool_value"
    elif isinstance(value, int):
        if -(2**63) <= value < 2**63:
            value_type = "int_value"
        else:
            value_type = "string_value"
            value = _get_truncatable_str_object(str(value), MAX_ATTR_VAL_BYTES)
    elif isinstance(value, str):
        value_type = "string_value"
        value = _get_truncatable_str_object(value, MAX_ATTR_VAL_BYTES)
    elif isinstance(value, float):
        value_type = "string_value"
        value = _get_truncatable_str_object(
            "{:0.4f}".format(value), MAX_ATTR_VAL_BYTES
        )
    elif isinstance(value, SequenceABC):
        value_type = "string_value"
        value = _get_truncatable_str_object(
            ",".join(str(x) for x in value), MAX_ATTR_VAL_BYTES
        )
    else:
        logger.warning(
            "ignoring attribute value %s of type %s. Values type must be one "
            "of bool, int, string or float, or a sequence of these",
            value,
            type(value),
        )
        return None
    return trace_types.AttributeValue(**{value_type: value})