Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions crates/trusted-server-adapter-fastly/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,24 +192,14 @@ async fn route_request(
(Method::POST, "/first-party/proxy-rebuild") => {
handle_first_party_proxy_rebuild(settings, runtime_services, req).await
}
(m, path) if integration_registry.has_route(&m, path) => {
// TODO(PR13): migrate integration trait to http types here
integration_registry
.handle_proxy(
&m,
path,
settings,
runtime_services,
compat::to_fastly_request(req),
)
.await
.unwrap_or_else(|| {
Err(Report::new(TrustedServerError::BadRequest {
message: format!("Unknown integration route: {path}"),
}))
})
.map(compat::from_fastly_response)
}
(m, path) if integration_registry.has_route(&m, path) => integration_registry
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 praise — Clean elimination of the compat::to_fastly_request / compat::from_fastly_response round-trip for integration dispatch. The proxy registry now operates on http::Request<EdgeBody> end-to-end, which removes a non-trivial amount of conversion overhead per request and eliminates an entire class of header-mapping bugs.

.handle_proxy(&m, path, settings, runtime_services, req)
.await
.unwrap_or_else(|| {
Err(Report::new(TrustedServerError::BadRequest {
message: format!("Unknown integration route: {path}"),
}))
}),

// No known route matched, proxy to publisher origin as fallback
_ => {
Expand Down
24 changes: 17 additions & 7 deletions crates/trusted-server-core/src/auction/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ The auction orchestration system allows you to:
┌─────────────────────────────────────────────────────────┐
│ AuctionProvider Trait │
│ - request_bids() │
│ - request_bids() async │
│ - parse_response() │
│ - provider_name() │
│ - timeout_ms() │
│ - is_enabled() │
Expand Down Expand Up @@ -507,6 +508,7 @@ timeout_ms = 500
use async_trait::async_trait;
use crate::auction::provider::AuctionProvider;
use crate::auction::types::{AuctionContext, AuctionRequest, AuctionResponse};
use crate::platform::{PlatformPendingRequest, PlatformResponse};

pub struct YourAuctionProvider {
config: YourConfig,
Expand All @@ -522,11 +524,19 @@ impl AuctionProvider for YourAuctionProvider {
&self,
request: &AuctionRequest,
_context: &AuctionContext<'_>,
) -> Result<AuctionResponse, Report<TrustedServerError>> {
) -> Result<PlatformPendingRequest, Report<TrustedServerError>> {
// 1. Transform AuctionRequest to your provider's format
// 2. Make HTTP request to your provider
// 3. Parse response
// 4. Return AuctionResponse with bids
// 2. Launch HTTP request through services.http_client().send_async(...)
// 3. Return PlatformPendingRequest for the orchestrator to await
todo!()
}

async fn parse_response(
&self,
response: PlatformResponse,
Comment thread
prk-Jr marked this conversation as resolved.
response_time_ms: u64,
) -> Result<AuctionResponse, Report<TrustedServerError>> {
// 4. Parse PlatformResponse into AuctionResponse
todo!()
}

Expand Down Expand Up @@ -562,7 +572,7 @@ let orchestrator = AuctionOrchestrator::new(config);
orchestrator.register_provider(Arc::new(PrebidAuctionProvider::new(prebid_config)));
orchestrator.register_provider(Arc::new(ApsAuctionProvider::new(aps_config)));

let result = orchestrator.run_auction(&request, &context).await?;
let result = orchestrator.run_auction(&request, &context, &services).await?;

// Check results
assert_eq!(result.winning_bids.len(), 2);
Expand All @@ -571,7 +581,7 @@ assert!(result.total_time_ms < 2000);

## Performance Considerations

- **Parallel Execution**: Currently runs sequentially in Fastly Compute (no tokio runtime), but structured for easy parallelization
- **Parallel Execution**: Providers are launched concurrently via `select()` over `PendingRequest`s; responses are processed as they become ready within the auction deadline
- **Timeouts**: Each provider has independent timeout; global timeout enforced at flow level
- **Error Handling**: Provider failures don't fail entire auction; partial results returned

Expand Down
27 changes: 9 additions & 18 deletions crates/trusted-server-core/src/auction/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ use error_stack::{Report, ResultExt};
use http::{Request, Response};

use crate::auction::formats::AdRequest;
use crate::compat;
use crate::consent;
use crate::cookies::handle_request_cookies;
use crate::edge_cookie::get_or_generate_ec_id;
use crate::error::TrustedServerError;
use crate::integrations::collect_body_bounded;
use crate::platform::RuntimeServices;
use crate::settings::Settings;

use super::formats::{convert_to_openrtb_response, convert_tsjs_to_auction_request};
use super::types::AuctionContext;
use super::AuctionOrchestrator;

const AUCTION_MAX_BODY_BYTES: usize = 65536;
const AUCTION_MAX_BODY_BYTES: usize = 256 * 1024;

/// Handle auction request from /auction endpoint.
///
Expand All @@ -40,17 +40,12 @@ pub async fn handle_auction(
) -> Result<Response<EdgeBody>, Report<TrustedServerError>> {
let (parts, body) = req.into_parts();

// Parse request body
let body_bytes = body.into_bytes();
if body_bytes.len() > AUCTION_MAX_BODY_BYTES {
return Err(Report::new(TrustedServerError::RequestTooLarge {
message: format!(
"auction payload {} exceeds limit of {}",
body_bytes.len(),
AUCTION_MAX_BODY_BYTES,
),
}));
}
// Parse request body — use a bounded read so streaming bodies cannot exhaust memory.
let body_bytes = collect_body_bounded(body, AUCTION_MAX_BODY_BYTES, "auction")
.await
.change_context(TrustedServerError::Auction {
message: "Failed to read auction request body".to_string(),
})?;
let body: AdRequest =
serde_json::from_slice(&body_bytes).change_context(TrustedServerError::Auction {
message: "Failed to parse auction request body".to_string(),
Expand Down Expand Up @@ -103,14 +98,10 @@ pub async fn handle_auction(
geo,
)?;

// Body already parsed above; provider context only needs request metadata.
let fastly_req = compat::to_fastly_request_ref(&http_req);

// Create auction context
let context = AuctionContext {
settings,
request: &fastly_req,
client_info: services.client_info(),
request: &http_req,
timeout_ms: settings.auction.timeout_ms,
provider_responses: None,
services,
Expand Down
2 changes: 1 addition & 1 deletion crates/trusted-server-core/src/auction/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub fn convert_tsjs_to_auction_request(
.get(header::USER_AGENT)
.and_then(|value| value.to_str().ok())
.map(str::to_string),
ip: services.client_info.client_ip.map(|ip| ip.to_string()),
ip: services.client_info().client_ip.map(|ip| ip.to_string()),
geo,
});

Expand Down
98 changes: 42 additions & 56 deletions crates/trusted-server-core/src/auction/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::compat::platform_response_to_fastly;
use crate::error::TrustedServerError;
use crate::platform::{PlatformPendingRequest, RuntimeServices};

Expand Down Expand Up @@ -145,7 +144,6 @@ impl AuctionOrchestrator {
let mediator_context = AuctionContext {
settings: context.settings,
request: context.request,
client_info: context.client_info,
timeout_ms: remaining_ms,
provider_responses: Some(&provider_responses),
services: context.services,
Expand All @@ -154,6 +152,7 @@ impl AuctionOrchestrator {
let start_time = Instant::now();
let pending = mediator
.request_bids(request, &mediator_context)
.await
.change_context(TrustedServerError::Auction {
message: format!("Mediator {} failed to launch", mediator.provider_name()),
})?;
Expand All @@ -165,18 +164,11 @@ impl AuctionOrchestrator {
.change_context(TrustedServerError::Auction {
message: format!("Mediator {} request failed", mediator.provider_name()),
})?;
let backend_response = platform_response_to_fastly(platform_resp).change_context(
TrustedServerError::Auction {
message: format!(
"Mediator {} returned an unsupported response body",
mediator.provider_name()
),
},
)?;

let response_time_ms = start_time.elapsed().as_millis() as u64;
let mediator_resp = mediator
.parse_response(backend_response, response_time_ms)
.parse_response(platform_resp, response_time_ms)
.await
.change_context(TrustedServerError::Auction {
message: format!("Mediator {} parse failed", mediator.provider_name()),
})?;
Expand Down Expand Up @@ -325,7 +317,6 @@ impl AuctionOrchestrator {
let provider_context = AuctionContext {
settings: context.settings,
request: context.request,
client_info: context.client_info,
timeout_ms: effective_timeout,
provider_responses: context.provider_responses,
services: context.services,
Expand All @@ -339,14 +330,25 @@ impl AuctionOrchestrator {
);

let start_time = Instant::now();
match provider.request_bids(request, &provider_context) {
match provider.request_bids(request, &provider_context).await {
Ok(pending) => {
let request_backend_name = pending
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 thinkingpending.backend_name().unwrap_or_else(|| backend_name.clone()) silently falls back to the predicted name. If the runtime returns None for pending.backend_name() and PlatformResponse::backend_name is set to something else by the platform impl, the response handler at line 392 will fail to identify the provider and log "Received response from unknown backend" — meaning that provider's bids are silently dropped.

Worth a log::debug! (or warn!) when the fallback fires, so production has a signal if PlatformPendingRequest::backend_name() ever stops returning the expected value.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 thinkingpending.backend_name().unwrap_or_else(|| backend_name.clone()) silently falls back to the predicted name. If the runtime returns None for pending.backend_name() and PlatformResponse::backend_name is set to something else by the platform impl, the response handler at line 392 will fail to identify the provider and log "Received response from unknown backend" — meaning that provider's bids are silently dropped.

Worth a log::debug! (or warn!) when the fallback fires, so production has a signal if PlatformPendingRequest::backend_name() ever stops returning the expected value.

.backend_name()
.map(str::to_string)
.unwrap_or_else(|| {
log::debug!(
"Provider '{}' pending request returned no backend name; \
using predicted name '{}'",
provider.provider_name(),
backend_name,
);
backend_name.clone()
});
backend_to_provider.insert(
backend_name.clone(),
request_backend_name.clone(),
(provider.provider_name(), start_time, provider.as_ref()),
);
pending_requests
.push(PlatformPendingRequest::new(pending).with_backend_name(backend_name));
pending_requests.push(pending);
log::debug!(
"Request to '{}' launched successfully",
provider.provider_name()
Expand Down Expand Up @@ -400,31 +402,19 @@ impl AuctionOrchestrator {
{
let response_time_ms = start_time.elapsed().as_millis() as u64;

match platform_response_to_fastly(platform_response) {
Ok(response) => {
match provider.parse_response(response, response_time_ms) {
Ok(auction_response) => {
log::info!(
"Provider '{}' returned {} bids (status: {:?}, time: {}ms)",
auction_response.provider,
auction_response.bids.len(),
auction_response.status,
auction_response.response_time_ms
);
responses.push(auction_response);
}
Err(e) => {
log::warn!(
"Provider '{}' failed to parse response: {:?}",
provider_name,
e
);
responses.push(AuctionResponse::error(
provider_name,
response_time_ms,
));
}
}
match provider
.parse_response(platform_response, response_time_ms)
.await
{
Ok(auction_response) => {
log::info!(
"Provider '{}' returned {} bids (status: {:?}, time: {}ms)",
auction_response.provider,
auction_response.bids.len(),
auction_response.status,
auction_response.response_time_ms
);
responses.push(auction_response);
}
Err(e) => {
log::warn!(
Expand Down Expand Up @@ -638,17 +628,8 @@ mod tests {
AdFormat, AdSlot, AuctionRequest, Bid, MediaType, PublisherInfo, UserInfo,
};

// All-None ClientInfo used across tests that don't need real IP/TLS data.
// Defined as a const so &EMPTY_CLIENT_INFO has 'static lifetime, avoiding
// the temporary-lifetime issue that arises with &ClientInfo::default().
const EMPTY_CLIENT_INFO: crate::platform::ClientInfo = crate::platform::ClientInfo {
client_ip: None,
tls_protocol: None,
tls_cipher: None,
};
use crate::platform::test_support::noop_services;
use crate::test_support::tests::crate_test_settings_str;
use fastly::Request;
use std::collections::{HashMap, HashSet};

use super::AuctionOrchestrator;
Expand Down Expand Up @@ -757,18 +738,19 @@ mod tests {
}

// TODO: Re-enable provider integration tests after implementing mock support
// for send_async(). Mock providers can't create PendingRequest without real
// Fastly backends.
// for `PlatformHttpClient::send_async()`. Mock providers currently cannot
// create realistic pending requests for the select loop without real
// platform-backed transport handles.
//
// Untested timeout enforcement paths (require real backends):
// - Deadline check in select() loop (drops remaining requests)
// - Mediator skip when remaining_ms == 0 (bidding exhausts budget)
// - Provider skip when effective_timeout == 0 (budget exhausted before launch)
// - Provider context receives reduced timeout_ms per remaining budget
//
// Follow-up: introduce a thin abstraction over `select()` (e.g. a trait)
// Follow-up: introduce a thin abstraction over `PlatformHttpClient::select()`
// so the deadline/drop logic can be unit-tested with mock futures instead
// of requiring real Fastly backends. An `#[ignore]` integration test
// of requiring real platform backends. An `#[ignore]` integration test
// exercising the full path via Viceroy would also catch regressions.

#[tokio::test]
Expand All @@ -786,8 +768,12 @@ mod tests {

let request = create_test_auction_request();
let settings = create_test_settings();
let req = Request::get("https://test.com/test");
let context = create_test_auction_context(&settings, &req, &EMPTY_CLIENT_INFO, 2000);
let req = http::Request::builder()
.method(http::Method::GET)
.uri("https://test.com/test")
.body(edgezero_core::body::Body::empty())
.expect("should build request");
let context = create_test_auction_context(&settings, &req, 2000);

let result = orchestrator
.run_auction(&request, &context, &noop_services())
Expand Down
Loading
Loading