diff --git a/configs/README.md b/configs/README.md index 46c17f4c..64970fe8 100644 --- a/configs/README.md +++ b/configs/README.md @@ -77,11 +77,13 @@ benchmarks: - name: "Benchmark Name" description: "What this benchmark tests" variables: - - type: payload|node_type|num_blocks|gas_limit + - type: payload|node_type|num_blocks|gas_limit|target_gps|consensus_timing value: single-value values: [array, of, values] # for matrix testing ``` +`consensus_timing` can be `prevent-late-fcu` or `base-consensus`. Snapshot load-test runs default to `base-consensus`; other benchmark runs default to `prevent-late-fcu`. + ## 🎯 Choosing the Right Configuration - **Development/Testing**: Use `examples/` configurations for focused testing diff --git a/configs/examples/load-test-config.yml b/configs/examples/load-test-config.yml new file mode 100644 index 00000000..77074cdd --- /dev/null +++ b/configs/examples/load-test-config.yml @@ -0,0 +1,20 @@ +transaction_submission_rpcs: + - "http://localhost:8545" +query_rpc: "http://localhost:8545" +txpool_nodes: [] +flashblocks_ws: "ws://localhost:7111" + +sender_count: 10 +target_gps: 500000000 +duration: "60s" +funding_amount: "10000000000000000000" + +transactions: + - weight: 70 + type: transfer + - weight: 20 + type: calldata + max_size: 256 + - weight: 10 + type: precompile + target: sha256 diff --git a/configs/examples/load-test.yml b/configs/examples/load-test.yml index 6743ca60..143b9774 100644 --- a/configs/examples/load-test.yml +++ b/configs/examples/load-test.yml @@ -4,16 +4,8 @@ payloads: - name: Load Test type: load-test id: load-test - sender_count: 10 - transactions: - - weight: 70 - type: transfer - - weight: 20 - type: calldata - max_size: 256 - - weight: 10 - type: precompile - target: sha256 + network: devnet + config_file: ./load-test-config.yml benchmarks: - variables: @@ -27,3 +19,5 @@ benchmarks: value: 10 - type: gas_limit value: 1000000000 + - type: target_gps + value: 500000000 diff --git a/report/src/components/ChartGrid.tsx b/report/src/components/ChartGrid.tsx index bd525329..d339d146 100644 --- a/report/src/components/ChartGrid.tsx +++ b/report/src/components/ChartGrid.tsx @@ -20,6 +20,22 @@ function resolveMetricKey( return key; } } + + const metricKeys = chartData.flatMap((d) => Object.keys(d.ExecutionMetrics)); + for (const key of keys) { + const quantileSuffix = key.match(/(_quantile_\d+(?:_\d+)?)$/)?.[1] ?? ""; + const metricPrefix = quantileSuffix + ? key.slice(0, -quantileSuffix.length) + : key; + const labeledMetricKey = metricKeys.find( + (metricKey) => + metricKey.startsWith(`${metricPrefix}_`) && + (!quantileSuffix || metricKey.endsWith(quantileSuffix)), + ); + if (labeledMetricKey) { + return labeledMetricKey; + } + } return primaryKey; } diff --git a/report/src/components/ConfigurationTags.tsx b/report/src/components/ConfigurationTags.tsx index 5d3591b7..5568120b 100644 --- a/report/src/components/ConfigurationTags.tsx +++ b/report/src/components/ConfigurationTags.tsx @@ -7,6 +7,61 @@ interface ConfigurationTagsProps { className?: string; } +const CONFIG_LABELS: Record = { + BlockTimeMilliseconds: "Block Time", + ConsensusTimingMode: "Consensus Timing", + GasLimit: "Gas Limit", + NodeType: "Node Type", + TargetGPS: "Target Gas/s", + TransactionPayload: "Transaction Payload", + ValidatorNodeType: "Validator Node Type", +}; + +const CONFIG_ORDER = [ + "TargetGPS", + "GasLimit", + "BlockTimeMilliseconds", + "ConsensusTimingMode", + "NodeType", + "ValidatorNodeType", + "TransactionPayload", + "Roles", +]; + +const configLabel = (key: string): string => + CONFIG_LABELS[key] ?? camelToTitleCase(key); + +const configValue = (key: string, value: unknown): string => { + if (key === "GasLimit") { + return formatValue(Number(value), "gas"); + } + if (key === "TargetGPS") { + return formatValue(Number(value), "gas/s"); + } + if (key === "BlockTimeMilliseconds") { + return formatValue(Number(value), "ms"); + } + return String(formatLabel(`${value}`)); +}; + +const configEntries = (testConfig: Record) => + Object.entries(testConfig || {}) + .filter(([key, value]) => key !== "BenchmarkRun" && value !== "") + .sort(([a], [b]) => { + const aIndex = CONFIG_ORDER.indexOf(a); + const bIndex = CONFIG_ORDER.indexOf(b); + if (aIndex === -1 && bIndex === -1) { + return a.localeCompare(b); + } + if (aIndex === -1) { + return 1; + } + if (bIndex === -1) { + return -1; + } + return aIndex - bIndex; + }); + const ConfigurationTags = ({ testConfig, clientVersion, @@ -25,28 +80,18 @@ const ConfigurationTags = ({ {clientVersion} )} - {Object.entries(testConfig || {}) - .filter(([k]) => k !== "BenchmarkRun" && k !== "GasLimit") - .map(([key, value]) => ( - - - {camelToTitleCase(key)}: - - {key === "GasLimit" ? ( - - {formatValue(Number(value), "gas")} - - ) : ( - - {String(formatLabel(`${value}`))} - - )} + {configEntries(testConfig).map(([key, value]) => ( + + + {configLabel(key)}: - ))} + {configValue(key, value)} + + ))} ); }; diff --git a/report/src/components/RunList.tsx b/report/src/components/RunList.tsx index 0265129b..6c485ba4 100644 --- a/report/src/components/RunList.tsx +++ b/report/src/components/RunList.tsx @@ -291,9 +291,13 @@ const RunList = ({ const statusCounts = groupBy(section.runs, "status"); const sortedRuns = isExpanded ? sortRuns(section.runs) : section.runs; const gasLimit = Number(section.runs?.[0]?.testConfig?.GasLimit); + const targetGasPerSecond = Number( + section.runs?.[0]?.testConfig?.TargetGPS, + ); const blockTimeMilliseconds = Number(section.runs?.[0]?.testConfig?.BlockTimeMilliseconds) || 2000; - const gasPerSecond = gasLimit / (blockTimeMilliseconds / 1000); + const gasPerSecond = + targetGasPerSecond || gasLimit / (blockTimeMilliseconds / 1000); return (
diff --git a/report/src/metricDefinitions.ts b/report/src/metricDefinitions.ts index 58085ebd..52a36b2b 100644 --- a/report/src/metricDefinitions.ts +++ b/report/src/metricDefinitions.ts @@ -192,6 +192,29 @@ export const CHART_CONFIG = { unit: "s", aliases: ["reth_op_rbuilder_state_root_calculation_duration"], }, + reth_base_builder_state_root_calculation_duration_quantile_0_5: { + type: "line", + title: "Builder State Root Calculation Duration p50", + description: "p50 time taken to calculate the state root", + unit: "s", + }, + reth_base_builder_state_root_calculation_duration_quantile_0_9: { + type: "line", + title: "Builder State Root Calculation Duration p90", + description: "p90 time taken to calculate the state root", + unit: "s", + }, + reth_base_builder_state_root_calculation_duration_quantile_0_99: { + type: "line", + title: "Builder State Root Calculation Duration p99", + description: "p99 time taken to calculate the state root", + unit: "s", + }, + reth_base_builder_state_root_time_per_gas_ratio_quantile_0_9: { + type: "line", + title: "Builder State Root Time per Gas p90", + description: "p90 state-root calculation time divided by gas processed", + }, reth_base_builder_sequencer_tx_duration_avg: { type: "line", title: "Builder Sequencer Tx Duration", @@ -224,6 +247,135 @@ export const CHART_CONFIG = { description: "Average gas headroom percentage across flashblocks", unit: "count", }, + reth_storage_providers_database_save_blocks_total_quantile_0_9: { + type: "line", + title: "Save Blocks Total p90", + description: "p90 total database save-blocks duration", + unit: "s", + }, + reth_storage_providers_database_save_blocks_block_count_last: { + type: "line", + title: "Save Blocks Block Count", + description: + "Number of blocks included in the most recent save-blocks operation", + unit: "blocks", + }, + reth_storage_providers_database_save_blocks_commit_sf_quantile_0_9: { + type: "line", + title: "Save Blocks Static File Commit p90", + description: "p90 static-file commit duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_commit_mdbx_quantile_0_9: { + type: "line", + title: "Save Blocks MDBX Commit p90", + description: "p90 MDBX commit duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_write_state_quantile_0_9: { + type: "line", + title: "Save Blocks Write State p90", + description: "p90 state write duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_write_hashed_state_quantile_0_9: { + type: "line", + title: "Save Blocks Write Hashed State p90", + description: "p90 hashed-state write duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_write_trie_updates_quantile_0_9: { + type: "line", + title: "Save Blocks Write Trie Updates p90", + description: "p90 trie-update write duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_sf_quantile_0_9: { + type: "line", + title: "Save Blocks Static Files p90", + description: "p90 static-file save-blocks duration", + unit: "s", + }, + reth_trie_leaves_added_quantile_0_9: { + type: "line", + title: "Trie Leaves Added p90", + description: "p90 trie leaves added", + unit: "count", + }, + reth_trie_branches_added_quantile_0_9: { + type: "line", + title: "Trie Branches Added p90", + description: "p90 trie branches added", + unit: "count", + }, + reth_tree_root_sparse_trie_total_duration_histogram_quantile_0_9: { + type: "line", + title: "Sparse Trie Total Duration p90", + description: "p90 sparse-trie total duration", + unit: "s", + aliases: ["reth_tree_root_sparse_trie_total_duration_histogram"], + }, + reth_tree_root_sparse_trie_final_update_duration_histogram_quantile_0_9: { + type: "line", + title: "Sparse Trie Final Update Duration p90", + description: "p90 sparse-trie final update duration", + unit: "s", + aliases: ["reth_tree_root_sparse_trie_final_update_duration_histogram"], + }, + reth_parallel_sparse_trie_subtrie_hash_update_latency_quantile_0_9: { + type: "line", + title: "Sparse Trie Subtrie Hash Update p90", + description: "p90 subtrie hash update latency", + unit: "s", + }, + reth_parallel_sparse_trie_subtrie_upper_hash_latency_quantile_0_9: { + type: "line", + title: "Sparse Trie Subtrie Upper Hash p90", + description: "p90 subtrie upper-hash latency", + unit: "s", + }, + reth_trie_proof_task_storage_worker_idle_time_seconds_quantile_0_9: { + type: "line", + title: "Trie Proof Storage Worker Idle p90", + description: "p90 trie-proof storage worker idle time", + unit: "s", + }, + reth_trie_proof_task_account_worker_idle_time_seconds_quantile_0_9: { + type: "line", + title: "Trie Proof Account Worker Idle p90", + description: "p90 trie-proof account worker idle time", + unit: "s", + }, + reth_trie_proof_task_blinded_storage_nodes_quantile_0_9: { + type: "line", + title: "Trie Proof Blinded Storage Nodes p90", + description: "p90 blinded storage nodes handled by trie proof tasks", + unit: "count", + }, + reth_trie_proof_task_blinded_account_nodes_quantile_0_9: { + type: "line", + title: "Trie Proof Blinded Account Nodes p90", + description: "p90 blinded account nodes handled by trie proof tasks", + unit: "count", + }, + reth_trie_cursor_overall_duration_quantile_0_9: { + type: "line", + title: "Trie Cursor Overall Duration p90", + description: "p90 trie cursor overall duration", + unit: "s", + }, + reth_trie_hashed_cursor_overall_duration_quantile_0_9: { + type: "line", + title: "Trie Hashed Cursor Overall Duration p90", + description: "p90 hashed trie cursor overall duration", + unit: "s", + }, + reth_db_freelist: { + type: "line", + title: "MDBX Freelist", + description: "MDBX freelist size", + unit: "count", + }, reth_sync_state_provider_total_storage_fetch_latency_avg: { type: "line", title: "Validator Storage Load Latency", @@ -268,6 +420,31 @@ const CHART_CONFIG_ORDER: (keyof typeof CHART_CONFIG)[] = [ "chain/storage/commits.50-percentile", "chain/snapshot/commits.50-percentile", "chain/triedb/commits.50-percentile", + "reth_base_builder_state_root_calculation_duration_quantile_0_5", + "reth_base_builder_state_root_calculation_duration_quantile_0_9", + "reth_base_builder_state_root_calculation_duration_quantile_0_99", + "reth_base_builder_state_root_time_per_gas_ratio_quantile_0_9", + "reth_storage_providers_database_save_blocks_total_quantile_0_9", + "reth_storage_providers_database_save_blocks_block_count_last", + "reth_storage_providers_database_save_blocks_commit_sf_quantile_0_9", + "reth_storage_providers_database_save_blocks_commit_mdbx_quantile_0_9", + "reth_storage_providers_database_save_blocks_write_state_quantile_0_9", + "reth_storage_providers_database_save_blocks_write_hashed_state_quantile_0_9", + "reth_storage_providers_database_save_blocks_write_trie_updates_quantile_0_9", + "reth_storage_providers_database_save_blocks_sf_quantile_0_9", + "reth_trie_leaves_added_quantile_0_9", + "reth_trie_branches_added_quantile_0_9", + "reth_tree_root_sparse_trie_total_duration_histogram_quantile_0_9", + "reth_tree_root_sparse_trie_final_update_duration_histogram_quantile_0_9", + "reth_parallel_sparse_trie_subtrie_hash_update_latency_quantile_0_9", + "reth_parallel_sparse_trie_subtrie_upper_hash_latency_quantile_0_9", + "reth_trie_proof_task_storage_worker_idle_time_seconds_quantile_0_9", + "reth_trie_proof_task_account_worker_idle_time_seconds_quantile_0_9", + "reth_trie_proof_task_blinded_storage_nodes_quantile_0_9", + "reth_trie_proof_task_blinded_account_nodes_quantile_0_9", + "reth_trie_cursor_overall_duration_quantile_0_9", + "reth_trie_hashed_cursor_overall_duration_quantile_0_9", + "reth_db_freelist", ]; export const SORTED_CHART_CONFIG: [string, ChartConfig][] = Object.entries( diff --git a/report/src/pages/RunIndex.tsx b/report/src/pages/RunIndex.tsx index 30caa7e2..f20d03e8 100644 --- a/report/src/pages/RunIndex.tsx +++ b/report/src/pages/RunIndex.tsx @@ -51,7 +51,12 @@ const RunIndexInner = ({ benchmarkRuns }: { benchmarkRuns: BenchmarkRuns }) => { } }); - const groups = groupBy(matchedRuns, "testConfig.GasLimit"); + const groups = groupBy(matchedRuns, (run) => { + if (run.testConfig.TargetGPS) { + return `target-gps-${run.testConfig.TargetGPS}`; + } + return `gas-limit-${run.testConfig.GasLimit}`; + }); // Build sections array with diffKeyStart const sections: { diff --git a/runner/benchmark/benchmark.go b/runner/benchmark/benchmark.go index f1614af3..2e7631fd 100644 --- a/runner/benchmark/benchmark.go +++ b/runner/benchmark/benchmark.go @@ -70,6 +70,21 @@ func NewParamsFromValues(assignments map[string]interface{}) (*types.RunParams, } else { return nil, fmt.Errorf("invalid gas limit %s", v) } + case "target_gps": + if vInt, ok := v.(int); ok { + params.TargetGPS = uint64(vInt) + } else { + return nil, fmt.Errorf("invalid target gps %s", v) + } + case "consensus_timing": + if vStr, ok := v.(string); ok { + if vStr != "" && vStr != types.ConsensusTimingModePreventLateFCU && vStr != types.ConsensusTimingModeBaseConsensus { + return nil, fmt.Errorf("invalid consensus timing %s", v) + } + params.ConsensusTimingMode = vStr + } else { + return nil, fmt.Errorf("invalid consensus timing %s", v) + } case "env": if vStr, ok := v.(string); ok { entries := strings.Split(vStr, ";") diff --git a/runner/benchmark/matrix.go b/runner/benchmark/matrix.go index 430aa892..dbd406c6 100644 --- a/runner/benchmark/matrix.go +++ b/runner/benchmark/matrix.go @@ -3,6 +3,8 @@ package benchmark import ( "fmt" "time" + + "github.com/base/base-bench/runner/network/types" ) type ThresholdConfig struct { @@ -121,6 +123,7 @@ func ResolveTestRunsFromMatrix(c TestDefinition, testFileName string, config *Be if c.Tags != nil { params.Tags = *c.Tags } + params.ConsensusTimingMode = consensusTimingMode(params, c, config) testParams[i] = TestRun{ ID: id, @@ -153,3 +156,25 @@ func ResolveTestRunsFromMatrix(c TestDefinition, testFileName string, config *Be return testParams, nil } + +func consensusTimingMode(params *types.RunParams, definition TestDefinition, config *BenchmarkConfig) string { + if params.ConsensusTimingMode != "" { + return params.ConsensusTimingMode + } + if isSnapshotLoadTest(params.PayloadID, definition, config) { + return types.ConsensusTimingModeBaseConsensus + } + return types.ConsensusTimingModePreventLateFCU +} + +func isSnapshotLoadTest(payloadID string, definition TestDefinition, config *BenchmarkConfig) bool { + if definition.Snapshot == nil || definition.Snapshot.Command == "" { + return false + } + for _, transactionPayload := range config.TransactionPayloads { + if transactionPayload.ID == payloadID && transactionPayload.Type == "load-test" { + return true + } + } + return false +} diff --git a/runner/benchmark/matrix_test.go b/runner/benchmark/matrix_test.go index 5440dccf..590c131e 100644 --- a/runner/benchmark/matrix_test.go +++ b/runner/benchmark/matrix_test.go @@ -6,6 +6,7 @@ import ( "github.com/base/base-bench/runner/benchmark" "github.com/base/base-bench/runner/network/types" + "github.com/base/base-bench/runner/payload" "github.com/stretchr/testify/require" ) @@ -29,10 +30,11 @@ func TestResolveTestRunsFromMatrix(t *testing.T) { want: []benchmark.TestRun{ { Params: types.RunParams{ - NodeType: "geth", - PayloadID: "simple", - GasLimit: benchmark.DefaultParams.GasLimit, - BlockTime: 1 * time.Second, + NodeType: "geth", + PayloadID: "simple", + GasLimit: benchmark.DefaultParams.GasLimit, + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, }, @@ -55,34 +57,66 @@ func TestResolveTestRunsFromMatrix(t *testing.T) { want: []benchmark.TestRun{ { Params: types.RunParams{ - NodeType: "geth", - GasLimit: benchmark.DefaultParams.GasLimit, - PayloadID: "simple", - BlockTime: 1 * time.Second, + NodeType: "geth", + GasLimit: benchmark.DefaultParams.GasLimit, + PayloadID: "simple", + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, { Params: types.RunParams{ - NodeType: "erigon", - GasLimit: benchmark.DefaultParams.GasLimit, - PayloadID: "simple", - BlockTime: 1 * time.Second, + NodeType: "erigon", + GasLimit: benchmark.DefaultParams.GasLimit, + PayloadID: "simple", + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, { Params: types.RunParams{ - NodeType: "geth", - GasLimit: benchmark.DefaultParams.GasLimit, - PayloadID: "complex", - BlockTime: 1 * time.Second, + NodeType: "geth", + GasLimit: benchmark.DefaultParams.GasLimit, + PayloadID: "complex", + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, { Params: types.RunParams{ - NodeType: "erigon", - GasLimit: benchmark.DefaultParams.GasLimit, - PayloadID: "complex", - BlockTime: 1 * time.Second, + NodeType: "erigon", + GasLimit: benchmark.DefaultParams.GasLimit, + PayloadID: "complex", + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, + }, + }, + }, + wantErr: false, + }, + { + name: "config with target gps", + config: benchmark.TestDefinition{ + Variables: []benchmark.Param{ + { + ParamType: "payload", + Value: "load-test", + }, + { + ParamType: "target_gps", + Value: 200_000_000, + }, + }, + }, + want: []benchmark.TestRun{ + { + Params: types.RunParams{ + NodeType: "geth", + PayloadID: "load-test", + GasLimit: benchmark.DefaultParams.GasLimit, + TargetGPS: 200_000_000, + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, }, @@ -159,6 +193,123 @@ func TestResolveTestRunsFromMatrix(t *testing.T) { } } +func TestResolveTestRunsFromMatrixExpandsTargetGPSValues(t *testing.T) { + config := &benchmark.BenchmarkConfig{Name: "snapshot load test"} + definition := benchmark.TestDefinition{ + Variables: []benchmark.Param{ + { + ParamType: "payload", + Value: "mainnet-snapshot-load-test", + }, + { + ParamType: "node_type", + Value: "builder", + }, + { + ParamType: "gas_limit", + Value: 1_200_000_000, + }, + { + ParamType: "target_gps", + Values: []interface{}{ + 80_000_000, + 400_000_000, + 1_200_000_000, + }, + }, + }, + } + + runs, err := benchmark.ResolveTestRunsFromMatrix(definition, "snapshot-load-test.yml", config) + require.NoError(t, err) + require.Len(t, runs, 3) + + require.Equal(t, uint64(1_200_000_000), runs[0].Params.GasLimit) + require.Equal(t, uint64(1_200_000_000), runs[1].Params.GasLimit) + require.Equal(t, uint64(1_200_000_000), runs[2].Params.GasLimit) + require.Equal(t, uint64(80_000_000), runs[0].Params.TargetGPS) + require.Equal(t, uint64(400_000_000), runs[1].Params.TargetGPS) + require.Equal(t, uint64(1_200_000_000), runs[2].Params.TargetGPS) + require.Equal(t, types.ConsensusTimingModePreventLateFCU, runs[0].Params.ConsensusTimingMode) + require.Equal(t, types.ConsensusTimingModePreventLateFCU, runs[1].Params.ConsensusTimingMode) + require.Equal(t, types.ConsensusTimingModePreventLateFCU, runs[2].Params.ConsensusTimingMode) +} + +func TestResolveTestRunsFromMatrixDefaultsSnapshotLoadTestsToBaseConsensusTiming(t *testing.T) { + config := &benchmark.BenchmarkConfig{ + Name: "snapshot load test", + TransactionPayloads: []payload.Definition{ + { + ID: "mainnet-snapshot-load-test", + Type: "load-test", + }, + }, + } + definition := benchmark.TestDefinition{ + Snapshot: &benchmark.SnapshotDefinition{ + Command: "./setup-initial-snapshot.sh --network mainnet", + }, + Variables: []benchmark.Param{ + { + ParamType: "payload", + Value: "mainnet-snapshot-load-test", + }, + }, + } + + runs, err := benchmark.ResolveTestRunsFromMatrix(definition, "snapshot-load-test.yml", config) + require.NoError(t, err) + require.Len(t, runs, 1) + require.Equal(t, types.ConsensusTimingModeBaseConsensus, runs[0].Params.ConsensusTimingMode) +} + +func TestResolveTestRunsFromMatrixAllowsSnapshotLoadTestsToOverrideConsensusTiming(t *testing.T) { + config := &benchmark.BenchmarkConfig{ + Name: "snapshot load test", + TransactionPayloads: []payload.Definition{ + { + ID: "mainnet-snapshot-load-test", + Type: "load-test", + }, + }, + } + definition := benchmark.TestDefinition{ + Snapshot: &benchmark.SnapshotDefinition{ + Command: "./setup-initial-snapshot.sh --network mainnet", + }, + Variables: []benchmark.Param{ + { + ParamType: "payload", + Value: "mainnet-snapshot-load-test", + }, + { + ParamType: "consensus_timing", + Value: types.ConsensusTimingModePreventLateFCU, + }, + }, + } + + runs, err := benchmark.ResolveTestRunsFromMatrix(definition, "snapshot-load-test.yml", config) + require.NoError(t, err) + require.Len(t, runs, 1) + require.Equal(t, types.ConsensusTimingModePreventLateFCU, runs[0].Params.ConsensusTimingMode) +} + +func TestResolveTestRunsFromMatrixRejectsInvalidConsensusTiming(t *testing.T) { + config := &benchmark.BenchmarkConfig{Name: "benchmark"} + definition := benchmark.TestDefinition{ + Variables: []benchmark.Param{ + { + ParamType: "consensus_timing", + Value: "aligned", + }, + }, + } + + _, err := benchmark.ResolveTestRunsFromMatrix(definition, "benchmark.yml", config) + require.ErrorContains(t, err, "invalid consensus timing") +} + func stringPtr(s string) *string { return &s } diff --git a/runner/benchmark/result_metadata.go b/runner/benchmark/result_metadata.go index e31bdbff..03336e26 100644 --- a/runner/benchmark/result_metadata.go +++ b/runner/benchmark/result_metadata.go @@ -51,7 +51,9 @@ func (runs *RunGroup) AddResult(testIdx int, runResult RunResult) { } const ( - BenchmarkRunTag = "BenchmarkRun" + BenchmarkRunTag = "BenchmarkRun" + LoadTestResultsDir = "load-tests" + LoadTestTimestampLayout = "2006-01-02-15-04-05" ) func RunGroupFromTestPlans(testPlans []TestPlan, machineInfo *MachineInfo) RunGroup { diff --git a/runner/clients/baserethnode/client.go b/runner/clients/baserethnode/client.go index 844d33b3..3a84cff4 100644 --- a/runner/clients/baserethnode/client.go +++ b/runner/clients/baserethnode/client.go @@ -276,3 +276,7 @@ func (r *BaseRethNodeClient) FlashblocksClient() types.FlashblocksClient { func (r *BaseRethNodeClient) SupportsFlashblocks() bool { return true } + +func (r *BaseRethNodeClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/builder/client.go b/runner/clients/builder/client.go index b229905c..605f93e1 100644 --- a/runner/clients/builder/client.go +++ b/runner/clients/builder/client.go @@ -128,3 +128,12 @@ func (r *BuilderClient) FlashblocksClient() types.FlashblocksClient { func (r *BuilderClient) SupportsFlashblocks() bool { return false } + +// FlashblocksWsURL returns the local WebSocket URL of the flashblocks server +// hosted by the builder. +func (r *BuilderClient) FlashblocksWsURL() string { + if r.websocketPort == 0 { + return "" + } + return fmt.Sprintf("ws://localhost:%d", r.websocketPort) +} diff --git a/runner/clients/builder/metrics.go b/runner/clients/builder/metrics.go index d6f83366..d4bcd739 100644 --- a/runner/clients/builder/metrics.go +++ b/runner/clients/builder/metrics.go @@ -5,11 +5,17 @@ import ( "context" "fmt" "io" + "math" "net/http" + "sort" + "strconv" + "strings" + "unicode" "github.com/base/base-bench/runner/metrics" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" ) @@ -19,6 +25,7 @@ type metricsCollector struct { client *ethclient.Client metrics []metrics.BlockMetrics metricsPort int + prevMetrics map[string]*io_prometheus_client.Metric } func newMetricsCollector(log log.Logger, client *ethclient.Client, metricsPort int) metrics.Collector { @@ -27,6 +34,7 @@ func newMetricsCollector(log log.Logger, client *ethclient.Client, metricsPort i client: client, metricsPort: metricsPort, metrics: make([]metrics.BlockMetrics, 0), + prevMetrics: make(map[string]*io_prometheus_client.Metric), } } @@ -40,15 +48,48 @@ func (r *metricsCollector) GetMetrics() []metrics.BlockMetrics { func (r *metricsCollector) GetMetricTypes() map[string]bool { return map[string]bool{ - "reth_sync_execution_execution_duration": true, - "reth_sync_block_validation_state_root_duration": true, - "reth_op_rbuilder_block_built_success": true, - "reth_op_rbuilder_flashblock_count": true, - "reth_op_rbuilder_total_block_built_duration": true, - "reth_op_rbuilder_flashblock_build_duration": true, - "reth_op_rbuilder_state_root_calculation_duration": true, - "reth_op_rbuilder_sequencer_tx_duration": true, - "reth_op_rbuilder_payload_tx_simulation_duration": true, + "reth_sync_execution_execution_duration": true, + "reth_sync_block_validation_state_root_duration": true, + "reth_op_rbuilder_block_built_success": true, + "reth_op_rbuilder_flashblock_count": true, + "reth_op_rbuilder_total_block_built_duration": true, + "reth_op_rbuilder_flashblock_build_duration": true, + "reth_op_rbuilder_state_root_calculation_duration": true, + "reth_op_rbuilder_sequencer_tx_duration": true, + "reth_op_rbuilder_payload_tx_simulation_duration": true, + "reth_base_builder_block_built_success": true, + "reth_base_builder_flashblock_count": true, + "reth_base_builder_total_block_built_duration": true, + "reth_base_builder_flashblock_build_duration": true, + "reth_base_builder_state_root_calculation_duration": true, + "reth_base_builder_state_root_time_per_gas_ratio": true, + "reth_base_builder_sequencer_tx_duration": true, + "reth_base_builder_payload_transaction_simulation_duration": true, + "reth_base_builder_payload_tx_simulation_duration": true, + "reth_base_builder_tx_simulation_duration": true, + "reth_base_builder_payload_num_tx_gauge": true, + "reth_base_builder_flashblock_gas_headroom_pct": true, + "reth_storage_providers_database_save_blocks_total": true, + "reth_storage_providers_database_save_blocks_block_count_last": true, + "reth_storage_providers_database_save_blocks_commit_sf": true, + "reth_storage_providers_database_save_blocks_commit_mdbx": true, + "reth_storage_providers_database_save_blocks_write_state": true, + "reth_storage_providers_database_save_blocks_write_hashed_state": true, + "reth_storage_providers_database_save_blocks_write_trie_updates": true, + "reth_storage_providers_database_save_blocks_sf": true, + "reth_trie_leaves_added": true, + "reth_trie_branches_added": true, + "reth_tree_root_sparse_trie_total_duration_histogram": true, + "reth_tree_root_sparse_trie_final_update_duration_histogram": true, + "reth_parallel_sparse_trie_subtrie_hash_update_latency": true, + "reth_parallel_sparse_trie_subtrie_upper_hash_latency": true, + "reth_trie_proof_task_storage_worker_idle_time_seconds": true, + "reth_trie_proof_task_account_worker_idle_time_seconds": true, + "reth_trie_proof_task_blinded_storage_nodes": true, + "reth_trie_proof_task_blinded_account_nodes": true, + "reth_trie_cursor_overall_duration": true, + "reth_trie_hashed_cursor_overall_duration": true, + "reth_db_freelist": true, } } @@ -73,21 +114,139 @@ func (r *metricsCollector) Collect(ctx context.Context, m *metrics.BlockMetrics) } metricTypes := r.GetMetricTypes() + m.SetPreviousPrometheusMetrics(r.prevMetrics) for _, metric := range metrics { name := metric.GetName() if metricTypes[name] { metricVal := metric.GetMetric() - if len(metricVal) != 1 { - r.log.Warn("expected 1 metric, got %d for metric %s", len(metricVal), name) - } - err = m.UpdatePrometheusMetric(name, metricVal[0]) - if err != nil { - r.log.Warn("failed to add metric %s: %s", name, err) + for _, value := range metricVal { + metricName := prometheusMetricName(name, value) + addPrometheusMetric(r.log, m, metricName, value) + + if metricName != name && len(metricVal) == 1 { + addPrometheusMetric(r.log, m, name, value) + } } } } + r.prevMetrics = m.PreviousPrometheusMetrics() r.metrics = append(r.metrics, *m.Copy()) return nil } + +func addPrometheusMetric(log log.Logger, m *metrics.BlockMetrics, name string, metric *io_prometheus_client.Metric) { + addHistogramQuantiles(m, name, metric, m.PreviousPrometheusMetric(name)) + + err := m.UpdatePrometheusMetric(name, metric) + if err != nil { + log.Warn("failed to add metric %s: %s", name, err) + } + addSummaryQuantiles(m, name, metric) +} + +func prometheusMetricName(name string, metric *io_prometheus_client.Metric) string { + labels := metric.GetLabel() + if len(labels) == 0 { + return name + } + + parts := make([]string, 0, len(labels)) + for _, label := range labels { + parts = append(parts, sanitizeMetricPart(label.GetName())+"_"+sanitizeMetricPart(label.GetValue())) + } + sort.Strings(parts) + return name + "_" + strings.Join(parts, "_") +} + +func addSummaryQuantiles(m *metrics.BlockMetrics, name string, metric *io_prometheus_client.Metric) { + if metric.Summary == nil { + return + } + + for _, quantile := range metric.Summary.GetQuantile() { + m.AddExecutionMetric(name+"_quantile_"+formatQuantile(quantile.GetQuantile()), quantile.GetValue()) + } +} + +func addHistogramQuantiles(m *metrics.BlockMetrics, name string, metric *io_prometheus_client.Metric, prevMetric *io_prometheus_client.Metric) { + if metric.Histogram == nil { + return + } + + var prevHistogram *io_prometheus_client.Histogram + if prevMetric != nil { + prevHistogram = prevMetric.Histogram + } + + for _, quantile := range []float64{0.5, 0.9, 0.99} { + value, ok := histogramQuantile(quantile, metric.Histogram, prevHistogram) + if ok { + m.AddExecutionMetric(name+"_quantile_"+formatQuantile(quantile), value) + } + } +} + +func histogramQuantile(quantile float64, histogram *io_prometheus_client.Histogram, prevHistogram *io_prometheus_client.Histogram) (float64, bool) { + if histogram == nil || histogram.SampleCount == nil || len(histogram.Bucket) == 0 { + return 0, false + } + + prevCount := uint64(0) + if prevHistogram != nil && prevHistogram.SampleCount != nil { + prevCount = *prevHistogram.SampleCount + } + count := deltaUint64(*histogram.SampleCount, prevCount) + if count == 0 { + return 0, false + } + + rank := quantile * float64(count) + lastFiniteUpperBound := 0.0 + hasFiniteUpperBound := false + for i, bucket := range histogram.Bucket { + if bucket.CumulativeCount == nil || bucket.UpperBound == nil { + continue + } + if !math.IsInf(*bucket.UpperBound, 0) { + lastFiniteUpperBound = *bucket.UpperBound + hasFiniteUpperBound = true + } + + prevBucketCount := uint64(0) + if prevHistogram != nil && i < len(prevHistogram.Bucket) && prevHistogram.Bucket[i].CumulativeCount != nil { + prevBucketCount = *prevHistogram.Bucket[i].CumulativeCount + } + + bucketCount := deltaUint64(*bucket.CumulativeCount, prevBucketCount) + if float64(bucketCount) >= rank { + if math.IsInf(*bucket.UpperBound, 0) { + return lastFiniteUpperBound, hasFiniteUpperBound + } + return *bucket.UpperBound, true + } + } + + return 0, false +} + +func deltaUint64(current uint64, previous uint64) uint64 { + if current < previous { + return current + } + return current - previous +} + +func formatQuantile(quantile float64) string { + return strings.ReplaceAll(strconv.FormatFloat(quantile, 'f', -1, 64), ".", "_") +} + +func sanitizeMetricPart(value string) string { + return strings.Map(func(r rune) rune { + if unicode.IsLetter(r) || unicode.IsDigit(r) { + return r + } + return '_' + }, value) +} diff --git a/runner/clients/builder/metrics_test.go b/runner/clients/builder/metrics_test.go new file mode 100644 index 00000000..c5d462e4 --- /dev/null +++ b/runner/clients/builder/metrics_test.go @@ -0,0 +1,98 @@ +package builder + +import ( + "math" + "testing" + + io_prometheus_client "github.com/prometheus/client_model/go" +) + +func TestPrometheusMetricNameSortsAndSanitizesLabels(t *testing.T) { + metric := &io_prometheus_client.Metric{ + Label: []*io_prometheus_client.LabelPair{ + {Name: stringPtr("type"), Value: stringPtr("account.worker")}, + {Name: stringPtr("configname"), Value: stringPtr("mainnet/snapshot")}, + }, + } + + got := prometheusMetricName("reth_example_metric", metric) + want := "reth_example_metric_configname_mainnet_snapshot_type_account_worker" + if got != want { + t.Fatalf("prometheusMetricName() = %q, want %q", got, want) + } +} + +func TestHistogramQuantileUsesIntervalBuckets(t *testing.T) { + prev := histogramMetric( + 10, + bucket(1, 5), + bucket(2, 9), + bucket(3, 10), + ) + current := histogramMetric( + 20, + bucket(1, 8), + bucket(2, 17), + bucket(3, 20), + ) + + got, ok := histogramQuantile(0.5, current.Histogram, prev.Histogram) + if !ok { + t.Fatal("histogramQuantile() did not return a value") + } + if got != 2 { + t.Fatalf("histogramQuantile(0.5) = %f, want 2", got) + } + + got, ok = histogramQuantile(0.9, current.Histogram, prev.Histogram) + if !ok { + t.Fatal("histogramQuantile() did not return a value") + } + if got != 3 { + t.Fatalf("histogramQuantile(0.9) = %f, want 3", got) + } +} + +func TestHistogramQuantileAvoidsInfiniteUpperBound(t *testing.T) { + current := histogramMetric( + 10, + bucket(1, 5), + bucket(math.Inf(1), 10), + ) + + got, ok := histogramQuantile(0.9, current.Histogram, nil) + if !ok { + t.Fatal("histogramQuantile() did not return a value") + } + if got != 1 { + t.Fatalf("histogramQuantile(0.9) = %f, want 1", got) + } +} + +func histogramMetric(count uint64, buckets ...*io_prometheus_client.Bucket) *io_prometheus_client.Metric { + return &io_prometheus_client.Metric{ + Histogram: &io_prometheus_client.Histogram{ + SampleCount: uint64Ptr(count), + Bucket: buckets, + }, + } +} + +func bucket(upperBound float64, count uint64) *io_prometheus_client.Bucket { + return &io_prometheus_client.Bucket{ + UpperBound: float64Ptr(upperBound), + CumulativeCount: uint64Ptr(count), + } +} + +func stringPtr(value string) *string { + return &value +} + +func float64Ptr(value float64) *float64 { + return &value +} + +func uint64Ptr(value uint64) *uint64 { + return &value +} diff --git a/runner/clients/common/proxy/proxy.go b/runner/clients/common/proxy/proxy.go index f261c39e..bf3f8147 100644 --- a/runner/clients/common/proxy/proxy.go +++ b/runner/clients/common/proxy/proxy.go @@ -17,9 +17,11 @@ import ( "fmt" "io" "net/http" + "sync" "github.com/base/base-bench/runner/network/mempool" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" ethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" ) @@ -31,6 +33,22 @@ type ProxyServer struct { pendingTxs []*ethTypes.Transaction clientURL string mempool *mempool.StaticWorkloadMempool + nextNonce map[common.Address]uint64 + mu sync.Mutex +} + +type rpcRequest struct { + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID interface{} `json:"id"` + JSONRPC string `json:"jsonrpc"` +} + +type rpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error interface{} `json:"error,omitempty"` } func NewProxyServer(clientURL string, log log.Logger, port int, mempool *mempool.StaticWorkloadMempool) *ProxyServer { @@ -39,6 +57,7 @@ func NewProxyServer(clientURL string, log log.Logger, port int, mempool *mempool log: log, port: port, mempool: mempool, + nextNonce: make(map[common.Address]uint64), } } @@ -62,13 +81,31 @@ func (p *ProxyServer) Run(ctx context.Context) error { } func (p *ProxyServer) PendingTxs() []*ethTypes.Transaction { - return p.pendingTxs + p.mu.Lock() + defer p.mu.Unlock() + + txs := make([]*ethTypes.Transaction, len(p.pendingTxs)) + copy(txs, p.pendingTxs) + return txs } func (p *ProxyServer) ClearPendingTxs() { + p.mu.Lock() + defer p.mu.Unlock() + p.pendingTxs = make([]*ethTypes.Transaction, 0) } +func (p *ProxyServer) DrainPendingTxs() []*ethTypes.Transaction { + p.mu.Lock() + defer p.mu.Unlock() + + txs := make([]*ethTypes.Transaction, len(p.pendingTxs)) + copy(txs, p.pendingTxs) + p.pendingTxs = make([]*ethTypes.Transaction, 0) + return txs +} + // Stop stops both the proxy server and the underlying client func (p *ProxyServer) Stop() { if p.server != nil { @@ -89,13 +126,13 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { return } - var request struct { - Method string `json:"method"` - Params json.RawMessage `json:"params"` - ID interface{} `json:"id"` - JSONRPC string `json:"jsonrpc"` + if len(body) > 0 && body[0] == '[' { + p.handleBatchRequest(w, body) + return } + var request rpcRequest + if err := json.Unmarshal(body, &request); err != nil { http.Error(w, "Error parsing request", http.StatusBadRequest) return @@ -108,11 +145,7 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { } if handled { - resp := struct { - JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` - Result json.RawMessage `json:"result"` - }{ + resp := rpcResponse{ JSONRPC: request.JSONRPC, ID: request.ID, Result: response, @@ -164,6 +197,78 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { p.DebugResponse(request.Method, request.Params, respBody) } +func (p *ProxyServer) handleBatchRequest(w http.ResponseWriter, body []byte) { + var requests []rpcRequest + if err := json.Unmarshal(body, &requests); err != nil { + http.Error(w, "Error parsing batch request", http.StatusBadRequest) + return + } + + responses := make([]rpcResponse, 0, len(requests)) + for _, request := range requests { + handled, result, err := p.OverrideRequest(request.Method, request.Params) + response := rpcResponse{ + JSONRPC: "2.0", + ID: request.ID, + } + if err != nil { + response.Error = map[string]interface{}{"code": -32000, "message": err.Error()} + } else if handled { + response.Result = result + } else { + forwardedResponse, err := p.forwardRPCRequest(request) + if err != nil { + response.Error = map[string]interface{}{"code": -32000, "message": err.Error()} + } else { + response = forwardedResponse + } + } + responses = append(responses, response) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(responses); err != nil { + p.log.Error("Error encoding batch response", "err", err) + } +} + +func (p *ProxyServer) forwardRPCRequest(request rpcRequest) (rpcResponse, error) { + body, err := json.Marshal(request) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to marshal upstream request: %w", err) + } + + resp, err := http.Post(p.clientURL, "application/json", bytes.NewReader(body)) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to forward request: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + p.log.Error("Error closing response body", "err", err) + } + }() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to read upstream response: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return rpcResponse{}, fmt.Errorf("upstream request returned status %d: %s", resp.StatusCode, string(respBody)) + } + + var forwardedResponse rpcResponse + if err := json.Unmarshal(respBody, &forwardedResponse); err != nil { + return rpcResponse{}, fmt.Errorf("failed to decode upstream response: %w", err) + } + if forwardedResponse.JSONRPC == "" { + forwardedResponse.JSONRPC = request.JSONRPC + } + if forwardedResponse.ID == nil { + forwardedResponse.ID = request.ID + } + return forwardedResponse, nil +} + func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) (bool, json.RawMessage, error) { switch method { case "eth_getTransactionCount": @@ -171,8 +276,22 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) if err := json.Unmarshal(rawParams, ¶ms); err != nil { return false, nil, fmt.Errorf("failed to unmarshal params: %w", err) } + if len(params) == 0 { + return false, nil, fmt.Errorf("no params found") + } - nonce := p.mempool.GetTransactionCount(common.HexToAddress(params[0])) + address := common.HexToAddress(params[0]) + nonce, err := p.upstreamTransactionCount(rawParams) + if err != nil { + if observedNonce, ok := p.observedTransactionCount(address); ok { + jsonResponse, _ := json.Marshal(fmt.Sprintf("0x%x", observedNonce)) + return true, jsonResponse, nil + } + return false, nil, err + } + if observedNonce, ok := p.observedTransactionCount(address); ok && observedNonce > nonce { + nonce = observedNonce + } jsonResponse, _ := json.Marshal(fmt.Sprintf("0x%x", nonce)) return true, jsonResponse, nil @@ -204,7 +323,7 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) return false, nil, fmt.Errorf("failed to decode transaction: %w", err) } - p.pendingTxs = append(p.pendingTxs, &tx) + p.recordPendingTransaction(&tx) txHash := tx.Hash().Hex() jsonResponse, _ := json.Marshal(txHash) @@ -214,10 +333,101 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) } } +func (p *ProxyServer) upstreamTransactionCount(rawParams json.RawMessage) (uint64, error) { + body, err := json.Marshal(struct { + JSONRPC string `json:"jsonrpc"` + ID int `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + }{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_getTransactionCount", + Params: rawParams, + }) + if err != nil { + return 0, fmt.Errorf("failed to marshal upstream nonce request: %w", err) + } + + resp, err := http.Post(p.clientURL, "application/json", bytes.NewReader(body)) + if err != nil { + return 0, fmt.Errorf("failed to fetch upstream transaction count: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + p.log.Error("Error closing response body", "err", err) + } + }() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read upstream transaction count response: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return 0, fmt.Errorf("upstream transaction count request returned status %d: %s", resp.StatusCode, string(respBody)) + } + + var rpcResp struct { + Result json.RawMessage `json:"result"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` + } + if err := json.Unmarshal(respBody, &rpcResp); err != nil { + return 0, fmt.Errorf("failed to decode upstream transaction count response: %w", err) + } + if rpcResp.Error != nil { + return 0, fmt.Errorf("upstream transaction count error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message) + } + + var nonceHex string + if err := json.Unmarshal(rpcResp.Result, &nonceHex); err != nil { + return 0, fmt.Errorf("failed to decode upstream transaction count result: %w", err) + } + nonce, err := hexutil.DecodeUint64(nonceHex) + if err != nil { + return 0, fmt.Errorf("failed to parse upstream transaction count %q: %w", nonceHex, err) + } + return nonce, nil +} + +func (p *ProxyServer) observedTransactionCount(address common.Address) (uint64, bool) { + p.mu.Lock() + defer p.mu.Unlock() + + nonce, ok := p.nextNonce[address] + return nonce, ok +} + +func (p *ProxyServer) recordPendingTransaction(tx *ethTypes.Transaction) { + from, err := ethTypes.Sender(ethTypes.LatestSignerForChainID(tx.ChainId()), tx) + if err != nil { + p.log.Warn("failed to recover sender for observed transaction", "err", err, "hash", tx.Hash()) + p.mu.Lock() + p.pendingTxs = append(p.pendingTxs, tx) + p.mu.Unlock() + return + } + + nextNonce := tx.Nonce() + 1 + p.mu.Lock() + p.pendingTxs = append(p.pendingTxs, tx) + if nextNonce > p.nextNonce[from] { + p.nextNonce[from] = nextNonce + } + p.mu.Unlock() +} + func (p *ProxyServer) DebugResponse(method string, params json.RawMessage, respBody []byte) { p.log.Debug("method", "method", method) p.log.Debug("params", "params", params) + if !bytes.HasPrefix(respBody, []byte{0x1f, 0x8b}) { + p.log.Debug("Response body", "body", string(respBody)) + return + } + gzipReader, err := gzip.NewReader(bytes.NewReader(respBody)) if err != nil { p.log.Error("Error creating gzip reader", "err", err) diff --git a/runner/clients/common/proxy/proxy_test.go b/runner/clients/common/proxy/proxy_test.go new file mode 100644 index 00000000..013f3ebe --- /dev/null +++ b/runner/clients/common/proxy/proxy_test.go @@ -0,0 +1,368 @@ +package proxy + +import ( + "bytes" + "encoding/json" + "math/big" + "net/http" + "net/http/httptest" + "testing" + + "github.com/base/base-bench/runner/network/mempool" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" +) + +func TestHandleBatchRequestCapturesRawTransactions(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTx(t, chainID) + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + + server := NewProxyServer( + "http://127.0.0.1:8545", + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 0, + "method": "eth_sendRawTransaction", + "params": []string{hexutil.Encode(rawTx)}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful response, got error %v", responses[0].Error) + } + if responses[0].Result != tx.Hash().Hex() { + t.Fatalf("expected tx hash %s, got %s", tx.Hash().Hex(), responses[0].Result) + } + + pending := server.DrainPendingTxs() + if len(pending) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(pending)) + } + if pending[0].Hash() != tx.Hash() { + t.Fatalf("expected pending tx %s, got %s", tx.Hash(), pending[0].Hash()) + } +} + +func TestHandleBatchRequestForwardsPassThroughMethods(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpcRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + t.Fatalf("decode upstream request: %v", err) + } + if req.Method != "eth_chainId" { + t.Fatalf("expected eth_chainId, got %s", req.Method) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": req.ID, + "result": "0x2105", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), big.NewInt(8453)), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 7, + "method": "eth_chainId", + "params": []string{}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful response, got error %v", responses[0].Error) + } + if responses[0].Result != "0x2105" { + t.Fatalf("expected forwarded result 0x2105, got %s", responses[0].Result) + } +} + +func TestHandleBatchRequestSupportsMixedForwardAndCapture(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTx(t, chainID) + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpcRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + t.Fatalf("decode upstream request: %v", err) + } + if req.Method != "eth_gasPrice" { + t.Fatalf("expected eth_gasPrice, got %s", req.Method) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": req.ID, + "result": "0x3b9aca00", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 0, + "method": "eth_gasPrice", + "params": []string{}, + }, + { + "jsonrpc": "2.0", + "id": 1, + "method": "eth_sendRawTransaction", + "params": []string{hexutil.Encode(rawTx)}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 2 { + t.Fatalf("expected 2 responses, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful forwarded response, got error %v", responses[0].Error) + } + if responses[0].Result != "0x3b9aca00" { + t.Fatalf("expected forwarded gas price, got %s", responses[0].Result) + } + if responses[1].Error != nil { + t.Fatalf("expected successful captured tx response, got error %v", responses[1].Error) + } + if responses[1].Result != tx.Hash().Hex() { + t.Fatalf("expected tx hash %s, got %s", tx.Hash().Hex(), responses[1].Result) + } + + pending := server.DrainPendingTxs() + if len(pending) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(pending)) + } + if pending[0].Hash() != tx.Hash() { + t.Fatalf("expected pending tx %s, got %s", tx.Hash(), pending[0].Hash()) + } +} + +func TestGetTransactionCountForwardsUpstreamNonce(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "result": "0xfa", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), big.NewInt(8453)), + ) + + result := callProxyRPC(t, server, "eth_getTransactionCount", []string{common.Address{1}.Hex(), "pending"}) + if result != "0xfa" { + t.Fatalf("expected upstream nonce 0xfa, got %s", result) + } +} + +func TestGetTransactionCountIncludesObservedPendingTransactions(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTxWithNonce(t, chainID, 250) + from, err := types.Sender(types.LatestSignerForChainID(chainID), tx) + if err != nil { + t.Fatalf("recover sender: %v", err) + } + + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "result": "0xfa", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + callProxyRPC(t, server, "eth_sendRawTransaction", []string{hexutil.Encode(rawTx)}) + + result := callProxyRPC(t, server, "eth_getTransactionCount", []string{from.Hex(), "pending"}) + if result != "0xfb" { + t.Fatalf("expected observed nonce 0xfb, got %s", result) + } +} + +func callProxyRPC(t *testing.T, server *ProxyServer, method string, params any) string { + t.Helper() + + body, err := json.Marshal(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var response struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &response); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if response.Error != nil { + t.Fatalf("expected successful response, got error %v", response.Error) + } + return response.Result +} + +func signedTestTx(t *testing.T, chainID *big.Int) *types.Transaction { + return signedTestTxWithNonce(t, chainID, 0) +} + +func signedTestTxWithNonce(t *testing.T, chainID *big.Int, nonce uint64) *types.Transaction { + t.Helper() + + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("generate key: %v", err) + } + + tx := types.NewTx(&types.DynamicFeeTx{ + ChainID: chainID, + Nonce: nonce, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + Gas: 21_000, + To: &common.Address{1}, + Value: big.NewInt(1), + }) + + signed, err := types.SignTx(tx, types.NewIsthmusSigner(chainID), key) + if err != nil { + t.Fatalf("sign tx: %v", err) + } + return signed +} diff --git a/runner/clients/geth/client.go b/runner/clients/geth/client.go index 89e40b67..a4efd25b 100644 --- a/runner/clients/geth/client.go +++ b/runner/clients/geth/client.go @@ -280,3 +280,7 @@ func (g *GethClient) FlashblocksClient() types.FlashblocksClient { func (g *GethClient) SupportsFlashblocks() bool { return false } + +func (g *GethClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/reth/client.go b/runner/clients/reth/client.go index 7a4a25c8..35739d4e 100644 --- a/runner/clients/reth/client.go +++ b/runner/clients/reth/client.go @@ -296,3 +296,7 @@ func (r *RethClient) FlashblocksClient() types.FlashblocksClient { func (r *RethClient) SupportsFlashblocks() bool { return true } + +func (r *RethClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/types/types.go b/runner/clients/types/types.go index a0be0080..c88b6212 100644 --- a/runner/clients/types/types.go +++ b/runner/clients/types/types.go @@ -32,4 +32,7 @@ type ExecutionClient interface { SetHead(ctx context.Context, blockNumber uint64) error FlashblocksClient() FlashblocksClient // returns nil for clients that don't support flashblocks SupportsFlashblocks() bool // returns true if the client supports receiving flashblock payloads + // FlashblocksWsURL returns the local WebSocket URL hosted by this client, + // or an empty string when the client does not expose one. + FlashblocksWsURL() string } diff --git a/runner/metrics/metrics_interface.go b/runner/metrics/metrics_interface.go index 00c0811f..350dd3f9 100644 --- a/runner/metrics/metrics_interface.go +++ b/runner/metrics/metrics_interface.go @@ -51,6 +51,21 @@ func (m *BlockMetrics) Copy() *BlockMetrics { } } +func (m *BlockMetrics) SetPreviousPrometheusMetrics(prev map[string]*io_prometheus_client.Metric) { + m.prevMetrics = make(map[string]*io_prometheus_client.Metric, len(prev)) + maps.Copy(m.prevMetrics, prev) +} + +func (m *BlockMetrics) PreviousPrometheusMetrics() map[string]*io_prometheus_client.Metric { + prevMetrics := make(map[string]*io_prometheus_client.Metric, len(m.prevMetrics)) + maps.Copy(prevMetrics, m.prevMetrics) + return prevMetrics +} + +func (m *BlockMetrics) PreviousPrometheusMetric(name string) *io_prometheus_client.Metric { + return m.prevMetrics[name] +} + func (m *BlockMetrics) UpdatePrometheusMetric(name string, value *io_prometheus_client.Metric) error { if value.Histogram != nil { // get the average change in sum divided by the average change in count diff --git a/runner/network/consensus/client.go b/runner/network/consensus/client.go index a2308230..76cb5d8b 100644 --- a/runner/network/consensus/client.go +++ b/runner/network/consensus/client.go @@ -23,6 +23,8 @@ type ConsensusClientOptions struct { GasLimitSetup uint64 // ParallelTxBatches is the number of parallel batches for sending transactions ParallelTxBatches int + // ConsensusTimingMode controls how FCU and getPayload calls are scheduled. + ConsensusTimingMode string } // BaseConsensusClient contains common functionality shared between different consensus client implementations. diff --git a/runner/network/consensus/sequencer_consensus.go b/runner/network/consensus/sequencer_consensus.go index 2e2ed3c9..81faaed4 100644 --- a/runner/network/consensus/sequencer_consensus.go +++ b/runner/network/consensus/sequencer_consensus.go @@ -112,7 +112,7 @@ func marshalBinaryWithSignature(info *derive.L1BlockInfo, signature []byte) ([]b return w.Bytes(), nil } -func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]byte, isSetupPayload bool, nextBoundary time.Time, blockTime time.Duration) (*eth.PayloadAttributes, *common.Hash, error) { +func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]byte, isSetupPayload bool, timestamp uint64) (*eth.PayloadAttributes, *common.Hash, error) { gasLimit := eth.Uint64Quantity(f.options.GasLimit) if isSetupPayload { gasLimit = eth.Uint64Quantity(f.options.GasLimitSetup) @@ -121,12 +121,6 @@ func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]by var b8 eth.Bytes8 copy(b8[:], eip1559.EncodeHolocene1559Params(50, 1)) - // Use nextBoundary (the block-time-aligned wall-clock boundary we slept to) plus - // one block time as the block timestamp. This guarantees the FCU always arrives - // at the same point relative to the block deadline, eliminating the jitter that - // causes "FCU arrived too late" and empty blocks when sendTxs takes variable time. - timestamp := uint64(nextBoundary.Add(blockTime).Unix()) - number := uint64(0) time := uint64(0) baseFee := big.NewInt(1) @@ -210,6 +204,29 @@ func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]by return payloadAttrs, &root, nil } +func nextPayloadTimestamp(lastTimestamp uint64, now time.Time, blockTime time.Duration) uint64 { + blockTimeSeconds := uint64(blockTime / time.Second) + if blockTimeSeconds == 0 { + blockTimeSeconds = 1 + } + + // Match the sequencer cadence when possible: the next payload timestamp is + // one block time after the parent. If transaction draining made that slot + // too close to wall clock, skip ahead so the builder still has time to work. + timestamp := lastTimestamp + blockTimeSeconds + minLead := blockTime / 2 + if minLead <= 0 { + minLead = time.Second + } + + minDeadline := now.Add(minLead) + for time.Unix(int64(timestamp), 0).Before(minDeadline) { + timestamp += blockTimeSeconds + } + + return timestamp +} + // Propose starts block generation, waits BlockTime, and generates a block. func (f *SequencerConsensusClient) Propose(ctx context.Context, blockMetrics *metrics.BlockMetrics, isSetupPayload bool) (*engine.ExecutableData, error) { startTime := time.Now() @@ -265,16 +282,32 @@ func (f *SequencerConsensusClient) Propose(ctx context.Context, blockMetrics *me f.log.Info("Sent transactions", "duration", duration, "num_txs", len(sendTxs)) blockMetrics.AddExecutionMetric(networktypes.SendTxsLatencyMetric, duration) - now := time.Now() - nextBoundary := now.Truncate(f.options.BlockTime).Add(f.options.BlockTime) - sleepDuration := time.Until(nextBoundary) - f.log.Info("Aligning to next block time boundary before FCU", "sleep", sleepDuration, "block_time", f.options.BlockTime) - time.Sleep(sleepDuration) - startBlockBuildingTime := time.Now() + var payloadTimestamp uint64 + var blockDeadline time.Time + if f.options.ConsensusTimingMode == networktypes.ConsensusTimingModeBaseConsensus { + payloadTimestamp = nextPayloadTimestamp(f.lastTimestamp, startBlockBuildingTime, f.options.BlockTime) + blockDeadline = time.Unix(int64(payloadTimestamp), 0) + } else { + nextBoundary := startBlockBuildingTime.Truncate(f.options.BlockTime).Add(f.options.BlockTime) + sleepDuration := time.Until(nextBoundary) + f.log.Info("Aligning to next block time boundary before FCU", "sleep", sleepDuration, "block_time", f.options.BlockTime) + if sleepDuration > 0 { + time.Sleep(sleepDuration) + } + startBlockBuildingTime = time.Now() + + // Use nextBoundary (the block-time-aligned wall-clock boundary we slept to) plus + // one block time as the block timestamp. This guarantees the FCU always arrives + // at the same point relative to the block deadline, eliminating the jitter that + // causes "FCU arrived too late" and empty blocks when sendTxs takes variable time. + blockDeadline = nextBoundary.Add(f.options.BlockTime) + payloadTimestamp = uint64(blockDeadline.Unix()) + } + f.log.Info("Starting block building") - payloadAttrs, beaconRoot, err := f.generatePayloadAttributes(sequencerTxs, isSetupPayload, nextBoundary, f.options.BlockTime) + payloadAttrs, beaconRoot, err := f.generatePayloadAttributes(sequencerTxs, isSetupPayload, payloadTimestamp) if err != nil { return nil, errors.Wrap(err, "failed to generate payload attributes") } @@ -292,10 +325,11 @@ func (f *SequencerConsensusClient) Propose(ctx context.Context, blockMetrics *me blockMetrics.AddExecutionMetric(networktypes.UpdateForkChoiceLatencyMetric, duration) f.currentPayloadID = payloadID - blockDeadline := nextBoundary.Add(f.options.BlockTime) waitDuration := time.Until(blockDeadline) f.log.Info("Waiting for block deadline", "wait", waitDuration) - time.Sleep(waitDuration) + if waitDuration > 0 { + time.Sleep(waitDuration) + } startTime = time.Now() diff --git a/runner/network/consensus/sequencer_consensus_test.go b/runner/network/consensus/sequencer_consensus_test.go new file mode 100644 index 00000000..54c8421f --- /dev/null +++ b/runner/network/consensus/sequencer_consensus_test.go @@ -0,0 +1,36 @@ +package consensus + +import ( + "testing" + "time" +) + +func TestNextPayloadTimestampUsesNextBlockTime(t *testing.T) { + now := time.Unix(100, int64(100*time.Millisecond)) + + timestamp := nextPayloadTimestamp(100, now, 2*time.Second) + + if timestamp != 102 { + t.Fatalf("expected next payload timestamp 102, got %d", timestamp) + } +} + +func TestNextPayloadTimestampSkipsTooCloseSlot(t *testing.T) { + now := time.Unix(101, int64(250*time.Millisecond)) + + timestamp := nextPayloadTimestamp(100, now, 2*time.Second) + + if timestamp != 104 { + t.Fatalf("expected next payload timestamp 104, got %d", timestamp) + } +} + +func TestNextPayloadTimestampCatchesUpFromWallClock(t *testing.T) { + now := time.Unix(120, 0) + + timestamp := nextPayloadTimestamp(100, now, 2*time.Second) + + if timestamp != 122 { + t.Fatalf("expected next payload timestamp 122, got %d", timestamp) + } +} diff --git a/runner/network/network_benchmark.go b/runner/network/network_benchmark.go index 2bf13775..fd83d113 100644 --- a/runner/network/network_benchmark.go +++ b/runner/network/network_benchmark.go @@ -44,9 +44,9 @@ type NetworkBenchmark struct { testConfig *benchtypes.TestConfig proofConfig *benchmark.ProofProgramOptions - transactionPayload payload.Definition - ports portmanager.PortManager - flashblocksBlockTime string + transactionPayload payload.Definition + ports portmanager.PortManager + flashblocksBlockTime string } // NewNetworkBenchmark creates a new network benchmark and initializes the payload worker and consensus client diff --git a/runner/network/sequencer_benchmark.go b/runner/network/sequencer_benchmark.go index 29aa8c28..45c0fd82 100644 --- a/runner/network/sequencer_benchmark.go +++ b/runner/network/sequencer_benchmark.go @@ -15,6 +15,7 @@ import ( "github.com/base/base-bench/runner/network/proofprogram/fakel1" benchtypes "github.com/base/base-bench/runner/network/types" "github.com/base/base-bench/runner/payload" + payloadworker "github.com/base/base-bench/runner/payload/worker" "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" @@ -24,6 +25,8 @@ import ( "github.com/pkg/errors" ) +const gracefulWorkerShutdownTimeout = 90 * time.Second + type sequencerBenchmark struct { log log.Logger sequencerClient types.ExecutionClient @@ -204,10 +207,11 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. go func() { consensusClient := consensus.NewSequencerConsensusClient(nb.log, sequencerClient.Client(), sequencerClient.AuthClient(), mempool, consensus.ConsensusClientOptions{ - BlockTime: params.BlockTime, - GasLimit: params.GasLimit, - GasLimitSetup: 1e9, // 1G gas - ParallelTxBatches: nb.config.Config.ParallelTxBatches(), + BlockTime: params.BlockTime, + GasLimit: params.GasLimit, + GasLimitSetup: 1e9, // 1G gas + ParallelTxBatches: nb.config.Config.ParallelTxBatches(), + ConsensusTimingMode: params.ConsensusTimingMode, }, headBlockHash, headBlockNumber, l1Chain, nb.config.BatcherAddr()) payloads := make([]engine.ExecutableData, 0) @@ -236,54 +240,66 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. payloads = append(payloads, *lastSetupPayload) - blockMetrics := metrics.NewBlockMetrics() - pendingTxs := 0 + completionWorker, runUntilWorkerDone := transactionWorker.(payloadworker.CompletionWorker) + if runUntilWorkerDone { + nb.log.Info("Running benchmark blocks until payload worker completes") + blockIndex := uint64(1) + runUntilDoneLoop: + for { + select { + case <-completionWorker.Done(): + if err := completionWorker.Err(); err != nil { + errChan <- errors.Wrap(err, "payload worker failed") + return + } + nb.log.Info("Payload worker completed", "blocks", blockIndex-1) + break runUntilDoneLoop + default: + } - // run for a few blocks - for i := 0; i < params.NumBlocks; i++ { - blockMetrics.SetBlockNumber(uint64(i) + 1) - txsSent, err := transactionWorker.SendTxs(benchmarkCtx, pendingTxs) - if err != nil { - nb.log.Warn("failed to send transactions", "err", err) - errChan <- err - return + payload, updatedPendingTxs, err := nb.proposeBenchmarkBlock( + benchmarkCtx, + transactionWorker, + consensusClient, + metricsCollector, + blockIndex, + pendingTxs, + ) + if err != nil { + errChan <- err + return + } + pendingTxs = updatedPendingTxs + payloads = append(payloads, *payload) + blockIndex++ } - - payload, err := consensusClient.Propose(benchmarkCtx, blockMetrics, false) - if err != nil { - errChan <- err - return + } else { + // run for a few blocks + for i := 0; i < params.NumBlocks; i++ { + payload, updatedPendingTxs, err := nb.proposeBenchmarkBlock( + benchmarkCtx, + transactionWorker, + consensusClient, + metricsCollector, + uint64(i)+1, + pendingTxs, + ) + if err != nil { + errChan <- err + return + } + pendingTxs = updatedPendingTxs + payloads = append(payloads, *payload) } - if payload == nil { - errChan <- errors.New("received nil payload from consensus client") + if err := nb.settleGracefulWorkerShutdown(benchmarkCtx, transactionWorker, consensusClient, pendingTxs); err != nil { + errChan <- err return } - - // Track how many user txs are still pending in the node's mempool. - // payload.Transactions includes the L1 info deposit tx, so user txs = total - 1. - userTxsIncluded := len(payload.Transactions) - 1 - if userTxsIncluded < 0 { - userTxsIncluded = 0 - } - pendingTxs = pendingTxs + txsSent - userTxsIncluded - if pendingTxs < 0 { - pendingTxs = 0 - } - - log.Info("Sleeping for block time", "block_time", params.BlockTime) - time.Sleep(params.BlockTime) - - err = metricsCollector.Collect(benchmarkCtx, blockMetrics) - if err != nil { - nb.log.Error("Failed to collect metrics", "error", err) - } - payloads = append(payloads, *payload) } - err = consensusClient.Stop(benchmarkCtx) - if err != nil { + if err := consensusClient.Stop(benchmarkCtx); err != nil { nb.log.Warn("failed to stop consensus client", "err", err) } @@ -309,6 +325,114 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. } } +func (nb *sequencerBenchmark) proposeBenchmarkBlock( + ctx context.Context, + transactionWorker payloadworker.Worker, + consensusClient *consensus.SequencerConsensusClient, + metricsCollector metrics.Collector, + blockIndex uint64, + pendingTxs int, +) (*engine.ExecutableData, int, error) { + blockMetrics := metrics.NewBlockMetrics() + blockMetrics.SetBlockNumber(blockIndex) + + txsSent, err := transactionWorker.SendTxs(ctx, pendingTxs) + if err != nil { + nb.log.Warn("failed to send transactions", "err", err) + return nil, pendingTxs, err + } + + payload, err := consensusClient.Propose(ctx, blockMetrics, false) + if err != nil { + return nil, pendingTxs, err + } + if payload == nil { + return nil, pendingTxs, errors.New("received nil payload from consensus client") + } + + // Track how many user txs are still pending in the node's mempool. + // payload.Transactions includes the L1 info deposit tx, so user txs = total - 1. + userTxsIncluded := len(payload.Transactions) - 1 + if userTxsIncluded < 0 { + userTxsIncluded = 0 + } + updatedPendingTxs := pendingTxs + txsSent - userTxsIncluded + if updatedPendingTxs < 0 { + updatedPendingTxs = 0 + } + + if !nb.config.Params.UseBaseConsensusTiming() { + log.Info("Sleeping for block time", "block_time", nb.config.Params.BlockTime) + time.Sleep(nb.config.Params.BlockTime) + } + + if err := metricsCollector.Collect(ctx, blockMetrics); err != nil { + nb.log.Error("Failed to collect metrics", "error", err) + } + + return payload, updatedPendingTxs, nil +} + +func (nb *sequencerBenchmark) settleGracefulWorkerShutdown( + ctx context.Context, + transactionWorker payloadworker.Worker, + consensusClient *consensus.SequencerConsensusClient, + pendingTxs int, +) error { + gracefulWorker, ok := transactionWorker.(payloadworker.GracefulShutdownWorker) + if !ok { + return nil + } + + if err := gracefulWorker.BeginGracefulShutdown(ctx); err != nil { + return errors.Wrap(err, "failed to begin graceful payload worker shutdown") + } + + timeout := time.NewTimer(gracefulWorkerShutdownTimeout) + defer timeout.Stop() + + settlementBlock := 0 + for { + select { + case <-gracefulWorker.Done(): + nb.log.Info("Payload worker stopped gracefully", "settlement_blocks", settlementBlock) + return nil + case <-timeout.C: + nb.log.Warn("Timed out waiting for payload worker to stop gracefully", "settlement_blocks", settlementBlock) + return nil + default: + } + + blockMetrics := metrics.NewBlockMetrics() + txsSent, err := transactionWorker.SendTxs(ctx, pendingTxs) + if err != nil { + return errors.Wrap(err, "failed to collect settlement transactions") + } + + payload, err := consensusClient.Propose(ctx, blockMetrics, true) + if err != nil { + return errors.Wrap(err, "failed to propose settlement block") + } + if payload == nil { + return errors.New("received nil settlement payload from consensus client") + } + + userTxsIncluded := len(payload.Transactions) - 1 + if userTxsIncluded < 0 { + userTxsIncluded = 0 + } + pendingTxs = pendingTxs + txsSent - userTxsIncluded + if pendingTxs < 0 { + pendingTxs = 0 + } + + settlementBlock++ + if !nb.config.Params.UseBaseConsensusTiming() { + time.Sleep(nb.config.Params.BlockTime) + } + } +} + // flashblockCollector implements FlashblockListener to collect flashblocks. type flashblockCollector struct { log log.Logger diff --git a/runner/network/types/types.go b/runner/network/types/types.go index c219c72a..8f78d9e7 100644 --- a/runner/network/types/types.go +++ b/runner/network/types/types.go @@ -40,6 +40,10 @@ type TestConfig struct { PrefundPrivateKey ecdsa.PrivateKey PrefundAmount big.Int + + // LoadTestOutputPath is the optional normal load-test report JSON path used + // by the load-test payload worker. + LoadTestOutputPath string } // BatcherAddr returns the batcher address, computing it if necessary @@ -62,6 +66,9 @@ type RunParams struct { // GasLimit is the gas limit for the benchmark run which is the maximum gas that the sequencer will include per block. GasLimit uint64 + // TargetGPS is the target gas per second for load-test payloads. + TargetGPS uint64 + // PayloadID is a reference to a transaction payload that will be sent to the sequencer. PayloadID string @@ -77,6 +84,9 @@ type RunParams struct { // BlockTime is the time between blocks in the benchmark run. BlockTime time.Duration + // ConsensusTimingMode controls how the fake consensus client schedules FCU/getPayload calls. + ConsensusTimingMode string + // Env is the environment variables for the benchmark run. Env map[string]string @@ -93,6 +103,15 @@ type RunParams struct { ClientBinPath string } +const ( + ConsensusTimingModePreventLateFCU = "prevent-late-fcu" + ConsensusTimingModeBaseConsensus = "base-consensus" +) + +func (p RunParams) UseBaseConsensusTiming() bool { + return p.ConsensusTimingMode == ConsensusTimingModeBaseConsensus +} + func (p RunParams) ToConfig() map[string]interface{} { params := map[string]interface{}{ "NodeType": p.NodeType, @@ -108,6 +127,14 @@ func (p RunParams) ToConfig() map[string]interface{} { params["ValidatorNodeType"] = p.ValidatorNodeType } + if p.TargetGPS > 0 { + params["TargetGPS"] = p.TargetGPS + } + + if p.ConsensusTimingMode != "" { + params["ConsensusTimingMode"] = p.ConsensusTimingMode + } + for k, v := range p.Tags { params[k] = v } diff --git a/runner/payload/factory.go b/runner/payload/factory.go index b8e7176e..7894563f 100644 --- a/runner/payload/factory.go +++ b/runner/payload/factory.go @@ -38,7 +38,7 @@ func NewPayloadWorker(ctx context.Context, log log.Logger, testConfig *benchtype def = &loadtest.LoadTestPayloadDefinition{} } worker, err = loadtest.NewLoadTestPayloadWorker( - log, sequencerClient.ClientURL(), params, privateKey, amount, config, genesis.Config.ChainID, *def) + log, sequencerClient.ClientURL(), sequencerClient.FlashblocksWsURL(), params, privateKey, amount, config, genesis.Config.ChainID, *def, testConfig.LoadTestOutputPath) case "transfer-only": worker, err = transferonly.NewTransferPayloadWorker( ctx, log, sequencerClient.ClientURL(), params, privateKey, amount, &genesis, definition.Params) diff --git a/runner/payload/loadtest/load_test_worker.go b/runner/payload/loadtest/load_test_worker.go index 5b6d62ff..ecb557cb 100644 --- a/runner/payload/loadtest/load_test_worker.go +++ b/runner/payload/loadtest/load_test_worker.go @@ -3,14 +3,16 @@ package loadtest import ( "context" "crypto/ecdsa" - cryptorand "crypto/rand" "encoding/hex" "fmt" "math/big" "os" "os/exec" + "path/filepath" + "strconv" + "sync" + "time" - "github.com/base/base-bench/runner/clients/common/proxy" "github.com/base/base-bench/runner/config" "github.com/base/base-bench/runner/network/mempool" "github.com/base/base-bench/runner/network/types" @@ -21,70 +23,65 @@ import ( ) // LoadTestPayloadDefinition is the YAML payload params for the load-test type. -// Fields map directly to the Rust base-load-test config format. -// The `transactions` field is passed through as raw YAML to support the full -// Rust config schema (transfer, calldata, precompile, erc20, etc.). +// The load-test workload itself lives in a native base-load-tester config file; +// benchmark mode overlays the RPC fields it must control and overlays target_gps +// only when the benchmark matrix specifies one. type LoadTestPayloadDefinition struct { - SenderCount uint64 `yaml:"sender_count"` - FundingAmount string `yaml:"funding_amount"` - Transactions yaml.Node `yaml:"transactions"` -} - -// loadTestConfig is the YAML config written to a temp file for the load-test binary. -type loadTestConfig struct { - RPC string `yaml:"rpc"` - SenderCount uint64 `yaml:"sender_count"` - TargetGPS uint64 `yaml:"target_gps"` - Duration string `yaml:"duration"` - Seed uint64 `yaml:"seed"` - FundingAmount string `yaml:"funding_amount"` - Transactions yaml.Node `yaml:"transactions"` + ConfigFile string `yaml:"config_file"` + Network string `yaml:"network"` } type loadTestPayloadWorker struct { - log log.Logger - prefundSK string - loadTestBin string - elRPCURL string - gasLimit uint64 - blockTimeSec uint64 - params LoadTestPayloadDefinition - mempool *mempool.StaticWorkloadMempool - proxyServer *proxy.ProxyServer - cmd *exec.Cmd - configFilePath string + log log.Logger + prefundSK string + loadTestBin string + elRPCURL string + flashblocksURL string + targetGPS uint64 + params LoadTestPayloadDefinition + mempool *mempool.StaticWorkloadMempool + cmd *exec.Cmd + done chan struct{} + shutdownOnce sync.Once + waitErrMu sync.Mutex + waitErr error + sourceConfigPath string + renderedConfigPath string + outputPath string } // NewLoadTestPayloadWorker creates a worker that runs the base-load-test binary -// as an external transaction generator, capturing transactions via a proxy server. +// as an external transaction generator against the benchmark node's RPC. func NewLoadTestPayloadWorker( log log.Logger, elRPCURL string, + flashblocksURL string, params types.RunParams, prefundedPrivateKey ecdsa.PrivateKey, prefundAmount *big.Int, cfg config.Config, chainID *big.Int, definition LoadTestPayloadDefinition, + outputPath string, ) (worker.Worker, error) { mp := mempool.NewStaticWorkloadMempool(log, chainID) - ps := proxy.NewProxyServer(elRPCURL, log, cfg.ProxyPort(), mp) - blockTimeSec := uint64(params.BlockTime.Seconds()) - if blockTimeSec == 0 { - blockTimeSec = 1 + sourceConfigPath, err := resolveConfigFilePath(cfg.ConfigPath(), definition.ConfigFile) + if err != nil { + return nil, err } w := &loadTestPayloadWorker{ - log: log, - prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), - loadTestBin: cfg.LoadTestBinary(), - elRPCURL: elRPCURL, - gasLimit: params.GasLimit, - blockTimeSec: blockTimeSec, - params: definition, - mempool: mp, - proxyServer: ps, + log: log, + prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), + loadTestBin: cfg.LoadTestBinary(), + elRPCURL: elRPCURL, + flashblocksURL: flashblocksURL, + targetGPS: params.TargetGPS, + params: definition, + mempool: mp, + sourceConfigPath: sourceConfigPath, + outputPath: outputPath, } return w, nil @@ -95,15 +92,11 @@ func (w *loadTestPayloadWorker) Mempool() mempool.FakeMempool { } func (w *loadTestPayloadWorker) Setup(ctx context.Context) error { - if err := w.proxyServer.Run(ctx); err != nil { - return errors.Wrap(err, "failed to run proxy server") - } - configPath, err := w.writeConfig() if err != nil { return errors.Wrap(err, "failed to write load-test config") } - w.configFilePath = configPath + w.renderedConfigPath = configPath w.log.Info("Starting load test", "binary", w.loadTestBin, "config", configPath) @@ -111,112 +104,197 @@ func (w *loadTestPayloadWorker) Setup(ctx context.Context) error { cmd.Stdout = os.Stdout cmd.Stderr = os.Stdout cmd.Env = append(os.Environ(), fmt.Sprintf("FUNDER_KEY=%s", w.prefundSK)) + if w.outputPath != "" { + if err := os.MkdirAll(filepath.Dir(w.outputPath), 0755); err != nil { + return errors.Wrap(err, "failed to create load-test output directory") + } + cmd.Env = append(cmd.Env, fmt.Sprintf("LOAD_TEST_OUTPUT=%s", w.outputPath)) + } if err := cmd.Start(); err != nil { return errors.Wrap(err, "failed to start load test binary") } w.cmd = cmd + w.done = make(chan struct{}) + go func() { + err := cmd.Wait() + w.waitErrMu.Lock() + w.waitErr = err + w.waitErrMu.Unlock() + close(w.done) + }() return nil } +func (w *loadTestPayloadWorker) BeginGracefulShutdown(ctx context.Context) error { + if w.cmd == nil || w.cmd.Process == nil { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-w.Done(): + return nil + default: + } + + var signalErr error + w.shutdownOnce.Do(func() { + w.log.Info("Stopping load test process gracefully", "pid", w.cmd.Process.Pid, "output", w.outputPath) + signalErr = w.cmd.Process.Signal(os.Interrupt) + }) + if signalErr != nil { + select { + case <-w.Done(): + return nil + default: + } + } + return signalErr +} + +func (w *loadTestPayloadWorker) Done() <-chan struct{} { + if w.done != nil { + return w.done + } + + done := make(chan struct{}) + close(done) + return done +} + +func (w *loadTestPayloadWorker) Err() error { + w.waitErrMu.Lock() + defer w.waitErrMu.Unlock() + return w.waitErr +} + func (w *loadTestPayloadWorker) Stop(ctx context.Context) error { if w.cmd != nil && w.cmd.Process != nil { - w.log.Info("Stopping load test process", "pid", w.cmd.Process.Pid) - if err := w.cmd.Process.Kill(); err != nil { - w.log.Warn("failed to kill load test process", "err", err) - } else { - // Reap the process to avoid zombies. - _, _ = w.cmd.Process.Wait() + if err := w.BeginGracefulShutdown(ctx); err != nil { + w.log.Warn("failed to signal load test process", "err", err) } - } - w.proxyServer.Stop() + select { + case <-w.Done(): + case <-time.After(10 * time.Second): + w.log.Warn("load test process did not stop gracefully, killing", "pid", w.cmd.Process.Pid) + if err := w.cmd.Process.Kill(); err != nil { + w.log.Warn("failed to kill load test process", "err", err) + } + select { + case <-w.Done(): + case <-time.After(5 * time.Second): + w.log.Warn("timed out waiting for killed load test process") + } + } + } - if w.configFilePath != "" { - if err := os.Remove(w.configFilePath); err != nil { - w.log.Warn("failed to remove load-test config", "path", w.configFilePath, "err", err) + if w.renderedConfigPath != "" { + if err := os.Remove(w.renderedConfigPath); err != nil { + w.log.Warn("failed to remove load-test config", "path", w.renderedConfigPath, "err", err) } } return nil } -func (w *loadTestPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) { - w.log.Info("Collecting txs from load test") - pendingTxs := w.proxyServer.PendingTxs() - w.proxyServer.ClearPendingTxs() - - w.mempool.AddTransactions(pendingTxs) - return len(pendingTxs), nil -} - -// defaultTransactions returns the default transaction mix as a yaml.Node. -func defaultTransactions() yaml.Node { - var node yaml.Node - // Default: 70% transfer, 20% calldata, 10% precompile - defaultYAML := ` -- weight: 70 - type: transfer -- weight: 20 - type: calldata - max_size: 256 -- weight: 10 - type: precompile - target: sha256 -` - if err := yaml.Unmarshal([]byte(defaultYAML), &node); err != nil { - panic(fmt.Sprintf("failed to parse default transactions YAML: %v", err)) - } - // yaml.Unmarshal wraps in a document node; return the inner sequence - if node.Kind == yaml.DocumentNode && len(node.Content) > 0 { - return *node.Content[0] - } - return node +func (w *loadTestPayloadWorker) SendTxs(_ context.Context, _ int) (int, error) { + return 0, nil } -// randomSeed returns a cryptographically random uint64 seed. -func randomSeed() uint64 { - var b [8]byte - if _, err := cryptorand.Read(b[:]); err != nil { - return 42 +func resolveConfigFilePath(benchmarkConfigPath string, loadTestConfigPath string) (string, error) { + if loadTestConfigPath == "" { + return "", errors.New("load-test payload requires config_file") + } + if filepath.IsAbs(loadTestConfigPath) { + return loadTestConfigPath, nil } - return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | - uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + return filepath.Join(filepath.Dir(benchmarkConfigPath), loadTestConfigPath), nil } -// writeConfig generates a temporary YAML config file for the load-test binary -// with the RPC URL pointing to the proxy server. -func (w *loadTestPayloadWorker) writeConfig() (string, error) { - senderCount := w.params.SenderCount - if senderCount == 0 { - senderCount = 10 +func (w *loadTestPayloadWorker) buildConfig() (*yaml.Node, error) { + data, err := os.ReadFile(w.sourceConfigPath) + if err != nil { + return nil, errors.Wrap(err, "failed to read load-test config file") + } + + var doc yaml.Node + if err := yaml.Unmarshal(data, &doc); err != nil { + return nil, errors.Wrap(err, "failed to parse load-test config file") + } + + config, err := mappingRoot(&doc) + if err != nil { + return nil, err } - fundingAmount := w.params.FundingAmount - if fundingAmount == "" { - fundingAmount = "10000000000000000000" + setMappingValue(config, "transaction_submission_rpcs", stringSequenceNode(w.elRPCURL)) + setMappingValue(config, "query_rpc", stringNode(w.elRPCURL)) + + flashblocksURL := w.flashblocksURL + if flashblocksURL == "" { + flashblocksURL = "ws://localhost:7111" + } + setMappingValue(config, "flashblocks_ws", stringNode(flashblocksURL)) + if w.targetGPS > 0 { + setMappingValue(config, "target_gps", uintNode(w.targetGPS)) + } + + return config, nil +} + +func mappingRoot(doc *yaml.Node) (*yaml.Node, error) { + root := doc + if doc.Kind == yaml.DocumentNode { + if len(doc.Content) == 0 { + return nil, errors.New("load-test config file is empty") + } + root = doc.Content[0] } - // Compute target GPS from gas limit and block time - targetGPS := w.gasLimit / w.blockTimeSec + if root.Kind != yaml.MappingNode { + return nil, fmt.Errorf("load-test config file must be a YAML mapping, got kind %d", root.Kind) + } + return root, nil +} - transactions := w.params.Transactions - if transactions.Kind == 0 { - transactions = defaultTransactions() +func setMappingValue(mapping *yaml.Node, key string, value *yaml.Node) { + for i := 0; i < len(mapping.Content)-1; i += 2 { + if mapping.Content[i].Value == key { + mapping.Content[i+1] = value + return + } } + mapping.Content = append(mapping.Content, stringNode(key), value) +} + +func stringNode(value string) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!str", Value: value} +} + +func uintNode(value uint64) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!int", Value: strconv.FormatUint(value, 10)} +} - config := loadTestConfig{ - RPC: w.proxyServer.ClientURL(), - SenderCount: senderCount, - TargetGPS: targetGPS, - Duration: "99999s", - Seed: randomSeed(), - FundingAmount: fundingAmount, - Transactions: transactions, +func stringSequenceNode(values ...string) *yaml.Node { + node := &yaml.Node{Kind: yaml.SequenceNode, Tag: "!!seq"} + for _, value := range values { + node.Content = append(node.Content, stringNode(value)) } + return node +} - data, err := yaml.Marshal(&config) +// writeConfig generates a temporary YAML config file for the load-test binary +// with benchmark-controlled RPC, timing, and report fields. +func (w *loadTestPayloadWorker) writeConfig() (string, error) { + config, err := w.buildConfig() + if err != nil { + return "", err + } + data, err := yaml.Marshal(config) if err != nil { return "", errors.Wrap(err, "failed to marshal load-test config") } @@ -236,10 +314,8 @@ func (w *loadTestPayloadWorker) writeConfig() (string, error) { } w.log.Info("Generated load-test config", - "sender_count", senderCount, - "target_gps", targetGPS, - "gas_limit", w.gasLimit, - "block_time_sec", w.blockTimeSec, + "source_config", w.sourceConfigPath, + "target_gps", w.targetGPS, ) return tmpFile.Name(), nil diff --git a/runner/payload/loadtest/load_test_worker_test.go b/runner/payload/loadtest/load_test_worker_test.go new file mode 100644 index 00000000..7bef24bb --- /dev/null +++ b/runner/payload/loadtest/load_test_worker_test.go @@ -0,0 +1,154 @@ +package loadtest + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestBuildConfigOverlaysBenchmarkFieldsAndPreservesLoadTestConfig(t *testing.T) { + configPath := filepath.Join(t.TempDir(), "mainnet-state-weth-usdc-swaps.yaml") + err := os.WriteFile(configPath, []byte(` +transaction_submission_rpcs: + - "http://standalone-submitter.invalid" +query_rpc: "http://standalone-query.invalid" +flashblocks_ws: "ws://standalone-flashblocks.invalid" +target_gps: 123 +duration: "60s" +chain_id: 8453 +sender_count: 250 +in_flight_per_sender: 64 +batch_size: 20 +batch_timeout: "10ms" +seed: 654789 +funding_amount: "200000000000000000" +real_token_setup: + enabled: true + allow_chain_id_8453: true + weth: "0x4200000000000000000000000000000000000006" + weth_amount_per_sender: "50000000000000000" + pair_token: + token: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + amount_per_sender: "10000000" + acquisition: + type: uniswap_v3_exact_input + router: "0x2626664c2603336E57B271c5C0b26F421741e481" + fee: 500 + amount_in: "10000000000000000" + min_amount_out: "0" +transactions: + - weight: 50 + type: uniswap_v3 + router: "0x2626664c2603336E57B271c5C0b26F421741e481" + token_in: "0x4200000000000000000000000000000000000006" + token_out: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + fee: 500 + min_amount: "10000000000000" + max_amount: "100000000000000" + reverse_min_amount: "100000" + reverse_max_amount: "1000000" + - weight: 50 + type: aerodrome_cl + router: "0xBE6D8f0d05cC4be24d5167a3eF062215bE6D18a5" + token_in: "0x4200000000000000000000000000000000000006" + token_out: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + tick_spacing: 100 + min_amount: "10000000000000" + max_amount: "100000000000000" + reverse_min_amount: "100000" + reverse_max_amount: "1000000" +`), 0644) + require.NoError(t, err) + + worker := &loadTestPayloadWorker{ + flashblocksURL: "ws://benchmark-flashblocks.example", + targetGPS: 75_000_000, + elRPCURL: "http://sequencer.example", + sourceConfigPath: configPath, + } + + config, err := worker.buildConfig() + require.NoError(t, err) + + encoded, err := yaml.Marshal(config) + require.NoError(t, err) + output := string(encoded) + + for _, want := range []string{ + "transaction_submission_rpcs:\n - http://sequencer.example", + "query_rpc: http://sequencer.example", + "flashblocks_ws: ws://benchmark-flashblocks.example", + "target_gps: 75000000", + "duration: \"60s\"", + "chain_id: 8453", + "sender_count: 250", + "in_flight_per_sender: 64", + "batch_size: 20", + "batch_timeout: \"10ms\"", + "seed: 654789", + "real_token_setup:", + "allow_chain_id_8453: true", + "type: uniswap_v3", + "type: aerodrome_cl", + "reverse_min_amount: \"100000\"", + } { + require.Contains(t, output, want) + } + for _, oldValue := range []string{ + "standalone-submitter.invalid", + "standalone-query.invalid", + "standalone-flashblocks.invalid", + "target_gps: 123", + } { + require.NotContains(t, output, oldValue) + } +} + +func TestBuildConfigPreservesNativeTargetGPSWhenBenchmarkTargetUnset(t *testing.T) { + configPath := filepath.Join(t.TempDir(), "load-test.yaml") + err := os.WriteFile(configPath, []byte(` +transaction_submission_rpcs: + - "http://standalone-submitter.invalid" +query_rpc: "http://standalone-query.invalid" +flashblocks_ws: "ws://standalone-flashblocks.invalid" +target_gps: 123 +duration: "60s" +transactions: + - weight: 100 + type: transfer +`), 0644) + require.NoError(t, err) + + worker := &loadTestPayloadWorker{ + flashblocksURL: "ws://benchmark-flashblocks.example", + elRPCURL: "http://sequencer.example", + sourceConfigPath: configPath, + } + + config, err := worker.buildConfig() + require.NoError(t, err) + + encoded, err := yaml.Marshal(config) + require.NoError(t, err) + output := string(encoded) + + require.Contains(t, output, "target_gps: 123") + require.Contains(t, output, "duration: \"60s\"") +} + +func TestResolveConfigFilePath(t *testing.T) { + resolved, err := resolveConfigFilePath("/tmp/configs/benchmark.yml", "load-tests/mainnet.yaml") + require.NoError(t, err) + require.Equal(t, "/tmp/configs/load-tests/mainnet.yaml", resolved) + + resolved, err = resolveConfigFilePath("/tmp/configs/benchmark.yml", "/var/load-tests/mainnet.yaml") + require.NoError(t, err) + require.Equal(t, "/var/load-tests/mainnet.yaml", resolved) + + _, err = resolveConfigFilePath("/tmp/configs/benchmark.yml", "") + require.Error(t, err) + require.Contains(t, err.Error(), "config_file") +} diff --git a/runner/payload/txfuzz/tx_fuzz_worker.go b/runner/payload/txfuzz/tx_fuzz_worker.go index b5167d42..87fd5010 100644 --- a/runner/payload/txfuzz/tx_fuzz_worker.go +++ b/runner/payload/txfuzz/tx_fuzz_worker.go @@ -84,8 +84,7 @@ func (t *txFuzzPayloadWorker) Stop(ctx context.Context) error { func (t *txFuzzPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) { t.log.Info("Sending txs in tx-fuzz mode") - pending := t.proxyServer.PendingTxs() - t.proxyServer.ClearPendingTxs() + pending := t.proxyServer.DrainPendingTxs() t.mempool.AddTransactions(pending) return len(pending), nil diff --git a/runner/payload/worker/types.go b/runner/payload/worker/types.go index 69906ffe..942df001 100644 --- a/runner/payload/worker/types.go +++ b/runner/payload/worker/types.go @@ -18,3 +18,17 @@ type Worker interface { Stop(ctx context.Context) error Mempool() mempool.FakeMempool } + +// GracefulShutdownWorker can stop generating transactions while the benchmark +// sequencer keeps producing settlement blocks. +type GracefulShutdownWorker interface { + BeginGracefulShutdown(ctx context.Context) error + Done() <-chan struct{} +} + +// CompletionWorker owns its own run duration. The benchmark sequencer keeps +// producing blocks until Done closes, then treats Err as the worker result. +type CompletionWorker interface { + Done() <-chan struct{} + Err() error +} diff --git a/runner/service.go b/runner/service.go index 2e1c0309..8916094c 100644 --- a/runner/service.go +++ b/runner/service.go @@ -26,6 +26,7 @@ import ( "github.com/base/base-bench/runner/network" "github.com/base/base-bench/runner/network/types" "github.com/base/base-bench/runner/payload" + "github.com/base/base-bench/runner/payload/loadtest" "github.com/base/base-bench/runner/utils" "github.com/ethereum/go-ethereum/core" ethparams "github.com/ethereum/go-ethereum/params" @@ -368,6 +369,54 @@ func (s *service) setupBlobsDir(workingDir string) error { return nil } +func loadTestNetwork(genesis *core.Genesis, transactionPayload payload.Definition) string { + if envNetwork := os.Getenv("BASE_BENCH_LOAD_TEST_NETWORK"); envNetwork != "" { + return envNetwork + } + + if transactionPayload.Type == "load-test" { + if def, ok := transactionPayload.Params.(*loadtest.LoadTestPayloadDefinition); ok && def.Network != "" { + return def.Network + } + } + + if genesis == nil || genesis.Config == nil || genesis.Config.ChainID == nil { + return "unknown" + } + + switch genesis.Config.ChainID.Uint64() { + case 8453: + return "mainnet" + case 84532: + return "sepolia" + case 13371337: + return "devnet" + default: + return fmt.Sprintf("chain-%s", genesis.Config.ChainID.String()) + } +} + +func (s *service) loadTestOutputPath(genesis *core.Genesis, transactionPayload payload.Definition) string { + if transactionPayload.Type != "load-test" { + return "" + } + + network := loadTestNetwork(genesis, transactionPayload) + baseTime := time.Now().UTC() + for i := 0; ; i++ { + timestamp := baseTime.Add(time.Duration(i) * time.Second).Format(benchmark.LoadTestTimestampLayout) + outputPath := path.Join( + s.config.OutputDir(), + benchmark.LoadTestResultsDir, + network, + fmt.Sprintf("%s.json", timestamp), + ) + if _, err := os.Stat(outputPath); err != nil { + return outputPath + } + } +} + func (s *service) runTest(ctx context.Context, params types.RunParams, workingDir string, outputDir string, snapshotConfig *benchmark.SnapshotDefinition, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, datadirsConfig *benchmark.DatadirConfig, flashblocksBlockTime string) (*benchmark.RunResult, error) { s.log.Info(fmt.Sprintf("Running benchmark with params: %+v", params)) @@ -423,12 +472,13 @@ func (s *service) runTest(ctx context.Context, params types.RunParams, workingDi prefundAmount := new(big.Int).Mul(big.NewInt(1e6), big.NewInt(ethparams.Ether)) config := &types.TestConfig{ - Params: params, - Config: s.config, - Genesis: *genesis, - BatcherKey: *batcherKey, - PrefundPrivateKey: *prefundKey, - PrefundAmount: *prefundAmount, + Params: params, + Config: s.config, + Genesis: *genesis, + BatcherKey: *batcherKey, + PrefundPrivateKey: *prefundKey, + PrefundAmount: *prefundAmount, + LoadTestOutputPath: s.loadTestOutputPath(genesis, transactionPayload), } // Run benchmark