-
-
Notifications
You must be signed in to change notification settings - Fork 16
feat: implement JSON-RPC 2.0 server using aiohttp #98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |||||||||||||||||||||||||
| from nacl.encoding import HexEncoder | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| from minichain import Transaction, Blockchain, Block, State, Mempool, P2PNetwork, mine_block | ||||||||||||||||||||||||||
| from minichain.rpc import JSONRPCServer | ||||||||||||||||||||||||||
| from minichain.validators import is_valid_receiver | ||||||||||||||||||||||||||
| from minichain.block import calculate_receipt_root | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -113,7 +114,7 @@ def mine_and_process_block(chain, mempool, miner_pk): | |||||||||||||||||||||||||
| # Network message handler | ||||||||||||||||||||||||||
| # ────────────────────────────────────────────── | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def make_network_handler(chain, mempool): | ||||||||||||||||||||||||||
| def make_network_handler(chain, mempool, network): | ||||||||||||||||||||||||||
| """Return an async callback that processes incoming P2P messages.""" | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| async def handler(data): | ||||||||||||||||||||||||||
|
|
@@ -159,7 +160,33 @@ async def handler(data): | |||||||||||||||||||||||||
| # Drop only confirmed transactions so higher nonces can remain queued. | ||||||||||||||||||||||||||
| mempool.remove_transactions(block.transactions) | ||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||
| logger.warning("📥 Received Block #%s — rejected", block.index) | ||||||||||||||||||||||||||
| if block.index > chain.last_block.index: | ||||||||||||||||||||||||||
| logger.warning("📥 Received Block #%s — ahead of us (tip: %s). Requesting chain sync...", block.index, chain.last_block.index) | ||||||||||||||||||||||||||
| asyncio.create_task(network.broadcast_chain_request()) | ||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||
| logger.warning("📥 Received Block #%s — rejected", block.index) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| elif msg_type == "chain_request": | ||||||||||||||||||||||||||
| logger.info("📡 Peer requested chain sync. Broadcasting our chain...") | ||||||||||||||||||||||||||
| blocks_dicts = [b.to_dict() for b in chain.chain] | ||||||||||||||||||||||||||
| payload = {"type": "chain_response", "data": {"blocks": blocks_dicts}} | ||||||||||||||||||||||||||
| asyncio.create_task(network._broadcast_raw(payload)) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| elif msg_type == "chain_response": | ||||||||||||||||||||||||||
| blocks_payload = payload.get("blocks", []) | ||||||||||||||||||||||||||
| new_chain = [] | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| new_chain = [Block.from_dict(b) for b in blocks_payload] | ||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||
| logger.warning("❌ Failed to parse chain_response: %s", e) | ||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if new_chain: | ||||||||||||||||||||||||||
| success, orphans = chain.resolve_conflicts(new_chain) | ||||||||||||||||||||||||||
| if success: | ||||||||||||||||||||||||||
| logger.info("🔄 Reorg complete! Restoring %d orphaned txs to mempool.", len(orphans)) | ||||||||||||||||||||||||||
| for tx in orphans: | ||||||||||||||||||||||||||
| mempool.add_transaction(tx) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| return handler | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -389,8 +416,10 @@ async def run_node(port: int, host: str, connect_to: str | None, fund: int, data | |||||||||||||||||||||||||
| mempool = Mempool() | ||||||||||||||||||||||||||
| network = P2PNetwork() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| handler = make_network_handler(chain, mempool) | ||||||||||||||||||||||||||
| handler = make_network_handler(chain, mempool, network) | ||||||||||||||||||||||||||
| network.register_handler(handler) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| rpc_server = JSONRPCServer(chain, mempool, network) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # When a new peer connects, send our state so they can sync | ||||||||||||||||||||||||||
| async def on_peer_connected(writer): | ||||||||||||||||||||||||||
|
|
@@ -406,6 +435,10 @@ async def on_peer_connected(writer): | |||||||||||||||||||||||||
| network.register_on_peer_connected(on_peer_connected) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| await network.start(port=port, host=host) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Start RPC server on a port correlated to the node port (e.g. 8545 if P2P is 9000) | ||||||||||||||||||||||||||
| rpc_port = 8545 + (port - 9000) | ||||||||||||||||||||||||||
| rpc_task = asyncio.create_task(rpc_server.start(host="127.0.0.1", port=rpc_port)) | ||||||||||||||||||||||||||
|
Comment on lines
+438
to
+441
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RPC port calculation can produce invalid or unexpected ports. The formula Consider adding a separate Proposed fix with validation # Start RPC server on a port correlated to the node port (e.g. 8545 if P2P is 9000)
rpc_port = 8545 + (port - 9000)
+ if rpc_port < 1 or rpc_port > 65535:
+ rpc_port = 8545 # fallback to default
+ logger.warning("Computed RPC port out of range, using default %d", rpc_port)
rpc_task = asyncio.create_task(rpc_server.start(host="127.0.0.1", port=rpc_port))🤖 Prompt for AI Agents |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Fund this node's wallet so it can transact in the demo | ||||||||||||||||||||||||||
| if fund > 0: | ||||||||||||||||||||||||||
|
|
@@ -431,6 +464,9 @@ async def on_peer_connected(writer): | |||||||||||||||||||||||||
| logger.info("Chain saved to '%s'", datadir) | ||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||
| logger.error("Failed to save chain during shutdown: %s", e) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if rpc_task: | ||||||||||||||||||||||||||
| rpc_task.cancel() | ||||||||||||||||||||||||||
| await network.stop() | ||||||||||||||||||||||||||
|
Comment on lines
+467
to
470
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RPC task cancellation is not awaited — incomplete cleanup. Calling Proposed fix if rpc_task:
rpc_task.cancel()
+ try:
+ await rpc_task
+ except asyncio.CancelledError:
+ pass
await network.stop()Ideally, combine this with a proper 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -89,6 +89,9 @@ def _create_genesis_block(self, genesis_path): | |||||||||||||
| genesis_block.hash = computed_hash | ||||||||||||||
|
|
||||||||||||||
| self.chain.append(genesis_block) | ||||||||||||||
|
|
||||||||||||||
| # Snapshot the state exactly after genesis allocation for clean reorg rebuilds | ||||||||||||||
| self._genesis_state_snapshot = self.state.snapshot() | ||||||||||||||
|
|
||||||||||||||
| @property | ||||||||||||||
| def last_block(self): | ||||||||||||||
|
|
@@ -98,6 +101,16 @@ def last_block(self): | |||||||||||||
| with self._lock: # Acquire lock for thread-safe access | ||||||||||||||
| return self.chain[-1] | ||||||||||||||
|
|
||||||||||||||
| def get_total_work(self, chain_list=None): | ||||||||||||||
| """ | ||||||||||||||
| Calculates the cumulative PoW of a chain. | ||||||||||||||
| Work is proportional to 2^difficulty. | ||||||||||||||
| """ | ||||||||||||||
| if chain_list is None: | ||||||||||||||
| with self._lock: | ||||||||||||||
| chain_list = self.chain | ||||||||||||||
| return sum(2 ** (block.difficulty or 1) for block in chain_list) | ||||||||||||||
|
|
||||||||||||||
| def add_block(self, block): | ||||||||||||||
| """ | ||||||||||||||
| Validates and adds a block to the chain if all transactions succeed. | ||||||||||||||
|
|
@@ -147,3 +160,77 @@ def add_block(self, block): | |||||||||||||
| self.state = temp_state | ||||||||||||||
| self.chain.append(block) | ||||||||||||||
| return True | ||||||||||||||
|
|
||||||||||||||
| def resolve_conflicts(self, new_chain_list) -> tuple[bool, list]: | ||||||||||||||
| """ | ||||||||||||||
| Evaluates a competing chain. If it has strictly greater cumulative work, | ||||||||||||||
| attempts a reorg. Rebuilds state from genesis to guarantee validity. | ||||||||||||||
| Returns: (success_bool, list_of_orphaned_transactions) | ||||||||||||||
| """ | ||||||||||||||
| if not new_chain_list: | ||||||||||||||
| return False, [] | ||||||||||||||
|
|
||||||||||||||
| with self._lock: | ||||||||||||||
| current_work = self.get_total_work() | ||||||||||||||
| new_work = self.get_total_work(new_chain_list) | ||||||||||||||
|
|
||||||||||||||
| if new_work <= current_work: | ||||||||||||||
| logger.debug("Incoming chain (work: %s) is not heavier than local chain (work: %s). Rejecting.", new_work, current_work) | ||||||||||||||
| return False, [] | ||||||||||||||
|
|
||||||||||||||
| # 1. Verify genesis block matches | ||||||||||||||
| if new_chain_list[0].hash != self.chain[0].hash: | ||||||||||||||
| logger.warning("Reorg failed: Genesis hash mismatch.") | ||||||||||||||
| return False, [] | ||||||||||||||
|
|
||||||||||||||
| logger.info("Incoming chain is heavier (%s > %s). Attempting reorg...", new_work, current_work) | ||||||||||||||
|
|
||||||||||||||
| # 2. Snapshot current state and chain in case reorg fails validation | ||||||||||||||
| state_snapshot = self.state.snapshot() | ||||||||||||||
| original_chain = list(self.chain) | ||||||||||||||
|
Comment on lines
+188
to
+190
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial | 💤 Low value Unused snapshots can be removed.
🧹 Remove dead code logger.info("Incoming chain is heavier (%s > %s). Attempting reorg...", new_work, current_work)
- # 2. Snapshot current state and chain in case reorg fails validation
- state_snapshot = self.state.snapshot()
- original_chain = list(self.chain)
-
- # 3. Rebuild state entirely from genesis using the new chain
+ # 2. Rebuild state entirely from genesis using the new chain
temp_state = State()
temp_state.restore(self._genesis_state_snapshot)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||
|
|
||||||||||||||
| # 3. Rebuild state entirely from genesis using the new chain | ||||||||||||||
| temp_state = State() | ||||||||||||||
| temp_state.restore(self._genesis_state_snapshot) | ||||||||||||||
|
|
||||||||||||||
| # Verify and apply blocks 1 to N | ||||||||||||||
| for i in range(1, len(new_chain_list)): | ||||||||||||||
| prev_block = new_chain_list[i-1] | ||||||||||||||
| block = new_chain_list[i] | ||||||||||||||
|
|
||||||||||||||
| try: | ||||||||||||||
| validate_block_link_and_hash(prev_block, block) | ||||||||||||||
| except ValueError as exc: | ||||||||||||||
| logger.warning("Reorg failed at block %s: %s", block.index, exc) | ||||||||||||||
| return False, [] | ||||||||||||||
|
|
||||||||||||||
| receipts = [] | ||||||||||||||
| for tx in block.transactions: | ||||||||||||||
| receipt = temp_state.validate_and_apply(tx) | ||||||||||||||
| if receipt is None: | ||||||||||||||
| logger.warning("Reorg failed: Transaction validation failed in block %s", block.index) | ||||||||||||||
| return False, [] | ||||||||||||||
| receipts.append(receipt) | ||||||||||||||
|
|
||||||||||||||
| total_fees = sum(getattr(r, 'gas_used', 0) for r in receipts) | ||||||||||||||
| if block.miner: | ||||||||||||||
| temp_state.credit_mining_reward(block.miner, reward=temp_state.DEFAULT_MINING_REWARD + total_fees) | ||||||||||||||
|
|
||||||||||||||
| computed_receipt_root = calculate_receipt_root(receipts) | ||||||||||||||
| if block.receipt_root != computed_receipt_root: | ||||||||||||||
| logger.warning("Reorg failed: Invalid receipt root at block %s. Expected %s, got %s", block.index, computed_receipt_root, block.receipt_root) | ||||||||||||||
| return False, [] | ||||||||||||||
|
|
||||||||||||||
| if block.state_root != temp_state.state_root(): | ||||||||||||||
| logger.warning("Reorg failed: Invalid state root at block %s", block.index) | ||||||||||||||
| return False, [] | ||||||||||||||
|
|
||||||||||||||
| # 4. Success! Compute orphaned transactions. | ||||||||||||||
| old_txs = {tx.tx_id: tx for b in original_chain[1:] for tx in b.transactions} | ||||||||||||||
| new_tx_ids = {tx.tx_id for b in new_chain_list[1:] for tx in b.transactions} | ||||||||||||||
| orphans = [tx for tx_id, tx in old_txs.items() if tx_id not in new_tx_ids] | ||||||||||||||
|
|
||||||||||||||
| self.chain = new_chain_list | ||||||||||||||
| self.state = temp_state | ||||||||||||||
|
Comment on lines
+233
to
+234
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider copying the incoming chain to avoid aliasing. Directly assigning 🛡️ Proposed fix- self.chain = new_chain_list
+ self.chain = list(new_chain_list)
self.state = temp_state📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||
| logger.info("Reorg successful! Switched to new chain tip: Block %s", self.last_block.index) | ||||||||||||||
| return True, orphans | ||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,8 +5,7 @@ | |||||||||||||||||
|
|
||||||||||||||||||
| class Mempool: | ||||||||||||||||||
| def __init__(self, max_size=1000, transactions_per_block=100): | ||||||||||||||||||
| self._pool = {} | ||||||||||||||||||
| self._size = 0 | ||||||||||||||||||
| self._list = [] # Single sorted list | ||||||||||||||||||
| self._lock = threading.Lock() | ||||||||||||||||||
| self.max_size = max_size | ||||||||||||||||||
| self.transactions_per_block = transactions_per_block | ||||||||||||||||||
|
|
@@ -17,64 +16,62 @@ def add_transaction(self, tx): | |||||||||||||||||
| return False | ||||||||||||||||||
|
|
||||||||||||||||||
| with self._lock: | ||||||||||||||||||
| existing = self._pool.get(tx.sender, {}).get(tx.nonce) | ||||||||||||||||||
| existing_idx = None | ||||||||||||||||||
| i_min = 0 | ||||||||||||||||||
| i_max = len(self._list) | ||||||||||||||||||
|
|
||||||||||||||||||
| for i, existing_tx in enumerate(self._list): | ||||||||||||||||||
| if existing_tx.sender == tx.sender: | ||||||||||||||||||
| if existing_tx.nonce == tx.nonce: | ||||||||||||||||||
| existing_idx = i | ||||||||||||||||||
| elif existing_tx.nonce < tx.nonce: | ||||||||||||||||||
| # Must insert AFTER the largest lower-nonce transaction | ||||||||||||||||||
| i_min = max(i_min, i + 1) | ||||||||||||||||||
| elif existing_tx.nonce > tx.nonce: | ||||||||||||||||||
| # Must insert BEFORE the smallest higher-nonce transaction | ||||||||||||||||||
| i_max = min(i_max, i) | ||||||||||||||||||
|
|
||||||||||||||||||
| if existing: | ||||||||||||||||||
| if existing.tx_id == tx.tx_id: | ||||||||||||||||||
| if existing_idx is not None: | ||||||||||||||||||
| existing_tx = self._list[existing_idx] | ||||||||||||||||||
| if existing_tx.tx_id == tx.tx_id: | ||||||||||||||||||
| logger.warning("Mempool: Duplicate transaction rejected %s", tx.tx_id) | ||||||||||||||||||
| return False | ||||||||||||||||||
| # Fix: Guard against older replacements (e.g. rejected block restore) | ||||||||||||||||||
| # Only allow overwrite if it's a genuinely newer replacement | ||||||||||||||||||
| if tx.timestamp <= existing.timestamp: | ||||||||||||||||||
| if tx.timestamp <= existing_tx.timestamp: | ||||||||||||||||||
| logger.warning("Mempool: Ignoring older replacement %s", tx.tx_id) | ||||||||||||||||||
| return False | ||||||||||||||||||
|
|
||||||||||||||||||
| self._list.pop(existing_idx) | ||||||||||||||||||
| if i_max > existing_idx: | ||||||||||||||||||
| i_max -= 1 | ||||||||||||||||||
| if i_min > existing_idx: | ||||||||||||||||||
| i_min -= 1 | ||||||||||||||||||
| else: | ||||||||||||||||||
| if self._size >= self.max_size: | ||||||||||||||||||
| if len(self._list) >= self.max_size: | ||||||||||||||||||
| logger.warning("Mempool: Full, rejecting transaction") | ||||||||||||||||||
| return False | ||||||||||||||||||
|
Comment on lines
48
to
51
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial | 💤 Low value Use Per static analysis (PLR5501), convert ♻️ Proposed refactor- else:
- if len(self._list) >= self.max_size:
- logger.warning("Mempool: Full, rejecting transaction")
- return False
+ elif len(self._list) >= self.max_size:
+ logger.warning("Mempool: Full, rejecting transaction")
+ return False📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff (0.15.15)[warning] 48-49: Use Convert to (PLR5501) 🤖 Prompt for AI AgentsSource: Linters/SAST tools |
||||||||||||||||||
| self._size += 1 | ||||||||||||||||||
| self._pool.setdefault(tx.sender, {})[tx.nonce] = tx | ||||||||||||||||||
| return True | ||||||||||||||||||
|
|
||||||||||||||||||
| def get_transactions_for_block(self): | ||||||||||||||||||
| with self._lock: | ||||||||||||||||||
| snapshot = {s: list(pool.values()) for s, pool in self._pool.items()} | ||||||||||||||||||
|
|
||||||||||||||||||
| for txs in snapshot.values(): | ||||||||||||||||||
| txs.sort(key=lambda t: t.nonce) | ||||||||||||||||||
| i_min = min(i_min, i_max) | ||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Safety clamp silently masks broken invariant. When Consider failing fast or logging when this condition occurs: Proposed fix to detect invariant violation- i_min = min(i_min, i_max)
+ if i_min > i_max:
+ logger.error("Mempool: Nonce ordering invariant violated for sender %s (i_min=%d, i_max=%d)", tx.sender[:12], i_min, i_max)
+ # Force valid range to allow recovery; consider raising in debug builds
+ i_min = i_max🤖 Prompt for AI Agents |
||||||||||||||||||
|
|
||||||||||||||||||
| selected = [] | ||||||||||||||||||
| while len(selected) < self.transactions_per_block: | ||||||||||||||||||
| best_tx = None | ||||||||||||||||||
| best_sender = None | ||||||||||||||||||
| # Insert before the first tx in [i_min, i_max] that has a lower fee | ||||||||||||||||||
| insert_idx = i_max | ||||||||||||||||||
| for j in range(i_min, i_max): | ||||||||||||||||||
| if getattr(self._list[j], 'fee', 0) < getattr(tx, 'fee', 0): | ||||||||||||||||||
| insert_idx = j | ||||||||||||||||||
| break | ||||||||||||||||||
|
|
||||||||||||||||||
| for sender, txs in snapshot.items(): | ||||||||||||||||||
| if txs: | ||||||||||||||||||
| current_criteria = (-getattr(txs[0], 'fee', 0), txs[0].timestamp, sender, txs[0].nonce) | ||||||||||||||||||
| best_criteria = (-getattr(best_tx, 'fee', 0), best_tx.timestamp, best_sender, best_tx.nonce) if best_tx else None | ||||||||||||||||||
| if best_tx is None or current_criteria < best_criteria: | ||||||||||||||||||
| best_tx = txs[0] | ||||||||||||||||||
| best_sender = sender | ||||||||||||||||||
|
|
||||||||||||||||||
| if not best_tx: | ||||||||||||||||||
| break | ||||||||||||||||||
|
|
||||||||||||||||||
| selected.append(best_tx) | ||||||||||||||||||
| snapshot[best_sender].pop(0) | ||||||||||||||||||
| self._list.insert(insert_idx, tx) | ||||||||||||||||||
| return True | ||||||||||||||||||
|
|
||||||||||||||||||
| return selected | ||||||||||||||||||
| def get_transactions_for_block(self): | ||||||||||||||||||
| with self._lock: | ||||||||||||||||||
| # O(1) retrieval! The list is strictly ordered upon insertion. | ||||||||||||||||||
| return list(self._list[:self.transactions_per_block]) | ||||||||||||||||||
|
|
||||||||||||||||||
| def remove_transactions(self, transactions): | ||||||||||||||||||
| with self._lock: | ||||||||||||||||||
| for tx in transactions: | ||||||||||||||||||
| pool = self._pool.get(tx.sender) | ||||||||||||||||||
| if pool and tx.nonce in pool: | ||||||||||||||||||
| del pool[tx.nonce] | ||||||||||||||||||
| self._size -= 1 | ||||||||||||||||||
| if not pool: | ||||||||||||||||||
| del self._pool[tx.sender] | ||||||||||||||||||
| keys_to_remove = {(tx.sender, tx.nonce) for tx in transactions} | ||||||||||||||||||
| self._list = [tx for tx in self._list if (tx.sender, tx.nonce) not in keys_to_remove] | ||||||||||||||||||
|
|
||||||||||||||||||
| def __len__(self): | ||||||||||||||||||
| with self._lock: | ||||||||||||||||||
| return self._size | ||||||||||||||||||
| return len(self._list) | ||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chain response broadcasts to all peers instead of just the requester — amplification risk.
When handling
chain_request, the code broadcasts the entire chain to all connected peers via_broadcast_raw, not just the peer who requested it. This is:The root cause is that the handler only receives
_peer_addr(a string), not thewriterobject needed to callsend_chain_response. Consider passing the writer through the handler callback or storing a peer address → writer mapping.🧰 Tools
🪛 Ruff (0.15.15)
[warning] 173-173: Store a reference to the return value of
asyncio.create_task(RUF006)
🤖 Prompt for AI Agents