From 388044e03b64332ff9b7e9accc6ae906bf119d97 Mon Sep 17 00:00:00 2001 From: GT610 Date: Tue, 30 Jun 2026 19:53:59 +0800 Subject: [PATCH] feat(sftp): add downloadToRandomAccess for offset-based download Adds SftpFile.downloadToRandomAccess, which downloads a file into a dart:io RandomAccessFile, writing each SFTP read reply at its own offset rather than requiring replies to arrive in order. Unlike downloadTo (which streams sequentially into a StreamSink), downloadToRandomAccess uses a completion queue that decouples the order in which pipelined read replies arrive from the order in which they are written to disk. This lets pipelined reads keep making progress when a later offset completes before an earlier one, improving throughput on high-latency or reordering links. Implementation notes: - Adaptive concurrency: starts with one outstanding read and ramps up to maxPendingRequests as replies succeed, backing off the chunk size on short reads to avoid over-requesting past EOF. - Validates chunkSize and maxPendingRequests as positive. - Throws SftpError on incomplete downloads (byte count mismatch). Add a protocol-level test that issues reads in pipelined fashion and replies out of order, asserting the final file contents are written at the correct offsets. --- lib/src/sftp/sftp_client.dart | 165 +++++++++++++++++++ test/src/sftp/sftp_client_protocol_test.dart | 71 ++++++++ 2 files changed, 236 insertions(+) diff --git a/lib/src/sftp/sftp_client.dart b/lib/src/sftp/sftp_client.dart index 26173b4..be6102d 100644 --- a/lib/src/sftp/sftp_client.dart +++ b/lib/src/sftp/sftp_client.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:io'; import 'dart:math'; import 'dart:typed_data'; @@ -705,6 +706,162 @@ class SftpFile { return bytesRead; } + /// Downloads this file into a random-access local file. + /// + /// Unlike [read] and [downloadTo], this method does not require SFTP read + /// replies to be yielded in offset order. Replies are written to + /// [destination] at the same offset, allowing pipelined reads to make + /// progress even when later offsets complete before earlier ones. + /// + /// Returns the total number of bytes written. + Future downloadToRandomAccess( + RandomAccessFile destination, { + int? length, + int offset = 0, + void Function(int bytesRead)? onProgress, + int chunkSize = _kDownloadChunkSize, + int maxPendingRequests = _kDownloadMaxPendingRequests, + }) async { + _mustNotBeClosed(); + if (chunkSize <= 0) { + throw ArgumentError.value(chunkSize, 'chunkSize', 'must be positive'); + } + if (maxPendingRequests <= 0) { + throw ArgumentError.value( + maxPendingRequests, + 'maxPendingRequests', + 'must be positive', + ); + } + + if (length == null) { + final fileSize = (await stat()).size; + if (fileSize == null) { + throw SftpError('Can not get file size'); + } + length = fileSize - offset; + } + + if (length == 0) return 0; + if (length < 0) { + throw SftpError('Length must be positive: $length'); + } + + final endOffset = offset + length; + final completionQueue = <_ReadCompletion>[]; + var reservedOffset = offset; + var bytesWritten = 0; + var pendingReadCount = 0; + var activeReadLimit = 1; + var effectiveChunkSize = chunkSize; + Object? pendingError; + StackTrace? pendingStackTrace; + Completer? completionSignal; + + void notifyReadComplete() { + final signal = completionSignal; + if (signal != null && !signal.isCompleted) { + signal.complete(); + } + } + + Future waitForReadComplete() { + if (completionQueue.isNotEmpty || pendingError != null) { + return Future.value(); + } + final signal = completionSignal = Completer(); + return signal.future.whenComplete(() { + if (identical(completionSignal, signal)) { + completionSignal = null; + } + }); + } + + void issueRead(int startOffset, int requestLength) { + pendingReadCount++; + _readChunk(requestLength, startOffset).then( + (chunk) { + pendingReadCount--; + if (chunk != null && chunk.isNotEmpty) { + activeReadLimit = min(maxPendingRequests, activeReadLimit + 1); + } + completionQueue.add(_ReadCompletion(startOffset, chunk)); + if (chunk != null && + chunk.isNotEmpty && + chunk.length < requestLength && + startOffset + chunk.length < endOffset) { + effectiveChunkSize = max(1, min(effectiveChunkSize, chunk.length)); + issueRead( + startOffset + chunk.length, + min( + requestLength - chunk.length, + endOffset - startOffset - chunk.length, + ), + ); + } + notifyReadComplete(); + }, + onError: (Object error, StackTrace stackTrace) { + pendingReadCount--; + pendingError = error; + pendingStackTrace = stackTrace; + notifyReadComplete(); + }, + ); + } + + void scheduleReads() { + while (reservedOffset < endOffset && pendingReadCount < activeReadLimit) { + final startOffset = reservedOffset; + final requestLength = + min(effectiveChunkSize, endOffset - reservedOffset); + issueRead(startOffset, requestLength); + reservedOffset += requestLength; + } + } + + scheduleReads(); + + while (bytesWritten < length) { + if (pendingError != null) { + Error.throwWithStackTrace(pendingError!, pendingStackTrace!); + } + + if (completionQueue.isEmpty) { + if (pendingReadCount == 0) break; + await waitForReadComplete(); + continue; + } + + final completion = completionQueue.removeAt(0); + final startOffset = completion.startOffset; + final chunk = completion.chunk; + if (chunk == null) break; + if (chunk.isEmpty) { + throw SftpError('Unexpected empty data chunk before EOF'); + } + + final remaining = length - (startOffset - offset); + final outputChunk = chunk.length <= remaining + ? chunk + : Uint8List.sublistView(chunk, 0, remaining); + await destination.setPosition(startOffset); + await destination.writeFrom(outputChunk); + + bytesWritten += outputChunk.length; + onProgress?.call(bytesWritten); + scheduleReads(); + } + + if (bytesWritten != length) { + throw SftpError( + 'Incomplete download: received $bytesWritten of $length bytes', + ); + } + + return bytesWritten; + } + /// Reads at most [length] bytes from the file starting at [offset]. If /// [length] is null, reads until end of the file. /// Use [read] if you want to stream large file in chunks. @@ -801,3 +958,11 @@ class SftpHandsake { @override String toString() => 'SftpHandsake($version, $extensions)'; } + +/// Tracks a pending SFTP read completion for [SftpFile.downloadToRandomAccess]. +class _ReadCompletion { + _ReadCompletion(this.startOffset, this.chunk); + + final int startOffset; + final Uint8List? chunk; +} diff --git a/test/src/sftp/sftp_client_protocol_test.dart b/test/src/sftp/sftp_client_protocol_test.dart index 0165b3b..f619945 100644 --- a/test/src/sftp/sftp_client_protocol_test.dart +++ b/test/src/sftp/sftp_client_protocol_test.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:io'; import 'dart:typed_data'; import 'package:dartssh2/src/message/msg_channel.dart'; @@ -406,6 +407,76 @@ void main() { await renameFuture; harness.dispose(); }); + + test('downloadToRandomAccess writes out-of-order chunks at offset', + () async { + final harness = _SftpHarness(); + await harness.nextOutgoingPacket(); + harness.sendResponsePacket(SftpVersionPacket(3)); + await harness.client.handshake; + + final fileFuture = harness.client.open('/tmp/file'); + final open = SftpOpenPacket.decode(await harness.nextOutgoingPacket()); + harness.sendResponsePacket( + SftpHandlePacket(open.requestId, Uint8List.fromList([1, 2, 3])), + ); + final file = await fileFuture; + + final tempDir = await Directory.systemTemp.createTemp('sftp_ra_test'); + final tempFile = File('${tempDir.path}/ra.bin'); + final raf = await tempFile.open(mode: FileMode.write); + + final downloadFuture = file.downloadToRandomAccess( + raf, + length: 12, + chunkSize: 4, + maxPendingRequests: 2, + ); + + // activeReadLimit starts at 1, so only read1 is issued initially. + final read1 = SftpReadPacket.decode(await harness.nextOutgoingPacket()); + expect(read1.offset, 0); + // Replying to read1 bumps activeReadLimit to 2, which schedules read2 + // and read3 together (both fit within the limit now). + harness.sendResponsePacket( + SftpDataPacket(read1.requestId, Uint8List.fromList('ABCD'.codeUnits)), + ); + final read2 = SftpReadPacket.decode(await harness.nextOutgoingPacket()); + final read3 = SftpReadPacket.decode(await harness.nextOutgoingPacket()); + expect(read2.offset, 4); + expect(read3.offset, 8); + + // Reply out of order: read3 before read2. downloadToRandomAccess must + // write each chunk at its own offset regardless of arrival order. + harness.sendResponsePacket( + SftpDataPacket(read3.requestId, Uint8List.fromList('IJKL'.codeUnits)), + ); + harness.sendResponsePacket( + SftpDataPacket(read2.requestId, Uint8List.fromList('EFGH'.codeUnits)), + ); + + final bytes = await downloadFuture; + expect(bytes, 12); + + final closeFuture = file.close(); + final close = SftpClosePacket.decode(await harness.nextOutgoingPacket()); + harness.sendResponsePacket( + SftpStatusPacket( + requestId: close.requestId, + code: SftpStatusCode.ok, + message: 'ok', + ), + ); + await closeFuture; + + await raf.close(); + + final contents = await tempFile.readAsBytes(); + expect(contents, Uint8List.fromList('ABCDEFGHIJKL'.codeUnits)); + + await tempDir.delete(recursive: true); + harness.dispose(); + }); }); }