Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add observable instruments to periodicreader tests #2428

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,4 +782,134 @@
"Metrics should be available in exporter."
);
}

async fn some_async_function() -> u64 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@howardjohn If you have bandwidth, could you review these? Asking since you helped fix some bugs in the area previously!

// No dependency on any particular async runtime.
std::thread::sleep(std::time::Duration::from_millis(1));
1
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() {
async_inside_observable_callback_helper();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() {
async_inside_observable_callback_helper();
}

#[tokio::test(flavor = "current_thread")]
async fn async_inside_observable_callback_from_tokio_current_thread() {
async_inside_observable_callback_helper();
}

#[test]
fn async_inside_observable_callback_from_regular_main() {
async_inside_observable_callback_helper();
}

fn async_inside_observable_callback_helper() {
let interval = std::time::Duration::from_millis(10);
let exporter = InMemoryMetricExporter::default();
let reader = PeriodicReader::builder(exporter.clone())
.with_interval(interval)
.build();

let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let _gauge = meter
.u64_observable_gauge("my_observable_gauge")
.with_callback(|observer| {
// using futures_executor::block_on intentionally and avoiding
// any particular async runtime.
let value = futures_executor::block_on(some_async_function());
observer.observe(value, &[]);
})
.build();

meter_provider.force_flush().expect("flush should succeed");
let exported_metrics = exporter
.get_finished_metrics()
.expect("this should not fail");
assert!(
!exported_metrics.is_empty(),
"Metrics should be available in exporter."

Check warning on line 837 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L837

Added line #L837 was not covered by tests
);
}

async fn some_tokio_async_function() -> u64 {
// Tokio specific async function
tokio::time::sleep(Duration::from_millis(1)).await;
1
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
tokio_async_inside_observable_callback_helper(true);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
tokio_async_inside_observable_callback_helper(true);
}

#[tokio::test(flavor = "current_thread")]
#[ignore] //TODO: Investigate if this can be fixed.
async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
tokio_async_inside_observable_callback_helper(true);
}

Check warning on line 862 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L860-L862

Added lines #L860 - L862 were not covered by tests

#[test]
fn tokio_async_inside_observable_callback_from_regular_main() {
tokio_async_inside_observable_callback_helper(false);
}

fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
let interval = std::time::Duration::from_millis(10);
let exporter = InMemoryMetricExporter::default();
let reader = PeriodicReader::builder(exporter.clone())
.with_interval(interval)
.build();

let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");

if use_current_tokio_runtime {
let rt = tokio::runtime::Handle::current().clone();
let _gauge = meter
.u64_observable_gauge("my_observable_gauge")
.with_callback(move |observer| {
// call tokio specific async function from here
let value = rt.block_on(some_tokio_async_function());
observer.observe(value, &[]);
})
.build();
// rt here is a reference to the current tokio runtime.
// Droppng it occurs when the tokio::main itself ends.
} else {
let rt = tokio::runtime::Runtime::new().unwrap();
let _gauge = meter
.u64_observable_gauge("my_observable_gauge")
.with_callback(move |observer| {
// call tokio specific async function from here
let value = rt.block_on(some_tokio_async_function());
observer.observe(value, &[]);
})
.build();
// rt is not dropped here as it is moved to the closure,
// and is dropped only when MeterProvider itself is dropped.
// This works when called from normal main.
};

meter_provider.force_flush().expect("flush should succeed");
let exported_metrics = exporter
.get_finished_metrics()
.expect("this should not fail");
assert!(
!exported_metrics.is_empty(),
"Metrics should be available in exporter."

Check warning on line 912 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L912

Added line #L912 was not covered by tests
);
}
}
Loading