From d482c08f561926c19ff98173cc1ab923786eea11 Mon Sep 17 00:00:00 2001 From: Julio Date: Tue, 23 Jun 2026 11:40:00 +0200 Subject: [PATCH 1/2] refactor(libdd-telemetry)!: avoid leaking libdd-common types in the public API. --- datadog-sidecar/src/service/sidecar_server.rs | 7 +- examples/ffi/telemetry.c | 5 +- examples/ffi/telemetry_metrics.c | 5 +- .../src/crash_info/telemetry.rs | 7 +- libdd-data-pipeline/src/telemetry/mod.rs | 4 +- libdd-telemetry-ffi/src/builder.rs | 93 ++++++++++++++++--- libdd-telemetry-ffi/src/lib.rs | 28 ++++-- .../examples/tm-metrics-worker-test.rs | 4 +- libdd-telemetry/examples/tm-send-sketch.rs | 12 +-- libdd-telemetry/examples/tm-worker-test.rs | 5 +- libdd-telemetry/src/config.rs | 69 ++++++++++---- libdd-telemetry/src/worker/mod.rs | 16 +--- 12 files changed, 173 insertions(+), 82 deletions(-) diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 7a5e095d13..cd86f3b565 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -709,7 +709,12 @@ impl SidecarInterface for ConnectionSidecarHandler { libdd_telemetry::config::PROD_INTAKE_SUBDOMAIN, &config.endpoint, ); - cfg.set_endpoint(endpoint).ok(); + // Set the api key before the uri so the telemetry path is resolved correctly. + cfg.set_endpoint_api_key(endpoint.api_key.as_deref()).ok(); + cfg.set_endpoint_uri(endpoint.url.clone()).ok(); + cfg.set_endpoint_timeout_ms(endpoint.timeout_ms); + cfg.set_endpoint_test_token(endpoint.test_token.clone()); + cfg.set_endpoint_use_system_resolver(endpoint.use_system_resolver); cfg.telemetry_heartbeat_interval = config.telemetry_heartbeat_interval; cfg.telemetry_extended_heartbeat_interval = config.telemetry_extended_heartbeat_interval; diff --git a/examples/ffi/telemetry.c b/examples/ffi/telemetry.c index 2987381fc6..b9dff330b6 100644 --- a/examples/ffi/telemetry.c +++ b/examples/ffi/telemetry.c @@ -23,9 +23,8 @@ int main(void) { TRY(ddog_telemetry_builder_instantiate(&builder, service, lang, lang_version, tracer_version)); ddog_CharSlice endpoint_char = DDOG_CHARSLICE_C("file://./examples_telemetry.out"); - struct ddog_Endpoint *endpoint = ddog_endpoint_from_url(endpoint_char); - TRY(ddog_telemetry_builder_with_endpoint_config_endpoint(builder, endpoint)); - ddog_endpoint_drop(endpoint); + TRY(ddog_telemetry_builder_with_endpoint_config_endpoint( + builder, endpoint_char, DDOG_CHARSLICE_C(""), 0, DDOG_CHARSLICE_C(""), false)); ddog_CharSlice runtime_id = DDOG_CHARSLICE_C("fa1f0ed0-8a3a-49e8-8f23-46fb44e24579"), service_version = DDOG_CHARSLICE_C("1.0"), env = DDOG_CHARSLICE_C("test"); diff --git a/examples/ffi/telemetry_metrics.c b/examples/ffi/telemetry_metrics.c index 8b1138f8d9..1d86f7ff2e 100644 --- a/examples/ffi/telemetry_metrics.c +++ b/examples/ffi/telemetry_metrics.c @@ -49,9 +49,8 @@ int main(void) { TRY(ddog_telemetry_builder_instantiate(&builder, service, lang, lang_version, tracer_version)); ddog_CharSlice endpoint_char = DDOG_CHARSLICE_C("file://./examples_telemetry_metrics.out"); - struct ddog_Endpoint *endpoint = ddog_endpoint_from_url(endpoint_char); - TRY(ddog_telemetry_builder_with_endpoint_config_endpoint(builder, endpoint)); - ddog_endpoint_drop(endpoint); + TRY(ddog_telemetry_builder_with_endpoint_config_endpoint( + builder, endpoint_char, DDOG_CHARSLICE_C(""), 0, DDOG_CHARSLICE_C(""), false)); ddog_CharSlice runtime_id = DDOG_CHARSLICE_C("fa1f0ed0-8a3a-49e8-8f23-46fb44e24579"), service_version = DDOG_CHARSLICE_C("1.0"), env = DDOG_CHARSLICE_C("test"); diff --git a/libdd-crashtracker/src/crash_info/telemetry.rs b/libdd-crashtracker/src/crash_info/telemetry.rs index e4c369f06a..a43b31a745 100644 --- a/libdd-crashtracker/src/crash_info/telemetry.rs +++ b/libdd-crashtracker/src/crash_info/telemetry.rs @@ -202,7 +202,12 @@ impl TelemetryCrashUploader { .context("file path is not valid")?; cfg.set_host_from_url(&format!("file://{}.telemetry", path.display())) } else { - cfg.set_endpoint(endpoint.clone()) + cfg.set_endpoint_timeout_ms(endpoint.timeout_ms); + cfg.set_endpoint_test_token(endpoint.test_token.clone()); + cfg.set_endpoint_use_system_resolver(endpoint.use_system_resolver); + // Set the api key before the uri so the telemetry path is resolved correctly. + cfg.set_endpoint_api_key(endpoint.api_key.as_deref()) + .and_then(|()| cfg.set_endpoint_uri(endpoint.url.clone())) }; } diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 3ea0ea2096..cd2b9e05fd 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -72,9 +72,7 @@ impl TelemetryClientBuilder { /// Sets the url where the metrics will be sent. pub fn set_url(mut self, url: &str) -> Self { - let _ = self - .config - .set_endpoint(libdd_common::Endpoint::from_slice(url)); + let _ = self.config.set_endpoint_url(url); self } diff --git a/libdd-telemetry-ffi/src/builder.rs b/libdd-telemetry-ffi/src/builder.rs index 78216e1848..df76c73e93 100644 --- a/libdd-telemetry-ffi/src/builder.rs +++ b/libdd-telemetry-ffi/src/builder.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use ffi::slice::AsBytes; -use libdd_common::Endpoint; use libdd_common_ffi as ffi; use libdd_telemetry::{ data, @@ -155,14 +154,60 @@ pub unsafe extern "C" fn ddog_telemetry_builder_run_metric_logs( MaybeError::None } +/// Applies endpoint settings to the builder's telemetry config from primitive +/// values, so `libdd_common::Endpoint` stays out of this crate's public API. +/// +/// `api_key` and `test_token` are treated as unset when empty; a `timeout_ms` of +/// 0 keeps the existing/default timeout. +fn set_builder_endpoint( + telemetry_builder: &mut TelemetryWorkerBuilder, + url: ffi::CharSlice, + api_key: ffi::CharSlice, + timeout_ms: u64, + test_token: ffi::CharSlice, + use_system_resolver: bool, +) -> ffi::MaybeError { + let url = try_c!(url.try_to_utf8()); + let api_key = api_key.to_utf8_lossy(); + let test_token = test_token.to_utf8_lossy(); + let config = &mut telemetry_builder.config; + // Set the api key before the url so the telemetry path is resolved correctly. + if !api_key.is_empty() { + try_c!(config.set_endpoint_api_key(Some(api_key.as_ref()))); + } + try_c!(config.set_endpoint_url(url)); + if timeout_ms != 0 { + config.set_endpoint_timeout_ms(timeout_ms); + } + if !test_token.is_empty() { + config.set_endpoint_test_token(Some(test_token.into_owned())); + } + config.set_endpoint_use_system_resolver(use_system_resolver); + ffi::MaybeError::None +} + #[no_mangle] #[allow(clippy::missing_safety_doc)] +/// Sets the telemetry endpoint from its component parts. +/// +/// * `api_key` / `test_token`: ignored when empty. +/// * `timeout_ms`: pass 0 to keep the existing/default timeout. pub unsafe extern "C" fn ddog_telemetry_builder_with_endpoint_config_endpoint( telemetry_builder: &mut TelemetryWorkerBuilder, - endpoint: &Endpoint, + url: ffi::CharSlice, + api_key: ffi::CharSlice, + timeout_ms: u64, + test_token: ffi::CharSlice, + use_system_resolver: bool, ) -> ffi::MaybeError { - try_c!(telemetry_builder.config.set_endpoint(endpoint.clone())); - ffi::MaybeError::None + set_builder_endpoint( + telemetry_builder, + url, + api_key, + timeout_ms, + test_token, + use_system_resolver, + ) } #[repr(C)] #[allow(dead_code)] @@ -172,7 +217,7 @@ pub enum TelemetryWorkerBuilderEndpointProperty { #[no_mangle] #[allow(clippy::missing_safety_doc)] -/// Sets a property from it's string value. +/// Sets the endpoint property from its component parts. /// /// Available properties: /// @@ -180,14 +225,24 @@ pub enum TelemetryWorkerBuilderEndpointProperty { pub unsafe extern "C" fn ddog_telemetry_builder_with_property_endpoint( telemetry_builder: &mut TelemetryWorkerBuilder, _property: TelemetryWorkerBuilderEndpointProperty, - endpoint: &Endpoint, + url: ffi::CharSlice, + api_key: ffi::CharSlice, + timeout_ms: u64, + test_token: ffi::CharSlice, + use_system_resolver: bool, ) -> ffi::MaybeError { - try_c!(telemetry_builder.config.set_endpoint(endpoint.clone())); - ffi::MaybeError::None + set_builder_endpoint( + telemetry_builder, + url, + api_key, + timeout_ms, + test_token, + use_system_resolver, + ) } #[no_mangle] #[allow(clippy::missing_safety_doc)] -/// Sets a property from it's string value. +/// Sets a named endpoint property from its component parts. /// /// Available properties: /// @@ -195,15 +250,23 @@ pub unsafe extern "C" fn ddog_telemetry_builder_with_property_endpoint( pub unsafe extern "C" fn ddog_telemetry_builder_with_endpoint_named_property( telemetry_builder: &mut TelemetryWorkerBuilder, property: ffi::CharSlice, - endpoint: &Endpoint, + url: ffi::CharSlice, + api_key: ffi::CharSlice, + timeout_ms: u64, + test_token: ffi::CharSlice, + use_system_resolver: bool, ) -> ffi::MaybeError { let property = try_c!(property.try_to_utf8()); match property { - "config . endpoint" => { - try_c!(telemetry_builder.config.set_endpoint(endpoint.clone())); - } - _ => return ffi::MaybeError::None, + "config . endpoint" => set_builder_endpoint( + telemetry_builder, + url, + api_key, + timeout_ms, + test_token, + use_system_resolver, + ), + _ => ffi::MaybeError::None, } - ffi::MaybeError::None } diff --git a/libdd-telemetry-ffi/src/lib.rs b/libdd-telemetry-ffi/src/lib.rs index c9ea548c4d..3f0bda6a2f 100644 --- a/libdd-telemetry-ffi/src/lib.rs +++ b/libdd-telemetry-ffi/src/lib.rs @@ -111,7 +111,6 @@ mod tests { use crate::{builder::*, worker_handle::*}; use ffi::tags::{ddog_Vec_Tag_new, ddog_Vec_Tag_push, PushTagResult}; use ffi::MaybeError; - use libdd_common::Endpoint; use libdd_common_ffi as ffi; use libdd_telemetry::{ data::{ @@ -137,9 +136,14 @@ mod tests { let mut builder = builder.assume_init(); let f = tempfile::NamedTempFile::new().unwrap(); + let url = format!("file://{}", f.path().to_str().unwrap()); ddog_telemetry_builder_with_endpoint_config_endpoint( &mut builder, - &Endpoint::from_slice(&format!("file://{}", f.path().to_str().unwrap())), + ffi::CharSlice::from(url.as_str()), + ffi::CharSlice::from(""), + 0, + ffi::CharSlice::from(""), + false, ) .unwrap_none(); @@ -306,13 +310,15 @@ mod tests { let mut builder = builder.assume_init(); let f = tempfile::NamedTempFile::new().unwrap(); + let url = format!("file://{}", f.path().as_os_str().to_str().unwrap()); assert_eq!( ddog_telemetry_builder_with_endpoint_config_endpoint( &mut builder, - &Endpoint::from_slice(&format!( - "file://{}", - f.path().as_os_str().to_str().unwrap() - )), + ffi::CharSlice::from(url.as_str()), + ffi::CharSlice::from(""), + 0, + ffi::CharSlice::from(""), + false, ), MaybeError::None ); @@ -351,13 +357,15 @@ mod tests { let mut builder = builder.assume_init(); let f = tempfile::NamedTempFile::new().unwrap(); + let url = format!("file://{}", f.path().as_os_str().to_str().unwrap()); assert_eq!( ddog_telemetry_builder_with_endpoint_config_endpoint( &mut builder, - &Endpoint::from_slice(&format!( - "file://{}", - f.path().as_os_str().to_str().unwrap() - )), + ffi::CharSlice::from(url.as_str()), + ffi::CharSlice::from(""), + 0, + ffi::CharSlice::from(""), + false, ), MaybeError::None ); diff --git a/libdd-telemetry/examples/tm-metrics-worker-test.rs b/libdd-telemetry/examples/tm-metrics-worker-test.rs index 8dff49964a..0a9d420a02 100644 --- a/libdd-telemetry/examples/tm-metrics-worker-test.rs +++ b/libdd-telemetry/examples/tm-metrics-worker-test.rs @@ -36,9 +36,7 @@ fn main() -> Result<(), Box> { builder.config.telemetry_debug_logging_enabled = true; builder .config - .set_endpoint(libdd_common::Endpoint::from_slice( - "file://./tm-metrics-worker-test.output", - )) + .set_endpoint_url("file://./tm-metrics-worker-test.output") .unwrap(); builder.config.telemetry_heartbeat_interval = Duration::from_secs(1); builder.config.debug_enabled = true; diff --git a/libdd-telemetry/examples/tm-send-sketch.rs b/libdd-telemetry/examples/tm-send-sketch.rs index f977a1dd22..ca78ce5ea5 100644 --- a/libdd-telemetry/examples/tm-send-sketch.rs +++ b/libdd-telemetry/examples/tm-send-sketch.rs @@ -4,13 +4,11 @@ // Datadog, Inc. use std::{ - borrow::Cow, sync::atomic::{AtomicU64, Ordering}, time::SystemTime, }; -use http::{header::CONTENT_TYPE, Uri}; -use libdd_common::Endpoint; +use http::header::CONTENT_TYPE; use libdd_telemetry::{ build_host, config::Config, @@ -111,12 +109,10 @@ async fn async_main() { let mut config = Config::from_env(); config.direct_submission_enabled = true; config.debug_enabled = true; + let api_key = std::env::var("DD_API_KEY").unwrap(); + config.set_endpoint_api_key(Some(&api_key)).unwrap(); config - .set_endpoint(Endpoint { - url: Uri::from_static("https://instrumentation-telemetry-intake.datad0g.com"), - api_key: Some(Cow::Owned(std::env::var("DD_API_KEY").unwrap())), - ..Default::default() - }) + .set_endpoint_url("https://instrumentation-telemetry-intake.datad0g.com") .unwrap(); push_telemetry(&config, &req).await.unwrap(); } diff --git a/libdd-telemetry/examples/tm-worker-test.rs b/libdd-telemetry/examples/tm-worker-test.rs index f60ef46218..7cbd012d69 100644 --- a/libdd-telemetry/examples/tm-worker-test.rs +++ b/libdd-telemetry/examples/tm-worker-test.rs @@ -39,10 +39,7 @@ fn main() -> Result<(), Box> { builder.config = libdd_telemetry::config::Config::from_env(); builder .config - .set_endpoint(libdd_common::Endpoint { - url: libdd_common::parse_uri("file://./tm-worker-test.output").unwrap(), - ..Default::default() - }) + .set_endpoint_url("file://./tm-worker-test.output") .unwrap(); builder.config.telemetry_heartbeat_interval = Duration::from_secs(1); diff --git a/libdd-telemetry/src/config.rs b/libdd-telemetry/src/config.rs index 1ecdf1c512..68ffbaea11 100644 --- a/libdd-telemetry/src/config.rs +++ b/libdd-telemetry/src/config.rs @@ -236,17 +236,58 @@ impl Config { self.endpoint.as_ref() } - pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> { - self.endpoint = Some(endpoint_with_telemetry_path( - endpoint, - self.direct_submission_enabled, - )?); + /// Rewrites the endpoint path to the telemetry path appropriate for the + /// current scheme, API key and direct-submission setting. Called by the + /// setters that can affect it (URL and API key). + fn apply_telemetry_path(&mut self) -> anyhow::Result<()> { + if let Some(endpoint) = self.endpoint.take() { + self.endpoint = Some(endpoint_with_telemetry_path( + endpoint, + self.direct_submission_enabled, + )?); + } Ok(()) } + /// Sets the endpoint URL from a string (`http(s)://`, `unix://`, `windows:` + /// or `file://`). Use [`Config::set_endpoint_uri`] if you already hold a + /// parsed [`Uri`] to avoid re-parsing. + pub fn set_endpoint_url(&mut self, url: &str) -> anyhow::Result<()> { + self.set_endpoint_uri(parse_uri(url)?) + } + + /// Sets the endpoint URL from an already-parsed [`Uri`]. + pub fn set_endpoint_uri(&mut self, uri: Uri) -> anyhow::Result<()> { + self.endpoint.get_or_insert_with(Endpoint::default).url = uri; + self.apply_telemetry_path() + } + + /// Sets (or, with `None`, clears) the endpoint API key. + pub fn set_endpoint_api_key(&mut self, api_key: Option<&str>) -> anyhow::Result<()> { + self.endpoint.get_or_insert_with(Endpoint::default).api_key = + api_key.map(|key| Cow::Owned(key.to_string())); + self.apply_telemetry_path() + } + + /// Sets the endpoint request timeout in milliseconds. + pub fn set_endpoint_timeout_ms(&mut self, timeout_ms: u64) { + if let Some(endpoint) = &mut self.endpoint { + endpoint.timeout_ms = timeout_ms; + } + } + + /// Sets (or, with `None`, clears) the `X-Datadog-Test-Session-Token` header + /// sent with requests. pub fn set_endpoint_test_token>>(&mut self, test_token: Option) { if let Some(endpoint) = &mut self.endpoint { - endpoint.test_token = test_token.map(|t| t.into()); + endpoint.test_token = test_token.map(|token| token.into()); + } + } + + /// Sets whether to use the system DNS resolver when building the HTTP client. + pub fn set_endpoint_use_system_resolver(&mut self, use_system_resolver: bool) { + if let Some(endpoint) = &mut self.endpoint { + endpoint.use_system_resolver = use_system_resolver; } } @@ -266,14 +307,9 @@ impl Config { parent_session_id: None, root_session_id: None, }; - if let Ok(url) = parse_uri(&trace_agent_url) { - let _res = this.set_endpoint(Endpoint { - url, - api_key, - ..Default::default() - }); - } + _ = this.set_endpoint_api_key(api_key.as_deref()); + _ = this.set_endpoint_url(&trace_agent_url); this } @@ -294,12 +330,7 @@ impl Config { /// If the host_url is http/https, any path will be ignored and replaced by the /// appropriate telemetry endpoint path pub fn set_host_from_url(&mut self, host_url: &str) -> anyhow::Result<()> { - let endpoint = self.endpoint.take().unwrap_or_default(); - - self.set_endpoint(Endpoint { - url: parse_uri(host_url)?, - ..endpoint - }) + self.set_endpoint_url(host_url) } } diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 675ddad877..f14f471916 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -1307,7 +1307,7 @@ mod tests { LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder, TelemetryWorkerFlavor, TelemetryWorkerHandle, }; - use libdd_common::{http_common, Endpoint}; + use libdd_common::http_common; use tokio::runtime::Runtime; fn is_send(_: T) {} @@ -1333,9 +1333,7 @@ mod tests { "1".into(), "tv".into(), ); - b.config - .set_endpoint(Endpoint::from_slice("http://127.0.0.1:1")) - .unwrap(); + b.config.set_endpoint_url("http://127.0.0.1:1").unwrap(); b.runtime_id = Some("rid".into()); b.config.session_id = session_id; b.config.parent_session_id = parent_session_id; @@ -1467,9 +1465,7 @@ mod tests { "1".into(), "tv".into(), ); - b.config - .set_endpoint(Endpoint::from_slice("http://127.0.0.1:1")) - .unwrap(); + b.config.set_endpoint_url("http://127.0.0.1:1").unwrap(); b.runtime_id = Some("rid".into()); b.flavor = flavor; b.build_worker(Some(tokio::runtime::Handle::current())).1 @@ -1795,7 +1791,6 @@ mod tests { #[test] fn test_channel_close_flushes_and_parks_via_shared_runtime() { use httpmock::prelude::*; - use libdd_common::Endpoint; use libdd_shared_runtime::SharedRuntime; use std::time::Duration; @@ -1814,10 +1809,7 @@ mod tests { "1".into(), "tv".into(), ); - builder - .config - .set_endpoint(Endpoint::from_slice(&server.url("/"))) - .unwrap(); + builder.config.set_endpoint_url(&server.url("/")).unwrap(); builder.runtime_id = Some("rid".into()); let shared_runtime = SharedRuntime::new().expect("SharedRuntime::new"); From dd135ff812e8c86af23eade11b0d18959bd2b7a1 Mon Sep 17 00:00:00 2001 From: Julio Date: Fri, 26 Jun 2026 12:23:26 +0200 Subject: [PATCH 2/2] chore: pack properties in a struct rather than having multiple methods --- datadog-sidecar/src/service/sidecar_server.rs | 16 ++- .../src/crash_info/telemetry.rs | 57 ++++++--- libdd-data-pipeline/src/telemetry/mod.rs | 7 +- libdd-telemetry-ffi/src/builder.rs | 104 +++++++++------ .../examples/tm-metrics-worker-test.rs | 5 +- libdd-telemetry/examples/tm-send-sketch.rs | 7 +- libdd-telemetry/examples/tm-worker-test.rs | 5 +- libdd-telemetry/src/config.rs | 121 ++++++++++-------- libdd-telemetry/src/worker/mod.rs | 23 +++- 9 files changed, 219 insertions(+), 126 deletions(-) diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index cd86f3b565..204f0fd621 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -50,7 +50,7 @@ use libdd_capabilities_impl::NativeCapabilities; use libdd_common::tag::Tag; use libdd_dogstatsd_client::{new, DogStatsDActionOwned}; use libdd_remote_config::fetch::{ConfigInvariants, ConfigOptions, MultiTargetStats}; -use libdd_telemetry::config::Config; +use libdd_telemetry::config::{Config, TelemetryEndpoint}; use libdd_tinybytes as tinybytes; use libdd_trace_utils::tracer_header_tags::TracerHeaderTags; @@ -709,12 +709,14 @@ impl SidecarInterface for ConnectionSidecarHandler { libdd_telemetry::config::PROD_INTAKE_SUBDOMAIN, &config.endpoint, ); - // Set the api key before the uri so the telemetry path is resolved correctly. - cfg.set_endpoint_api_key(endpoint.api_key.as_deref()).ok(); - cfg.set_endpoint_uri(endpoint.url.clone()).ok(); - cfg.set_endpoint_timeout_ms(endpoint.timeout_ms); - cfg.set_endpoint_test_token(endpoint.test_token.clone()); - cfg.set_endpoint_use_system_resolver(endpoint.use_system_resolver); + cfg.set_endpoint(TelemetryEndpoint { + url: Some(endpoint.url.to_string()), + api_key: endpoint.api_key.as_deref().map(str::to_owned), + test_token: endpoint.test_token.as_deref().map(str::to_owned), + timeout_ms: endpoint.timeout_ms, + use_system_resolver: endpoint.use_system_resolver, + }) + .ok(); cfg.telemetry_heartbeat_interval = config.telemetry_heartbeat_interval; cfg.telemetry_extended_heartbeat_interval = config.telemetry_extended_heartbeat_interval; diff --git a/libdd-crashtracker/src/crash_info/telemetry.rs b/libdd-crashtracker/src/crash_info/telemetry.rs index a43b31a745..3d92b6efa8 100644 --- a/libdd-crashtracker/src/crash_info/telemetry.rs +++ b/libdd-crashtracker/src/crash_info/telemetry.rs @@ -197,18 +197,23 @@ impl TelemetryCrashUploader { // But do we want to support direct submission to the intake? // ignore result because what are we going to do? - let _ = if endpoint.url.scheme_str() == Some("file") { + let telemetry_endpoint = if endpoint.url.scheme_str() == Some("file") { let path = libdd_common::decode_uri_path_in_authority(&endpoint.url) .context("file path is not valid")?; - cfg.set_host_from_url(&format!("file://{}.telemetry", path.display())) + libdd_telemetry::config::TelemetryEndpoint { + url: Some(format!("file://{}.telemetry", path.display())), + ..Default::default() + } } else { - cfg.set_endpoint_timeout_ms(endpoint.timeout_ms); - cfg.set_endpoint_test_token(endpoint.test_token.clone()); - cfg.set_endpoint_use_system_resolver(endpoint.use_system_resolver); - // Set the api key before the uri so the telemetry path is resolved correctly. - cfg.set_endpoint_api_key(endpoint.api_key.as_deref()) - .and_then(|()| cfg.set_endpoint_uri(endpoint.url.clone())) + libdd_telemetry::config::TelemetryEndpoint { + url: Some(endpoint.url.to_string()), + api_key: endpoint.api_key.as_deref().map(str::to_owned), + test_token: endpoint.test_token.as_deref().map(str::to_owned), + timeout_ms: endpoint.timeout_ms, + use_system_resolver: endpoint.use_system_resolver, + } }; + let _ = cfg.set_endpoint(telemetry_endpoint); } parse_tags!( @@ -524,7 +529,10 @@ mod tests { new_test_uploader_with_process_tags(seed, "entrypoint.name:cli,entrypoint.type:script"); t.cfg - .set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap())) + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some(format!("file://{}", output_filename.to_str().unwrap())), + ..Default::default() + }) .unwrap(); let test_instance = super::CrashInfo::test_instance(seed); @@ -590,7 +598,10 @@ mod tests { let mut t = new_test_uploader(seed); t.cfg - .set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap())) + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some(format!("file://{}", output_filename.to_str().unwrap())), + ..Default::default() + }) .unwrap(); let sig_info = crate::SigInfo::test_instance(42); @@ -666,7 +677,10 @@ mod tests { let mut t = new_test_uploader(seed); t.cfg - .set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap())) + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some(format!("file://{}", output_filename.to_str().unwrap())), + ..Default::default() + }) .unwrap(); let sig_info = crate::SigInfo::test_instance(123); @@ -776,10 +790,13 @@ mod tests { let mut uploader = TelemetryCrashUploader::new(&metadata, &endpoint)?; uploader .cfg - .set_host_from_url(&format!( - "file://{}.telemetry", - output_filename.to_str().unwrap() - )) + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some(format!( + "file://{}.telemetry", + output_filename.to_str().unwrap() + )), + ..Default::default() + }) .unwrap(); uploader.upload_crash_ping(&crash_ping).await?; @@ -936,7 +953,10 @@ mod tests { uploader .cfg - .set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap())) + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some(format!("file://{}", output_filename.to_str().unwrap())), + ..Default::default() + }) .unwrap(); let sig_info = crate::SigInfo::test_instance(150); @@ -1011,7 +1031,10 @@ mod tests { let mut uploader = new_test_uploader(7); uploader .cfg - .set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap()))?; + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some(format!("file://{}", output_filename.to_str().unwrap())), + ..Default::default() + })?; uploader .upload_general_log( diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index cd2b9e05fd..fda45aa451 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -72,7 +72,12 @@ impl TelemetryClientBuilder { /// Sets the url where the metrics will be sent. pub fn set_url(mut self, url: &str) -> Self { - let _ = self.config.set_endpoint_url(url); + let _ = self + .config + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some(url.to_owned()), + ..Default::default() + }); self } diff --git a/libdd-telemetry-ffi/src/builder.rs b/libdd-telemetry-ffi/src/builder.rs index df76c73e93..d8d132b978 100644 --- a/libdd-telemetry-ffi/src/builder.rs +++ b/libdd-telemetry-ffi/src/builder.rs @@ -4,7 +4,7 @@ use ffi::slice::AsBytes; use libdd_common_ffi as ffi; use libdd_telemetry::{ - data, + config, data, worker::{TelemetryWorkerBuilder, TelemetryWorkerFlavor, TelemetryWorkerHandle}, }; use std::ptr::NonNull; @@ -154,35 +154,51 @@ pub unsafe extern "C" fn ddog_telemetry_builder_run_metric_logs( MaybeError::None } -/// Applies endpoint settings to the builder's telemetry config from primitive -/// values, so `libdd_common::Endpoint` stays out of this crate's public API. +/// C-facing companion to [`libdd_telemetry::config::TelemetryEndpoint`]: the same +/// shape, but with caller-owned [`ffi::CharSlice`] strings instead of `String`s, +/// so `libdd_common::Endpoint` stays out of this crate's public API. /// -/// `api_key` and `test_token` are treated as unset when empty; a `timeout_ms` of -/// 0 keeps the existing/default timeout. +/// Empty `url`/`api_key`/`test_token` slices are treated as unset (leave the +/// existing value unchanged); a `timeout_ms` of 0 keeps the existing/default +/// timeout. `use_system_resolver` is always applied. +#[repr(C)] +pub struct TelemetryEndpoint<'a> { + pub url: ffi::CharSlice<'a>, + pub api_key: ffi::CharSlice<'a>, + pub timeout_ms: u64, + pub test_token: ffi::CharSlice<'a>, + pub use_system_resolver: bool, +} + +impl TelemetryEndpoint<'_> { + /// Copies the caller-owned slices into an owned, `'static` + /// [`config::TelemetryEndpoint`]. The copy is mandatory: the slices point at + /// memory the C caller may free, so it cannot be borrowed into the config. + /// + /// An inherent method rather than a `From` impl because the orphan rule + /// forbids implementing `From for` the foreign `config::TelemetryEndpoint`. + fn into_config(self) -> config::TelemetryEndpoint { + fn owned(slice: ffi::CharSlice) -> Option { + (!slice.is_empty()).then(|| slice.to_utf8_lossy().into_owned()) + } + config::TelemetryEndpoint { + url: owned(self.url), + api_key: owned(self.api_key), + test_token: owned(self.test_token), + timeout_ms: self.timeout_ms, + use_system_resolver: self.use_system_resolver, + } + } +} + +/// Applies endpoint settings to the builder's telemetry config. fn set_builder_endpoint( telemetry_builder: &mut TelemetryWorkerBuilder, - url: ffi::CharSlice, - api_key: ffi::CharSlice, - timeout_ms: u64, - test_token: ffi::CharSlice, - use_system_resolver: bool, + endpoint: TelemetryEndpoint, ) -> ffi::MaybeError { - let url = try_c!(url.try_to_utf8()); - let api_key = api_key.to_utf8_lossy(); - let test_token = test_token.to_utf8_lossy(); - let config = &mut telemetry_builder.config; - // Set the api key before the url so the telemetry path is resolved correctly. - if !api_key.is_empty() { - try_c!(config.set_endpoint_api_key(Some(api_key.as_ref()))); - } - try_c!(config.set_endpoint_url(url)); - if timeout_ms != 0 { - config.set_endpoint_timeout_ms(timeout_ms); - } - if !test_token.is_empty() { - config.set_endpoint_test_token(Some(test_token.into_owned())); - } - config.set_endpoint_use_system_resolver(use_system_resolver); + try_c!(telemetry_builder + .config + .set_endpoint(endpoint.into_config())); ffi::MaybeError::None } @@ -202,11 +218,13 @@ pub unsafe extern "C" fn ddog_telemetry_builder_with_endpoint_config_endpoint( ) -> ffi::MaybeError { set_builder_endpoint( telemetry_builder, - url, - api_key, - timeout_ms, - test_token, - use_system_resolver, + TelemetryEndpoint { + url, + api_key, + timeout_ms, + test_token, + use_system_resolver, + }, ) } #[repr(C)] @@ -233,11 +251,13 @@ pub unsafe extern "C" fn ddog_telemetry_builder_with_property_endpoint( ) -> ffi::MaybeError { set_builder_endpoint( telemetry_builder, - url, - api_key, - timeout_ms, - test_token, - use_system_resolver, + TelemetryEndpoint { + url, + api_key, + timeout_ms, + test_token, + use_system_resolver, + }, ) } #[no_mangle] @@ -261,11 +281,13 @@ pub unsafe extern "C" fn ddog_telemetry_builder_with_endpoint_named_property( match property { "config . endpoint" => set_builder_endpoint( telemetry_builder, - url, - api_key, - timeout_ms, - test_token, - use_system_resolver, + TelemetryEndpoint { + url, + api_key, + timeout_ms, + test_token, + use_system_resolver, + }, ), _ => ffi::MaybeError::None, } diff --git a/libdd-telemetry/examples/tm-metrics-worker-test.rs b/libdd-telemetry/examples/tm-metrics-worker-test.rs index 0a9d420a02..d896d294fc 100644 --- a/libdd-telemetry/examples/tm-metrics-worker-test.rs +++ b/libdd-telemetry/examples/tm-metrics-worker-test.rs @@ -36,7 +36,10 @@ fn main() -> Result<(), Box> { builder.config.telemetry_debug_logging_enabled = true; builder .config - .set_endpoint_url("file://./tm-metrics-worker-test.output") + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some("file://./tm-metrics-worker-test.output".to_owned()), + ..Default::default() + }) .unwrap(); builder.config.telemetry_heartbeat_interval = Duration::from_secs(1); builder.config.debug_enabled = true; diff --git a/libdd-telemetry/examples/tm-send-sketch.rs b/libdd-telemetry/examples/tm-send-sketch.rs index ca78ce5ea5..bfd54e119c 100644 --- a/libdd-telemetry/examples/tm-send-sketch.rs +++ b/libdd-telemetry/examples/tm-send-sketch.rs @@ -110,9 +110,12 @@ async fn async_main() { config.direct_submission_enabled = true; config.debug_enabled = true; let api_key = std::env::var("DD_API_KEY").unwrap(); - config.set_endpoint_api_key(Some(&api_key)).unwrap(); config - .set_endpoint_url("https://instrumentation-telemetry-intake.datad0g.com") + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some("https://instrumentation-telemetry-intake.datad0g.com".to_owned()), + api_key: Some(api_key), + ..Default::default() + }) .unwrap(); push_telemetry(&config, &req).await.unwrap(); } diff --git a/libdd-telemetry/examples/tm-worker-test.rs b/libdd-telemetry/examples/tm-worker-test.rs index 7cbd012d69..5825826ad3 100644 --- a/libdd-telemetry/examples/tm-worker-test.rs +++ b/libdd-telemetry/examples/tm-worker-test.rs @@ -39,7 +39,10 @@ fn main() -> Result<(), Box> { builder.config = libdd_telemetry::config::Config::from_env(); builder .config - .set_endpoint_url("file://./tm-worker-test.output") + .set_endpoint(libdd_telemetry::config::TelemetryEndpoint { + url: Some("file://./tm-worker-test.output".to_owned()), + ..Default::default() + }) .unwrap(); builder.config.telemetry_heartbeat_interval = Duration::from_secs(1); diff --git a/libdd-telemetry/src/config.rs b/libdd-telemetry/src/config.rs index 68ffbaea11..026d4bc8ef 100644 --- a/libdd-telemetry/src/config.rs +++ b/libdd-telemetry/src/config.rs @@ -18,6 +18,23 @@ const TRACE_SOCKET_PATH: &str = "/var/run/datadog/apm.socket"; const DEFAULT_AGENT_HOST: &str = "localhost"; const DEFAULT_AGENT_PORT: u16 = 8126; +/// Partial endpoint configuration applied through [`Config::set_endpoint`]. +/// +/// A `None` (or, for `timeout_ms`, `0`) field leaves the corresponding endpoint +/// value untouched, so the struct doubles as a patch. `use_system_resolver` is +/// always applied. +#[derive(Debug, Default)] +pub struct TelemetryEndpoint { + pub url: Option, + pub api_key: Option, + pub timeout_ms: u64, + /// Sets X-Datadog-Test-Session-Token header on any request + pub test_token: Option, + /// Use the system DNS resolver when building the HTTP client. If false, the default + /// in-process resolver is used. + pub use_system_resolver: bool, +} + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Config { /// Endpoint to send the data to @@ -237,8 +254,8 @@ impl Config { } /// Rewrites the endpoint path to the telemetry path appropriate for the - /// current scheme, API key and direct-submission setting. Called by the - /// setters that can affect it (URL and API key). + /// current scheme, API key and direct-submission setting. Called by + /// [`Config::set_endpoint`] after the endpoint fields have been updated. fn apply_telemetry_path(&mut self) -> anyhow::Result<()> { if let Some(endpoint) = self.endpoint.take() { self.endpoint = Some(endpoint_with_telemetry_path( @@ -249,48 +266,49 @@ impl Config { Ok(()) } - /// Sets the endpoint URL from a string (`http(s)://`, `unix://`, `windows:` - /// or `file://`). Use [`Config::set_endpoint_uri`] if you already hold a - /// parsed [`Uri`] to avoid re-parsing. - pub fn set_endpoint_url(&mut self, url: &str) -> anyhow::Result<()> { - self.set_endpoint_uri(parse_uri(url)?) - } + /// Applies a [`TelemetryEndpoint`] patch to the endpoint, then rewrites the + /// path to the telemetry path appropriate for the resulting scheme and API + /// key. This is the single entry point for endpoint configuration so the URL + /// path invariant always holds. + pub fn set_endpoint(&mut self, endpoint: TelemetryEndpoint) -> anyhow::Result<()> { + // Parse the URL before touching `self.endpoint` so a parse error leaves + // the existing endpoint untouched. + let url = endpoint.url.as_deref().map(parse_uri).transpose()?; + + let inner = self.endpoint.get_or_insert_with(Endpoint::default); + if let Some(url) = url { + inner.url = url; + } - /// Sets the endpoint URL from an already-parsed [`Uri`]. - pub fn set_endpoint_uri(&mut self, uri: Uri) -> anyhow::Result<()> { - self.endpoint.get_or_insert_with(Endpoint::default).url = uri; - self.apply_telemetry_path() - } + // Move the owned Strings into the `Cow<'static, str>` fields — `Cow::from(String)` + // yields `Cow::Owned`, so no copy happens. + if let Some(api_key) = endpoint.api_key { + inner.api_key = Some(Cow::from(api_key)); + } - /// Sets (or, with `None`, clears) the endpoint API key. - pub fn set_endpoint_api_key(&mut self, api_key: Option<&str>) -> anyhow::Result<()> { - self.endpoint.get_or_insert_with(Endpoint::default).api_key = - api_key.map(|key| Cow::Owned(key.to_string())); - self.apply_telemetry_path() - } + if let Some(test_token) = endpoint.test_token { + inner.test_token = Some(Cow::from(test_token)); + } - /// Sets the endpoint request timeout in milliseconds. - pub fn set_endpoint_timeout_ms(&mut self, timeout_ms: u64) { - if let Some(endpoint) = &mut self.endpoint { - endpoint.timeout_ms = timeout_ms; + if endpoint.timeout_ms != 0 { + inner.timeout_ms = endpoint.timeout_ms; } + + inner.use_system_resolver = endpoint.use_system_resolver; + + self.apply_telemetry_path() } /// Sets (or, with `None`, clears) the `X-Datadog-Test-Session-Token` header - /// sent with requests. + /// sent with requests. Unlike [`Config::set_endpoint`], `None` clears the + /// token rather than leaving it unchanged, and an absent endpoint is left + /// absent (no default is inserted). pub fn set_endpoint_test_token>>(&mut self, test_token: Option) { if let Some(endpoint) = &mut self.endpoint { endpoint.test_token = test_token.map(|token| token.into()); } } - /// Sets whether to use the system DNS resolver when building the HTTP client. - pub fn set_endpoint_use_system_resolver(&mut self, use_system_resolver: bool) { - if let Some(endpoint) = &mut self.endpoint { - endpoint.use_system_resolver = use_system_resolver; - } - } - pub fn from_settings(settings: &Settings) -> Self { let trace_agent_url = Self::trace_agent_url_from_setting(settings); let api_key = Self::api_key_from_settings(settings); @@ -308,8 +326,11 @@ impl Config { root_session_id: None, }; - _ = this.set_endpoint_api_key(api_key.as_deref()); - _ = this.set_endpoint_url(&trace_agent_url); + _ = this.set_endpoint(TelemetryEndpoint { + url: Some(trace_agent_url), + api_key: api_key.map(Cow::into_owned), + ..Default::default() + }); this } @@ -318,20 +339,6 @@ impl Config { let settings = Settings::from_env(); Self::from_settings(&settings) } - - /// set_host sets the host telemetry should connect to. - /// - /// It handles the following schemes - /// * http/https - /// * unix sockets unix://\ - /// * windows pipes of the format windows:\ - /// * files, with the format file://\ - /// - /// If the host_url is http/https, any path will be ignored and replaced by the - /// appropriate telemetry endpoint path - pub fn set_host_from_url(&mut self, host_url: &str) -> anyhow::Result<()> { - self.set_endpoint_url(host_url) - } } #[cfg(test)] @@ -343,7 +350,16 @@ mod tests { use libdd_common::connector::named_pipe; - use super::{Config, Settings}; + use super::{Config, Settings, TelemetryEndpoint}; + + /// Test helper mirroring the old `set_host_from_url`: set only the URL and + /// let `set_endpoint` resolve the telemetry path. + fn set_host_from_url(cfg: &mut Config, host_url: &str) -> anyhow::Result<()> { + cfg.set_endpoint(TelemetryEndpoint { + url: Some(host_url.to_owned()), + ..Default::default() + }) + } #[test] fn test_agent_host_detection_trace_agent_url_should_take_precedence() { @@ -444,8 +460,7 @@ mod tests { fn test_config_set_url() { let mut cfg = Config::default(); - cfg.set_host_from_url("http://example.com/any_path_will_be_ignored") - .unwrap(); + set_host_from_url(&mut cfg, "http://example.com/any_path_will_be_ignored").unwrap(); assert_eq!( "http://example.com/telemetry/proxy/api/v2/apmtelemetry", @@ -467,7 +482,7 @@ mod tests { for (input, expected) in cases { let mut cfg = Config::default(); - cfg.set_host_from_url(input).unwrap(); + set_host_from_url(&mut cfg, input).unwrap(); assert_eq!( "file", @@ -491,7 +506,7 @@ mod tests { fn test_config_set_url_unix_socket() { let mut cfg = Config::default(); - cfg.set_host_from_url("unix:///compatiliby/path").unwrap(); + set_host_from_url(&mut cfg, "unix:///compatiliby/path").unwrap(); assert_eq!( "unix://2f636f6d706174696c6962792f70617468/telemetry/proxy/api/v2/apmtelemetry", cfg.clone().endpoint.unwrap().url.to_string() @@ -508,7 +523,7 @@ mod tests { fn test_config_set_url_windows_pipe() { let mut cfg = Config::default(); - cfg.set_host_from_url("windows:C:\\system32\\foo").unwrap(); + set_host_from_url(&mut cfg, "windows:C:\\system32\\foo").unwrap(); assert_eq!( "windows://433a5c73797374656d33325c666f6f/telemetry/proxy/api/v2/apmtelemetry", cfg.clone().endpoint.unwrap().url.to_string() diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index f14f471916..abf319db5d 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -1299,6 +1299,7 @@ impl TelemetryWorkerBuilder { #[cfg(test)] mod tests { + use crate::config::TelemetryEndpoint; use crate::data::Payload; use crate::worker::http_client::header::{ DD_PARENT_SESSION_ID, DD_ROOT_SESSION_ID, DD_SESSION_ID, @@ -1333,7 +1334,12 @@ mod tests { "1".into(), "tv".into(), ); - b.config.set_endpoint_url("http://127.0.0.1:1").unwrap(); + b.config + .set_endpoint(TelemetryEndpoint { + url: Some("http://127.0.0.1:1".to_owned()), + ..Default::default() + }) + .unwrap(); b.runtime_id = Some("rid".into()); b.config.session_id = session_id; b.config.parent_session_id = parent_session_id; @@ -1465,7 +1471,12 @@ mod tests { "1".into(), "tv".into(), ); - b.config.set_endpoint_url("http://127.0.0.1:1").unwrap(); + b.config + .set_endpoint(TelemetryEndpoint { + url: Some("http://127.0.0.1:1".to_owned()), + ..Default::default() + }) + .unwrap(); b.runtime_id = Some("rid".into()); b.flavor = flavor; b.build_worker(Some(tokio::runtime::Handle::current())).1 @@ -1809,7 +1820,13 @@ mod tests { "1".into(), "tv".into(), ); - builder.config.set_endpoint_url(&server.url("/")).unwrap(); + builder + .config + .set_endpoint(TelemetryEndpoint { + url: Some(server.url("/")), + ..Default::default() + }) + .unwrap(); builder.runtime_id = Some("rid".into()); let shared_runtime = SharedRuntime::new().expect("SharedRuntime::new");