From c27964e00480afaf63b6e69677e4f42488b8e865 Mon Sep 17 00:00:00 2001 From: Eugene Formanenko Date: Wed, 25 Mar 2026 12:12:28 +0400 Subject: [PATCH] feat: support zstd response encoding based on Accept-Encoding header MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Respect client Accept-Encoding header per RFC 7231: - If client sends Accept-Encoding: zstd → respond with zstd compression - If client sends Accept-Encoding: gzip → respond with gzip (backward compat) - Quality values respected (e.g. "zstd;q=1.0, gzip;q=0.5" → zstd) - At equal q, prefer zstd over gzip - Default to gzip when no Accept-Encoding header present Related: https://github.com/subsquid/squid-sdk/pull/456 --- Cargo.lock | 1 + crates/hotblocks/Cargo.toml | 1 + crates/hotblocks/src/api.rs | 23 +++++--- crates/hotblocks/src/encoding.rs | 59 ++++++++++++++++++++ crates/hotblocks/src/main.rs | 1 + crates/hotblocks/src/query/response.rs | 6 +- crates/hotblocks/src/query/running.rs | 76 ++++++++++++++++++++++---- crates/hotblocks/src/query/service.rs | 17 ++++-- 8 files changed, 159 insertions(+), 25 deletions(-) create mode 100644 crates/hotblocks/src/encoding.rs diff --git a/Cargo.lock b/Cargo.lock index e3fb3126..e3c74562 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4909,6 +4909,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "zstd", ] [[package]] diff --git a/crates/hotblocks/Cargo.toml b/crates/hotblocks/Cargo.toml index cd9a93c8..37667ae6 100644 --- a/crates/hotblocks/Cargo.toml +++ b/crates/hotblocks/Cargo.toml @@ -11,6 +11,7 @@ bytes = { workspace = true } chrono = { workspace = true, features = ["std"] } clap = { workspace = true, features = ["derive"] } flate2 = { workspace = true } +zstd = "0.13" futures = { workspace = true } ouroboros = { workspace = true } prometheus-client = { workspace = true } diff --git a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index c5cb4fef..3c4e0a52 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -6,7 +6,7 @@ use axum::{ BoxError, Extension, Json, Router, body::{Body, Bytes}, extract::{Path, Request}, - http::{StatusCode, Uri}, + http::{HeaderMap, StatusCode, Uri}, response::{IntoResponse, Response}, routing::{get, post} }; @@ -21,6 +21,7 @@ use tracing::{Instrument, error}; use crate::{ cli::App, dataset_controller::DatasetController, + encoding::ContentEncoding, errors::{ BlockItemIsNotAvailable, BlockRangeMissing, Busy, QueryIsAboveTheHead, QueryKindMismatch, UnknownDataset }, @@ -178,9 +179,11 @@ async fn stream( Extension(app): Extension, Extension(client_id): Extension, Path(dataset_id): Path, + headers: HeaderMap, body: Bytes ) -> impl IntoResponse { - let response = stream_internal(app, dataset_id, body, false, client_id.clone()).await; + let encoding = ContentEncoding::from_headers(&headers); + let response = stream_internal(app, dataset_id, body, false, client_id.clone(), encoding).await; ResponseWithMetadata::new() .with_client_id(&client_id) .with_dataset_id(dataset_id) @@ -192,9 +195,11 @@ async fn finalized_stream( Extension(app): Extension, Extension(client_id): Extension, Path(dataset_id): Path, + headers: HeaderMap, body: Bytes ) -> impl IntoResponse { - let response = stream_internal(app, dataset_id, body, true, client_id.clone()).await; + let encoding = ContentEncoding::from_headers(&headers); + let response = stream_internal(app, dataset_id, body, true, client_id.clone(), encoding).await; ResponseWithMetadata::new() .with_client_id(&client_id) .with_dataset_id(dataset_id) @@ -207,7 +212,8 @@ async fn stream_internal( dataset_id: DatasetId, body: Bytes, finalized: bool, - client_id: ClientId + client_id: ClientId, + encoding: ContentEncoding ) -> Response { let dataset = get_dataset!(app, dataset_id); @@ -221,9 +227,11 @@ async fn stream_internal( } let query_result = if finalized { - app.query_service.query_finalized(&dataset, query, client_id).await + app.query_service + .query_finalized(&dataset, query, client_id, encoding) + .await } else { - app.query_service.query(&dataset, query, client_id).await + app.query_service.query(&dataset, query, client_id, encoding).await }; match query_result { @@ -231,7 +239,8 @@ async fn stream_internal( let mut res = Response::builder() .status(200) .header("content-type", "text/plain") - .header("content-encoding", "gzip"); + .header("content-encoding", encoding.as_str()) + .header("vary", "Accept-Encoding"); if let Some(finalized_head) = stream.finalized_head() { if finalized { diff --git a/crates/hotblocks/src/encoding.rs b/crates/hotblocks/src/encoding.rs new file mode 100644 index 00000000..929f1921 --- /dev/null +++ b/crates/hotblocks/src/encoding.rs @@ -0,0 +1,59 @@ +use axum::http::{HeaderMap, header::ACCEPT_ENCODING}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ContentEncoding { + Gzip, + Zstd +} + +impl ContentEncoding { + pub fn as_str(&self) -> &'static str { + match self { + ContentEncoding::Gzip => "gzip", + ContentEncoding::Zstd => "zstd" + } + } + + /// Respond with zstd if the client mentions it in `Accept-Encoding`, otherwise with gzip + pub fn from_headers(headers: &HeaderMap) -> Self { + let accepts_zstd = headers + .get(ACCEPT_ENCODING) + .and_then(|value| value.to_str().ok()) + .is_some_and(|value| value.contains("zstd")); + if accepts_zstd { + ContentEncoding::Zstd + } else { + ContentEncoding::Gzip + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn headers(accept_encoding: &str) -> HeaderMap { + let mut headers = HeaderMap::new(); + headers.insert(ACCEPT_ENCODING, accept_encoding.parse().unwrap()); + headers + } + + #[test] + fn no_header_defaults_to_gzip() { + assert_eq!(ContentEncoding::from_headers(&HeaderMap::new()), ContentEncoding::Gzip); + } + + #[test] + fn gzip_only() { + assert_eq!(ContentEncoding::from_headers(&headers("gzip")), ContentEncoding::Gzip); + } + + #[test] + fn zstd_preferred_when_mentioned() { + assert_eq!(ContentEncoding::from_headers(&headers("zstd")), ContentEncoding::Zstd); + assert_eq!( + ContentEncoding::from_headers(&headers("gzip, deflate, zstd")), + ContentEncoding::Zstd + ); + } +} diff --git a/crates/hotblocks/src/main.rs b/crates/hotblocks/src/main.rs index 409ffecd..af6922ec 100644 --- a/crates/hotblocks/src/main.rs +++ b/crates/hotblocks/src/main.rs @@ -3,6 +3,7 @@ mod cli; mod data_service; mod dataset_config; mod dataset_controller; +mod encoding; mod errors; mod metrics; mod query; diff --git a/crates/hotblocks/src/query/response.rs b/crates/hotblocks/src/query/response.rs index 92ae4804..c5374a5d 100644 --- a/crates/hotblocks/src/query/response.rs +++ b/crates/hotblocks/src/query/response.rs @@ -11,6 +11,7 @@ use super::{ running::{RunningQuery, RunningQueryStats} }; use crate::{ + encoding::ContentEncoding, errors::Busy, metrics::{ STREAM_BLOCKS, STREAM_BLOCKS_PER_SECOND, STREAM_BYTES, STREAM_BYTES_PER_SECOND, STREAM_CHUNKS, STREAM_DURATIONS @@ -85,14 +86,15 @@ impl QueryResponse { query: Query, only_finalized: bool, time_limit: Option, - client_id: ClientId + client_id: ClientId, + encoding: ContentEncoding ) -> anyhow::Result { let Some(slot) = executor.get_slot() else { bail!(Busy) }; let stats = QueryStreamStats::new(); let mut runner = slot .run(move |slot| -> anyhow::Result<_> { - let mut runner = RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?; + let mut runner = RunningQuery::new(db, dataset_id, &query, only_finalized, encoding).map(Box::new)?; next_run(&mut runner, slot)?; Ok(runner) }) diff --git a/crates/hotblocks/src/query/running.rs b/crates/hotblocks/src/query/running.rs index ab06b030..59fe5018 100644 --- a/crates/hotblocks/src/query/running.rs +++ b/crates/hotblocks/src/query/running.rs @@ -6,8 +6,10 @@ use flate2::{Compression, write::GzEncoder}; use sqd_primitives::{BlockNumber, BlockRef}; use sqd_query::{JsonLinesWriter, Plan, Query}; use sqd_storage::db::{Chunk as StorageChunk, DatasetId}; +use zstd::stream::write::Encoder as ZstdEncoder; use crate::{ + encoding::ContentEncoding, errors::{BlockItemIsNotAvailable, BlockRangeMissing, QueryIsAboveTheHead, QueryKindMismatch}, metrics::{QUERIED_BLOCKS, QUERIED_CHUNKS}, query::static_snapshot::{StaticChunkIterator, StaticChunkReader, StaticSnapshot}, @@ -49,6 +51,58 @@ impl RunningQueryStats { } } +enum Compressor { + Gzip(GzEncoder>), + Zstd(ZstdEncoder<'static, bytes::buf::Writer>) +} + +impl Write for Compressor { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + match self { + Compressor::Gzip(e) => e.write(buf), + Compressor::Zstd(e) => e.write(buf) + } + } + + fn flush(&mut self) -> std::io::Result<()> { + match self { + Compressor::Gzip(e) => e.flush(), + Compressor::Zstd(e) => e.flush() + } + } +} + +impl Compressor { + fn new(encoding: ContentEncoding) -> anyhow::Result { + let writer = BytesMut::new().writer(); + Ok(match encoding { + ContentEncoding::Gzip => Compressor::Gzip(GzEncoder::new(writer, Compression::fast())), + ContentEncoding::Zstd => Compressor::Zstd(ZstdEncoder::new(writer, 1)?) + }) + } + + fn get_ref(&self) -> &BytesMut { + match self { + Compressor::Gzip(e) => e.get_ref().get_ref(), + Compressor::Zstd(e) => e.get_ref().get_ref() + } + } + + fn get_mut(&mut self) -> &mut BytesMut { + match self { + Compressor::Gzip(e) => e.get_mut().get_mut(), + Compressor::Zstd(e) => e.get_mut().get_mut() + } + } + + fn finish(self) -> BytesMut { + match self { + Compressor::Gzip(e) => e.finish().expect("IO errors are not possible").into_inner(), + Compressor::Zstd(e) => e.finish().expect("IO errors are not possible").into_inner() + } + } +} + pub struct RunningQuery { plan: Plan, last_block: Option, @@ -56,12 +110,18 @@ pub struct RunningQuery { next_chunk: Option>, chunk_iterator: StaticChunkIterator, finalized_head: Option, - buf: GzEncoder>, + buf: Compressor, stats: RunningQueryStats } impl RunningQuery { - pub fn new(db: DBRef, dataset_id: DatasetId, query: &Query, only_finalized: bool) -> anyhow::Result { + pub fn new( + db: DBRef, + dataset_id: DatasetId, + query: &Query, + only_finalized: bool, + encoding: ContentEncoding + ) -> anyhow::Result { let snapshot = StaticSnapshot::new(db); let finalized_head = match snapshot.get_label(dataset_id)? { @@ -139,7 +199,7 @@ impl RunningQuery { next_chunk: Some(Ok(first_chunk)), chunk_iterator, finalized_head, - buf: GzEncoder::new(BytesMut::new().writer(), Compression::fast()), + buf: Compressor::new(encoding)?, stats }) } @@ -153,19 +213,15 @@ impl RunningQuery { } pub fn buffered_bytes(&self) -> usize { - self.buf.get_ref().get_ref().len() + self.buf.get_ref().len() } pub fn take_buffered_bytes(&mut self) -> Bytes { - self.buf.get_mut().get_mut().split().freeze() + self.buf.get_mut().split().freeze() } pub fn finish(self) -> Bytes { - self.buf - .finish() - .expect("IO errors are not possible") - .into_inner() - .freeze() + self.buf.finish().freeze() } pub fn has_next_chunk(&self) -> bool { diff --git a/crates/hotblocks/src/query/service.rs b/crates/hotblocks/src/query/service.rs index 8edd1bad..7252ed5f 100644 --- a/crates/hotblocks/src/query/service.rs +++ b/crates/hotblocks/src/query/service.rs @@ -12,6 +12,7 @@ use sqd_query::Query; use super::{executor::QueryExecutor, response::QueryResponse}; use crate::{ dataset_controller::DatasetController, + encoding::ContentEncoding, errors::{Busy, QueryIsAboveTheHead, QueryKindMismatch}, query::QueryExecutorCollector, types::{ClientId, DBRef, DatasetKind} @@ -84,18 +85,20 @@ impl QueryService { &self, dataset: &DatasetController, query: Query, - client_id: ClientId + client_id: ClientId, + encoding: ContentEncoding ) -> anyhow::Result { - self.query_internal(dataset, query, false, client_id).await + self.query_internal(dataset, query, false, client_id, encoding).await } pub async fn query_finalized( &self, dataset: &DatasetController, query: Query, - client_id: ClientId + client_id: ClientId, + encoding: ContentEncoding ) -> anyhow::Result { - self.query_internal(dataset, query, true, client_id).await + self.query_internal(dataset, query, true, client_id, encoding).await } async fn query_internal( @@ -103,7 +106,8 @@ impl QueryService { dataset: &DatasetController, query: Query, finalized: bool, - client_id: ClientId + client_id: ClientId, + encoding: ContentEncoding ) -> anyhow::Result { ensure!( dataset.dataset_kind() == DatasetKind::from_query(&query), @@ -158,7 +162,8 @@ impl QueryService { query, finalized, None, - client_id + client_id, + encoding ) .await }