Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hotblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ bytes = { workspace = true }
chrono = { workspace = true, features = ["std"] }
clap = { workspace = true, features = ["derive"] }
flate2 = { workspace = true }
zstd = "0.13"
futures = { workspace = true }
ouroboros = { workspace = true }
prometheus-client = { workspace = true }
Expand Down
23 changes: 16 additions & 7 deletions crates/hotblocks/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
BoxError, Extension, Json, Router,
body::{Body, Bytes},
extract::{Path, Request},
http::{StatusCode, Uri},
http::{HeaderMap, StatusCode, Uri},
response::{IntoResponse, Response},
routing::{get, post}
};
Expand All @@ -21,6 +21,7 @@ use tracing::{Instrument, error};
use crate::{
cli::App,
dataset_controller::DatasetController,
encoding::ContentEncoding,
errors::{
BlockItemIsNotAvailable, BlockRangeMissing, Busy, QueryIsAboveTheHead, QueryKindMismatch, UnknownDataset
},
Expand Down Expand Up @@ -178,9 +179,11 @@ async fn stream(
Extension(app): Extension<AppRef>,
Extension(client_id): Extension<ClientId>,
Path(dataset_id): Path<DatasetId>,
headers: HeaderMap,
body: Bytes
) -> impl IntoResponse {
let response = stream_internal(app, dataset_id, body, false, client_id.clone()).await;
let encoding = ContentEncoding::from_headers(&headers);
let response = stream_internal(app, dataset_id, body, false, client_id.clone(), encoding).await;
ResponseWithMetadata::new()
.with_client_id(&client_id)
.with_dataset_id(dataset_id)
Expand All @@ -192,9 +195,11 @@ async fn finalized_stream(
Extension(app): Extension<AppRef>,
Extension(client_id): Extension<ClientId>,
Path(dataset_id): Path<DatasetId>,
headers: HeaderMap,
body: Bytes
) -> impl IntoResponse {
let response = stream_internal(app, dataset_id, body, true, client_id.clone()).await;
let encoding = ContentEncoding::from_headers(&headers);
let response = stream_internal(app, dataset_id, body, true, client_id.clone(), encoding).await;
ResponseWithMetadata::new()
.with_client_id(&client_id)
.with_dataset_id(dataset_id)
Expand All @@ -207,7 +212,8 @@ async fn stream_internal(
dataset_id: DatasetId,
body: Bytes,
finalized: bool,
client_id: ClientId
client_id: ClientId,
encoding: ContentEncoding
) -> Response {
let dataset = get_dataset!(app, dataset_id);

Expand All @@ -221,17 +227,20 @@ async fn stream_internal(
}

let query_result = if finalized {
app.query_service.query_finalized(&dataset, query, client_id).await
app.query_service
.query_finalized(&dataset, query, client_id, encoding)
.await
} else {
app.query_service.query(&dataset, query, client_id).await
app.query_service.query(&dataset, query, client_id, encoding).await
};

match query_result {
Ok(stream) => {
let mut res = Response::builder()
.status(200)
.header("content-type", "text/plain")
.header("content-encoding", "gzip");
.header("content-encoding", encoding.as_str())
.header("vary", "Accept-Encoding");

if let Some(finalized_head) = stream.finalized_head() {
if finalized {
Expand Down
59 changes: 59 additions & 0 deletions crates/hotblocks/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use axum::http::{HeaderMap, header::ACCEPT_ENCODING};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentEncoding {
Gzip,
Zstd
}

impl ContentEncoding {
pub fn as_str(&self) -> &'static str {
match self {
ContentEncoding::Gzip => "gzip",
ContentEncoding::Zstd => "zstd"
}
}

/// Respond with zstd if the client mentions it in `Accept-Encoding`, otherwise with gzip
pub fn from_headers(headers: &HeaderMap) -> Self {
let accepts_zstd = headers
.get(ACCEPT_ENCODING)
.and_then(|value| value.to_str().ok())
.is_some_and(|value| value.contains("zstd"));
if accepts_zstd {
ContentEncoding::Zstd
} else {
ContentEncoding::Gzip
}
}
}

#[cfg(test)]
mod tests {
use super::*;

fn headers(accept_encoding: &str) -> HeaderMap {
let mut headers = HeaderMap::new();
headers.insert(ACCEPT_ENCODING, accept_encoding.parse().unwrap());
headers
}

#[test]
fn no_header_defaults_to_gzip() {
assert_eq!(ContentEncoding::from_headers(&HeaderMap::new()), ContentEncoding::Gzip);
}

#[test]
fn gzip_only() {
assert_eq!(ContentEncoding::from_headers(&headers("gzip")), ContentEncoding::Gzip);
}

#[test]
fn zstd_preferred_when_mentioned() {
assert_eq!(ContentEncoding::from_headers(&headers("zstd")), ContentEncoding::Zstd);
assert_eq!(
ContentEncoding::from_headers(&headers("gzip, deflate, zstd")),
ContentEncoding::Zstd
);
}
}
1 change: 1 addition & 0 deletions crates/hotblocks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod cli;
mod data_service;
mod dataset_config;
mod dataset_controller;
mod encoding;
mod errors;
mod metrics;
mod query;
Expand Down
6 changes: 4 additions & 2 deletions crates/hotblocks/src/query/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::{
running::{RunningQuery, RunningQueryStats}
};
use crate::{
encoding::ContentEncoding,
errors::Busy,
metrics::{
STREAM_BLOCKS, STREAM_BLOCKS_PER_SECOND, STREAM_BYTES, STREAM_BYTES_PER_SECOND, STREAM_CHUNKS, STREAM_DURATIONS
Expand Down Expand Up @@ -85,14 +86,15 @@ impl QueryResponse {
query: Query,
only_finalized: bool,
time_limit: Option<Duration>,
client_id: ClientId
client_id: ClientId,
encoding: ContentEncoding
) -> anyhow::Result<Self> {
let Some(slot) = executor.get_slot() else { bail!(Busy) };

let stats = QueryStreamStats::new();
let mut runner = slot
.run(move |slot| -> anyhow::Result<_> {
let mut runner = RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?;
let mut runner = RunningQuery::new(db, dataset_id, &query, only_finalized, encoding).map(Box::new)?;
next_run(&mut runner, slot)?;
Ok(runner)
})
Expand Down
76 changes: 66 additions & 10 deletions crates/hotblocks/src/query/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use flate2::{Compression, write::GzEncoder};
use sqd_primitives::{BlockNumber, BlockRef};
use sqd_query::{JsonLinesWriter, Plan, Query};
use sqd_storage::db::{Chunk as StorageChunk, DatasetId};
use zstd::stream::write::Encoder as ZstdEncoder;

use crate::{
encoding::ContentEncoding,
errors::{BlockItemIsNotAvailable, BlockRangeMissing, QueryIsAboveTheHead, QueryKindMismatch},
metrics::{QUERIED_BLOCKS, QUERIED_CHUNKS},
query::static_snapshot::{StaticChunkIterator, StaticChunkReader, StaticSnapshot},
Expand Down Expand Up @@ -49,19 +51,77 @@ impl RunningQueryStats {
}
}

enum Compressor {
Gzip(GzEncoder<bytes::buf::Writer<BytesMut>>),
Zstd(ZstdEncoder<'static, bytes::buf::Writer<BytesMut>>)
}

impl Write for Compressor {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match self {
Compressor::Gzip(e) => e.write(buf),
Compressor::Zstd(e) => e.write(buf)
}
}

fn flush(&mut self) -> std::io::Result<()> {
match self {
Compressor::Gzip(e) => e.flush(),
Compressor::Zstd(e) => e.flush()
}
}
}

impl Compressor {
fn new(encoding: ContentEncoding) -> anyhow::Result<Self> {
let writer = BytesMut::new().writer();
Ok(match encoding {
ContentEncoding::Gzip => Compressor::Gzip(GzEncoder::new(writer, Compression::fast())),
ContentEncoding::Zstd => Compressor::Zstd(ZstdEncoder::new(writer, 1)?)
})
}

fn get_ref(&self) -> &BytesMut {
match self {
Compressor::Gzip(e) => e.get_ref().get_ref(),
Compressor::Zstd(e) => e.get_ref().get_ref()
}
}

fn get_mut(&mut self) -> &mut BytesMut {
match self {
Compressor::Gzip(e) => e.get_mut().get_mut(),
Compressor::Zstd(e) => e.get_mut().get_mut()
}
}

fn finish(self) -> BytesMut {
match self {
Compressor::Gzip(e) => e.finish().expect("IO errors are not possible").into_inner(),
Compressor::Zstd(e) => e.finish().expect("IO errors are not possible").into_inner()
}
}
}

pub struct RunningQuery {
plan: Plan,
last_block: Option<BlockNumber>,
left_over: Option<LeftOver>,
next_chunk: Option<anyhow::Result<StorageChunk>>,
chunk_iterator: StaticChunkIterator,
finalized_head: Option<BlockRef>,
buf: GzEncoder<bytes::buf::Writer<BytesMut>>,
buf: Compressor,
stats: RunningQueryStats
}

impl RunningQuery {
pub fn new(db: DBRef, dataset_id: DatasetId, query: &Query, only_finalized: bool) -> anyhow::Result<Self> {
pub fn new(
db: DBRef,
dataset_id: DatasetId,
query: &Query,
only_finalized: bool,
encoding: ContentEncoding
) -> anyhow::Result<Self> {
let snapshot = StaticSnapshot::new(db);

let finalized_head = match snapshot.get_label(dataset_id)? {
Expand Down Expand Up @@ -139,7 +199,7 @@ impl RunningQuery {
next_chunk: Some(Ok(first_chunk)),
chunk_iterator,
finalized_head,
buf: GzEncoder::new(BytesMut::new().writer(), Compression::fast()),
buf: Compressor::new(encoding)?,
stats
})
}
Expand All @@ -153,19 +213,15 @@ impl RunningQuery {
}

pub fn buffered_bytes(&self) -> usize {
self.buf.get_ref().get_ref().len()
self.buf.get_ref().len()
}

pub fn take_buffered_bytes(&mut self) -> Bytes {
self.buf.get_mut().get_mut().split().freeze()
self.buf.get_mut().split().freeze()
}

pub fn finish(self) -> Bytes {
self.buf
.finish()
.expect("IO errors are not possible")
.into_inner()
.freeze()
self.buf.finish().freeze()
}

pub fn has_next_chunk(&self) -> bool {
Expand Down
17 changes: 11 additions & 6 deletions crates/hotblocks/src/query/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use sqd_query::Query;
use super::{executor::QueryExecutor, response::QueryResponse};
use crate::{
dataset_controller::DatasetController,
encoding::ContentEncoding,
errors::{Busy, QueryIsAboveTheHead, QueryKindMismatch},
query::QueryExecutorCollector,
types::{ClientId, DBRef, DatasetKind}
Expand Down Expand Up @@ -84,26 +85,29 @@ impl QueryService {
&self,
dataset: &DatasetController,
query: Query,
client_id: ClientId
client_id: ClientId,
encoding: ContentEncoding
) -> anyhow::Result<QueryResponse> {
self.query_internal(dataset, query, false, client_id).await
self.query_internal(dataset, query, false, client_id, encoding).await
}

pub async fn query_finalized(
&self,
dataset: &DatasetController,
query: Query,
client_id: ClientId
client_id: ClientId,
encoding: ContentEncoding
) -> anyhow::Result<QueryResponse> {
self.query_internal(dataset, query, true, client_id).await
self.query_internal(dataset, query, true, client_id, encoding).await
}

async fn query_internal(
&self,
dataset: &DatasetController,
query: Query,
finalized: bool,
client_id: ClientId
client_id: ClientId,
encoding: ContentEncoding
) -> anyhow::Result<QueryResponse> {
ensure!(
dataset.dataset_kind() == DatasetKind::from_query(&query),
Expand Down Expand Up @@ -158,7 +162,8 @@ impl QueryService {
query,
finalized,
None,
client_id
client_id,
encoding
)
.await
}
Expand Down
Loading