Merge pull request #2932 from itowlson/upgrade-otel
Upgrade OTel to stop Tokio being pinned
itowlson authored Nov 21, 2024
2 parents a4bb209 + 12ef3ea commit 8d27d0d
Showing 7 changed files with 67 additions and 84 deletions.
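The central API change in this upgrade: opentelemetry-otlp 0.25 built exporters through the free-standing new_exporter() function plus an .into() conversion, while 0.27 gives each signal its own exporter type with a dedicated builder. A minimal sketch of the new style for spans, wrapped in a hypothetical helper (the real call sites are in the per-file diffs below):

use anyhow::Result;

// Hypothetical helper, not part of this PR: build a gRPC (tonic) span
// exporter the 0.27 way; the HTTP variant swaps with_tonic() for with_http().
fn build_span_exporter() -> Result<opentelemetry_otlp::SpanExporter> {
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .build()?;
    Ok(exporter)
}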
39 changes: 27 additions & 12 deletions Cargo.lock

Diff not rendered (generated file).

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -127,8 +127,8 @@ http = "1"
 http-body-util = "0.1"
 hyper = { version = "1", features = ["full"] }
 itertools = "0.13"
-opentelemetry = { version = "0.25", features = ["metrics", "trace", "logs"] }
-opentelemetry_sdk = { version = "0.25", features = ["rt-tokio", "logs_level_enabled", "metrics"] }
+opentelemetry = { version = "0.27", features = ["metrics", "trace", "logs"] }
+opentelemetry_sdk = { version = "0.27", features = ["rt-tokio", "spec_unstable_logs_enabled", "metrics"] }
 rand = "0.8"
 regex = "1"
 reqwest = { version = "0.12", features = ["stream", "blocking"] }
@@ -144,7 +144,7 @@ thiserror = "1"
 tokio = "1"
 toml = "0.8"
 tracing = { version = "0.1", features = ["log"] }
-tracing-opentelemetry = { version = "0.26", default-features = false, features = ["metrics"] }
+tracing-opentelemetry = { version = "0.28", default-features = false, features = ["metrics"] }
 url = "2"
 wasi-common-preview1 = { version = "25.0.0", package = "wasi-common", features = [
   "tokio",
3 changes: 2 additions & 1 deletion crates/telemetry/Cargo.toml
@@ -9,7 +9,8 @@ anyhow = { workspace = true }
 http0 = { version = "0.2.9", package = "http" }
 http1 = { version = "1.0.0", package = "http" }
 opentelemetry = { workspace = true }
-opentelemetry-otlp = { version = "0.25", features = ["http-proto", "http", "reqwest-client"] }
+opentelemetry-appender-tracing = "0.27"
+opentelemetry-otlp = { version = "0.27", features = ["http-proto", "http", "reqwest-client"] }
 opentelemetry_sdk = { workspace = true }
 terminal = { path = "../terminal" }
 tracing = { workspace = true }
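The new opentelemetry-appender-tracing dependency is the bridge that forwards tracing events into the OTel logs pipeline built in logs.rs. A minimal sketch of how such a bridge is typically attached; the helper function and subscriber wiring below are illustrative, not taken from this diff:

use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use tracing_subscriber::prelude::*;

// Hypothetical wiring, not part of this PR: attach the log bridge for an
// already-built OTel SDK logger provider.
fn install_log_bridge(provider: &opentelemetry_sdk::logs::LoggerProvider) {
    let bridge = OpenTelemetryTracingBridge::new(provider);
    tracing_subscriber::registry().with(bridge).init();
}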
37 changes: 0 additions & 37 deletions crates/telemetry/src/lib.rs
@@ -1,7 +1,4 @@
-use std::cell::Cell;
 use std::io::IsTerminal;
-use std::time::Duration;
-use std::time::Instant;
 
 use env::otel_logs_enabled;
 use env::otel_metrics_enabled;
@@ -67,9 +64,6 @@ pub fn init(spin_version: String) -> anyhow::Result<ShutdownGuard> {
             .add_directive("[{app_log_non_utf8}]=off".parse()?),
     );
 
-    // Even if metrics or tracing aren't enabled we're okay to turn on the global error handler
-    opentelemetry::global::set_error_handler(otel_error_handler)?;
-
     let otel_tracing_layer = if otel_tracing_enabled() {
         Some(traces::otel_tracing_layer(spin_version.clone())?)
     } else {
@@ -100,37 +94,6 @@ pub fn init(spin_version: String) -> anyhow::Result<ShutdownGuard> {
     Ok(ShutdownGuard)
 }
 
-fn otel_error_handler(err: opentelemetry::global::Error) {
-    // Track the error count
-    let signal = match err {
-        opentelemetry::global::Error::Metric(_) => "metrics",
-        opentelemetry::global::Error::Trace(_) => "traces",
-        opentelemetry::global::Error::Log(_) => "logs",
-        _ => "unknown",
-    };
-    metrics::monotonic_counter!(spin.otel_error_count = 1, signal = signal);
-
-    // Only log the first error at ERROR level, subsequent errors will be logged at higher levels and rate limited
-    static FIRST_OTEL_ERROR: std::sync::Once = std::sync::Once::new();
-    FIRST_OTEL_ERROR.call_once(|| {
-        tracing::error!(?err, "OpenTelemetry error");
-        tracing::error!("There has been an error with the OpenTelemetry system. Traces, logs or metrics are likely failing to export.");
-        tracing::error!("Further OpenTelemetry errors will be available at WARN level (rate limited) or at TRACE level.");
-    });
-
-    // Rate limit the logging of the OTel errors to not occur more frequently on each thread than OTEL_ERROR_INTERVAL
-    const OTEL_ERROR_INTERVAL: Duration = Duration::from_millis(5000);
-    thread_local! {
-        static LAST_OTEL_ERROR: Cell<Instant> = Cell::new(Instant::now() - OTEL_ERROR_INTERVAL);
-    }
-    if LAST_OTEL_ERROR.get().elapsed() > OTEL_ERROR_INTERVAL {
-        LAST_OTEL_ERROR.set(Instant::now());
-        tracing::warn!(?err, "OpenTelemetry error");
-    } else {
-        tracing::trace!(?err, "OpenTelemetry error");
-    }
-}
-
 /// An RAII implementation for connection to open telemetry services.
 ///
 /// Shutdown of the open telemetry services will happen on `Drop`.
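The deleted handler depended on opentelemetry::global::set_error_handler, which is gone in OTel 0.27, so the bespoke rate limiting (and the Cell, Duration, and Instant imports) goes with it; exporter problems are now surfaced by the OTel crates themselves rather than through a global callback. If that output ever needs tuning, one option, an assumption rather than anything this PR does, is a per-crate filter directive built the same way as the directives init already adds:

// Hypothetical follow-up, not part of this PR: adjust the verbosity of the
// OpenTelemetry crates' own diagnostics with per-crate directives, reusing
// the add_directive pattern shown in the hunk above.
fn otel_diagnostics_filter() -> anyhow::Result<tracing_subscriber::EnvFilter> {
    Ok(tracing_subscriber::EnvFilter::new("info")
        .add_directive("opentelemetry_sdk=warn".parse()?)
        .add_directive("opentelemetry_otlp=warn".parse()?))
}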
20 changes: 10 additions & 10 deletions crates/telemetry/src/logs.rs
@@ -2,7 +2,6 @@ use std::{ascii::escape_default, sync::OnceLock, time::Duration};
 
 use anyhow::bail;
 use opentelemetry::logs::{LogRecord, Logger, LoggerProvider};
-use opentelemetry_otlp::LogExporterBuilder;
 use opentelemetry_sdk::{
     logs::{BatchConfigBuilder, BatchLogProcessor, Logger as SdkLogger},
     resource::{EnvResourceDetector, TelemetryResourceDetector},
@@ -86,21 +85,22 @@ pub(crate) fn init_otel_logging_backend(spin_version: String) -> anyhow::Result<
     // currently default to using the HTTP exporter but in the future we could select off of the
     // combination of OTEL_EXPORTER_OTLP_PROTOCOL and OTEL_EXPORTER_OTLP_LOGS_PROTOCOL to
     // determine whether we should use http/protobuf or grpc.
-    let exporter_builder: LogExporterBuilder = match OtlpProtocol::logs_protocol_from_env() {
-        OtlpProtocol::Grpc => opentelemetry_otlp::new_exporter().tonic().into(),
-        OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter().http().into(),
+    let exporter = match OtlpProtocol::logs_protocol_from_env() {
+        OtlpProtocol::Grpc => opentelemetry_otlp::LogExporter::builder()
+            .with_tonic()
+            .build()?,
+        OtlpProtocol::HttpProtobuf => opentelemetry_otlp::LogExporter::builder()
+            .with_http()
+            .build()?,
         OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"),
     };
 
     let provider = opentelemetry_sdk::logs::LoggerProvider::builder()
         .with_resource(resource)
         .with_log_processor(
-            BatchLogProcessor::builder(
-                exporter_builder.build_log_exporter()?,
-                opentelemetry_sdk::runtime::Tokio,
-            )
-            .with_batch_config(BatchConfigBuilder::default().build())
-            .build(),
+            BatchLogProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
+                .with_batch_config(BatchConfigBuilder::default().build())
+                .build(),
         )
         .build();
 
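For context on what this provider feeds: the records presumably arrive through the tracing bridge added in Cargo.toml, but the provider can also be driven directly through the Logger trait already imported at the top of this file. An illustrative sketch with a made-up logger name and body:

use opentelemetry::logs::{AnyValue, LogRecord, Logger, LoggerProvider as _};

// Illustrative only: emit one record through a provider like the one built
// above; the batch processor exports it over OTLP on its usual schedule.
fn emit_example(provider: &opentelemetry_sdk::logs::LoggerProvider) {
    let logger = provider.logger("example");
    let mut record = logger.create_log_record();
    record.set_body(AnyValue::String("hello from the OTLP logs pipeline".into()));
    logger.emit(record);
}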
20 changes: 8 additions & 12 deletions crates/telemetry/src/metrics.rs
@@ -2,12 +2,8 @@ use std::time::Duration;
 
 use anyhow::{bail, Result};
 use opentelemetry::global;
-use opentelemetry_otlp::MetricsExporterBuilder;
 use opentelemetry_sdk::{
-    metrics::{
-        reader::{DefaultAggregationSelector, DefaultTemporalitySelector},
-        PeriodicReader, SdkMeterProvider,
-    },
+    metrics::{PeriodicReader, SdkMeterProvider},
     resource::{EnvResourceDetector, TelemetryResourceDetector},
     runtime, Resource,
 };
@@ -42,15 +38,15 @@ pub(crate) fn otel_metrics_layer<S: Subscriber + for<'span> LookupSpan<'span>>(
     // currently default to using the HTTP exporter but in the future we could select off of the
     // combination of OTEL_EXPORTER_OTLP_PROTOCOL and OTEL_EXPORTER_OTLP_TRACES_PROTOCOL to
    // determine whether we should use http/protobuf or grpc.
-    let exporter_builder: MetricsExporterBuilder = match OtlpProtocol::metrics_protocol_from_env() {
-        OtlpProtocol::Grpc => opentelemetry_otlp::new_exporter().tonic().into(),
-        OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter().http().into(),
+    let exporter = match OtlpProtocol::metrics_protocol_from_env() {
+        OtlpProtocol::Grpc => opentelemetry_otlp::MetricExporter::builder()
+            .with_tonic()
+            .build()?,
+        OtlpProtocol::HttpProtobuf => opentelemetry_otlp::MetricExporter::builder()
+            .with_http()
+            .build()?,
         OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"),
     };
-    let exporter = exporter_builder.build_metrics_exporter(
-        Box::new(DefaultTemporalitySelector::new()),
-        Box::new(DefaultAggregationSelector::new()),
-    )?;
 
     let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
     let meter_provider = SdkMeterProvider::builder()
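One note for readers of this file: the layer built here appears to be tracing-opentelemetry's metrics support (the workspace dependency enables its metrics feature), which turns tracing events with specially prefixed field names into OTel instruments. An illustrative sketch with made-up metric names:

// Illustrative only: with the metrics layer installed, these ordinary
// tracing events are recorded as a monotonic counter and a histogram.
// The monotonic_counter. / histogram. prefixes are the tracing-opentelemetry
// convention; everything after the prefix becomes the instrument name.
fn record_example_metrics() {
    tracing::info!(monotonic_counter.spin.example_count = 1_u64);
    tracing::info!(histogram.spin.example_duration_seconds = 0.250_f64);
}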
26 changes: 17 additions & 9 deletions crates/telemetry/src/traces.rs
@@ -2,7 +2,6 @@ use std::time::Duration;
 
 use anyhow::bail;
 use opentelemetry::{global, trace::TracerProvider};
-use opentelemetry_otlp::SpanExporterBuilder;
 use opentelemetry_sdk::{
     resource::{EnvResourceDetector, TelemetryResourceDetector},
     Resource,
@@ -35,17 +34,26 @@ pub(crate) fn otel_tracing_layer<S: Subscriber + for<'span> LookupSpan<'span>>(
     );
 
     // This will configure the exporter based on the OTEL_EXPORTER_* environment variables.
-    let exporter_builder: SpanExporterBuilder = match OtlpProtocol::traces_protocol_from_env() {
-        OtlpProtocol::Grpc => opentelemetry_otlp::new_exporter().tonic().into(),
-        OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter().http().into(),
+    let exporter = match OtlpProtocol::traces_protocol_from_env() {
+        OtlpProtocol::Grpc => opentelemetry_otlp::SpanExporter::builder()
+            .with_tonic()
+            .build()?,
+        OtlpProtocol::HttpProtobuf => opentelemetry_otlp::SpanExporter::builder()
+            .with_http()
+            .build()?,
        OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"),
     };
 
-    let tracer_provider = opentelemetry_otlp::new_pipeline()
-        .tracing()
-        .with_exporter(exporter_builder)
-        .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(resource))
-        .install_batch(opentelemetry_sdk::runtime::Tokio)?;
+    let span_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(
+        exporter,
+        opentelemetry_sdk::runtime::Tokio,
+    )
+    .build();
+
+    let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
+        .with_config(opentelemetry_sdk::trace::Config::default().with_resource(resource))
+        .with_span_processor(span_processor)
+        .build();
 
     global::set_tracer_provider(tracer_provider.clone());
 
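The provider is registered globally, and the rest of this function (outside the hunk) presumably hands it to a tracing-opentelemetry layer, so spans created with ordinary tracing macros flow into the batch span processor configured above. A small illustrative sketch with made-up span and field names:

// Illustrative only: any span created below the installed layer is exported
// as an OTLP span by the batch processor.
fn do_work() {
    let span = tracing::info_span!("do_work", example.attribute = "demo");
    let _guard = span.enter();
    tracing::info!("inside an exported span");
}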
