diff --git a/lib/src/sftp/sftp_client.dart b/lib/src/sftp/sftp_client.dart index 26173b4..be6102d 100644 --- a/lib/src/sftp/sftp_client.dart +++ b/lib/src/sftp/sftp_client.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:io'; import 'dart:math'; import 'dart:typed_data'; @@ -705,6 +706,162 @@ class SftpFile { return bytesRead; } + /// Downloads this file into a random-access local file. + /// + /// Unlike [read] and [downloadTo], this method does not require SFTP read + /// replies to be yielded in offset order. Replies are written to + /// [destination] at the same offset, allowing pipelined reads to make + /// progress even when later offsets complete before earlier ones. + /// + /// Returns the total number of bytes written. + Future downloadToRandomAccess( + RandomAccessFile destination, { + int? length, + int offset = 0, + void Function(int bytesRead)? onProgress, + int chunkSize = _kDownloadChunkSize, + int maxPendingRequests = _kDownloadMaxPendingRequests, + }) async { + _mustNotBeClosed(); + if (chunkSize <= 0) { + throw ArgumentError.value(chunkSize, 'chunkSize', 'must be positive'); + } + if (maxPendingRequests <= 0) { + throw ArgumentError.value( + maxPendingRequests, + 'maxPendingRequests', + 'must be positive', + ); + } + + if (length == null) { + final fileSize = (await stat()).size; + if (fileSize == null) { + throw SftpError('Can not get file size'); + } + length = fileSize - offset; + } + + if (length == 0) return 0; + if (length < 0) { + throw SftpError('Length must be positive: $length'); + } + + final endOffset = offset + length; + final completionQueue = <_ReadCompletion>[]; + var reservedOffset = offset; + var bytesWritten = 0; + var pendingReadCount = 0; + var activeReadLimit = 1; + var effectiveChunkSize = chunkSize; + Object? pendingError; + StackTrace? pendingStackTrace; + Completer? completionSignal; + + void notifyReadComplete() { + final signal = completionSignal; + if (signal != null && !signal.isCompleted) { + signal.complete(); + } + } + + Future waitForReadComplete() { + if (completionQueue.isNotEmpty || pendingError != null) { + return Future.value(); + } + final signal = completionSignal = Completer(); + return signal.future.whenComplete(() { + if (identical(completionSignal, signal)) { + completionSignal = null; + } + }); + } + + void issueRead(int startOffset, int requestLength) { + pendingReadCount++; + _readChunk(requestLength, startOffset).then( + (chunk) { + pendingReadCount--; + if (chunk != null && chunk.isNotEmpty) { + activeReadLimit = min(maxPendingRequests, activeReadLimit + 1); + } + completionQueue.add(_ReadCompletion(startOffset, chunk)); + if (chunk != null && + chunk.isNotEmpty && + chunk.length < requestLength && + startOffset + chunk.length < endOffset) { + effectiveChunkSize = max(1, min(effectiveChunkSize, chunk.length)); + issueRead( + startOffset + chunk.length, + min( + requestLength - chunk.length, + endOffset - startOffset - chunk.length, + ), + ); + } + notifyReadComplete(); + }, + onError: (Object error, StackTrace stackTrace) { + pendingReadCount--; + pendingError = error; + pendingStackTrace = stackTrace; + notifyReadComplete(); + }, + ); + } + + void scheduleReads() { + while (reservedOffset < endOffset && pendingReadCount < activeReadLimit) { + final startOffset = reservedOffset; + final requestLength = + min(effectiveChunkSize, endOffset - reservedOffset); + issueRead(startOffset, requestLength); + reservedOffset += requestLength; + } + } + + scheduleReads(); + + while (bytesWritten < length) { + if (pendingError != null) { + Error.throwWithStackTrace(pendingError!, pendingStackTrace!); + } + + if (completionQueue.isEmpty) { + if (pendingReadCount == 0) break; + await waitForReadComplete(); + continue; + } + + final completion = completionQueue.removeAt(0); + final startOffset = completion.startOffset; + final chunk = completion.chunk; + if (chunk == null) break; + if (chunk.isEmpty) { + throw SftpError('Unexpected empty data chunk before EOF'); + } + + final remaining = length - (startOffset - offset); + final outputChunk = chunk.length <= remaining + ? chunk + : Uint8List.sublistView(chunk, 0, remaining); + await destination.setPosition(startOffset); + await destination.writeFrom(outputChunk); + + bytesWritten += outputChunk.length; + onProgress?.call(bytesWritten); + scheduleReads(); + } + + if (bytesWritten != length) { + throw SftpError( + 'Incomplete download: received $bytesWritten of $length bytes', + ); + } + + return bytesWritten; + } + /// Reads at most [length] bytes from the file starting at [offset]. If /// [length] is null, reads until end of the file. /// Use [read] if you want to stream large file in chunks. @@ -801,3 +958,11 @@ class SftpHandsake { @override String toString() => 'SftpHandsake($version, $extensions)'; } + +/// Tracks a pending SFTP read completion for [SftpFile.downloadToRandomAccess]. +class _ReadCompletion { + _ReadCompletion(this.startOffset, this.chunk); + + final int startOffset; + final Uint8List? chunk; +} diff --git a/test/src/sftp/sftp_client_protocol_test.dart b/test/src/sftp/sftp_client_protocol_test.dart index 1df6aa2..f619945 100644 --- a/test/src/sftp/sftp_client_protocol_test.dart +++ b/test/src/sftp/sftp_client_protocol_test.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:io'; import 'dart:typed_data'; import 'package:dartssh2/src/message/msg_channel.dart'; @@ -407,31 +408,73 @@ void main() { harness.dispose(); }); - test( - 'rename falls back to SSH_FXP_RENAME when extension version is mismatched', + test('downloadToRandomAccess writes out-of-order chunks at offset', () async { final harness = _SftpHarness(); await harness.nextOutgoingPacket(); + harness.sendResponsePacket(SftpVersionPacket(3)); + await harness.client.handshake; + + final fileFuture = harness.client.open('/tmp/file'); + final open = SftpOpenPacket.decode(await harness.nextOutgoingPacket()); harness.sendResponsePacket( - SftpVersionPacket(3, {'posix-rename@openssh.com': '2'}), + SftpHandlePacket(open.requestId, Uint8List.fromList([1, 2, 3])), ); - await harness.client.handshake; + final file = await fileFuture; - final renameFuture = harness.client.rename('/tmp/a', '/tmp/b'); - final packet = await harness.nextOutgoingPacket(); - final rename = SftpRenamePacket.decode(packet); - expect(rename.oldPath, '/tmp/a'); - expect(rename.newPath, '/tmp/b'); + final tempDir = await Directory.systemTemp.createTemp('sftp_ra_test'); + final tempFile = File('${tempDir.path}/ra.bin'); + final raf = await tempFile.open(mode: FileMode.write); + + final downloadFuture = file.downloadToRandomAccess( + raf, + length: 12, + chunkSize: 4, + maxPendingRequests: 2, + ); + // activeReadLimit starts at 1, so only read1 is issued initially. + final read1 = SftpReadPacket.decode(await harness.nextOutgoingPacket()); + expect(read1.offset, 0); + // Replying to read1 bumps activeReadLimit to 2, which schedules read2 + // and read3 together (both fit within the limit now). + harness.sendResponsePacket( + SftpDataPacket(read1.requestId, Uint8List.fromList('ABCD'.codeUnits)), + ); + final read2 = SftpReadPacket.decode(await harness.nextOutgoingPacket()); + final read3 = SftpReadPacket.decode(await harness.nextOutgoingPacket()); + expect(read2.offset, 4); + expect(read3.offset, 8); + + // Reply out of order: read3 before read2. downloadToRandomAccess must + // write each chunk at its own offset regardless of arrival order. + harness.sendResponsePacket( + SftpDataPacket(read3.requestId, Uint8List.fromList('IJKL'.codeUnits)), + ); + harness.sendResponsePacket( + SftpDataPacket(read2.requestId, Uint8List.fromList('EFGH'.codeUnits)), + ); + + final bytes = await downloadFuture; + expect(bytes, 12); + + final closeFuture = file.close(); + final close = SftpClosePacket.decode(await harness.nextOutgoingPacket()); harness.sendResponsePacket( SftpStatusPacket( - requestId: rename.requestId, + requestId: close.requestId, code: SftpStatusCode.ok, message: 'ok', ), ); + await closeFuture; - await renameFuture; + await raf.close(); + + final contents = await tempFile.readAsBytes(); + expect(contents, Uint8List.fromList('ABCDEFGHIJKL'.codeUnits)); + + await tempDir.delete(recursive: true); harness.dispose(); }); });