Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions lib/src/sftp/sftp_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:io';
import 'dart:math';
import 'dart:typed_data';

Expand Down Expand Up @@ -705,6 +706,162 @@ class SftpFile {
return bytesRead;
}

/// Downloads this file into a random-access local file.
///
/// Unlike [read] and [downloadTo], this method does not require SFTP read
/// replies to be yielded in offset order. Replies are written to
/// [destination] at the same offset, allowing pipelined reads to make
/// progress even when later offsets complete before earlier ones.
///
/// Returns the total number of bytes written.
Future<int> downloadToRandomAccess(
RandomAccessFile destination, {
int? length,
int offset = 0,
void Function(int bytesRead)? onProgress,
int chunkSize = _kDownloadChunkSize,
int maxPendingRequests = _kDownloadMaxPendingRequests,
}) async {
_mustNotBeClosed();
if (chunkSize <= 0) {
throw ArgumentError.value(chunkSize, 'chunkSize', 'must be positive');
}
if (maxPendingRequests <= 0) {
throw ArgumentError.value(
maxPendingRequests,
'maxPendingRequests',
'must be positive',
);
}

if (length == null) {
final fileSize = (await stat()).size;
if (fileSize == null) {
throw SftpError('Can not get file size');
}
length = fileSize - offset;
}

if (length == 0) return 0;
if (length < 0) {
throw SftpError('Length must be positive: $length');
}

final endOffset = offset + length;
final completionQueue = <_ReadCompletion>[];
var reservedOffset = offset;
var bytesWritten = 0;
var pendingReadCount = 0;
var activeReadLimit = 1;
var effectiveChunkSize = chunkSize;
Object? pendingError;
StackTrace? pendingStackTrace;
Completer<void>? completionSignal;

void notifyReadComplete() {
final signal = completionSignal;
if (signal != null && !signal.isCompleted) {
signal.complete();
}
}

Future<void> waitForReadComplete() {
if (completionQueue.isNotEmpty || pendingError != null) {
return Future.value();
}
final signal = completionSignal = Completer<void>();
return signal.future.whenComplete(() {
if (identical(completionSignal, signal)) {
completionSignal = null;
}
});
}

void issueRead(int startOffset, int requestLength) {
pendingReadCount++;
_readChunk(requestLength, startOffset).then(
(chunk) {
pendingReadCount--;
if (chunk != null && chunk.isNotEmpty) {
activeReadLimit = min(maxPendingRequests, activeReadLimit + 1);
}
completionQueue.add(_ReadCompletion(startOffset, chunk));
if (chunk != null &&
chunk.isNotEmpty &&
chunk.length < requestLength &&
startOffset + chunk.length < endOffset) {
effectiveChunkSize = max(1, min(effectiveChunkSize, chunk.length));
issueRead(
startOffset + chunk.length,
min(
requestLength - chunk.length,
endOffset - startOffset - chunk.length,
),
);
}
notifyReadComplete();
},
onError: (Object error, StackTrace stackTrace) {
pendingReadCount--;
pendingError = error;
pendingStackTrace = stackTrace;
notifyReadComplete();
},
);
}

void scheduleReads() {
while (reservedOffset < endOffset && pendingReadCount < activeReadLimit) {
final startOffset = reservedOffset;
final requestLength =
min(effectiveChunkSize, endOffset - reservedOffset);
issueRead(startOffset, requestLength);
reservedOffset += requestLength;
}
}

scheduleReads();

while (bytesWritten < length) {
if (pendingError != null) {
Error.throwWithStackTrace(pendingError!, pendingStackTrace!);
}

if (completionQueue.isEmpty) {
if (pendingReadCount == 0) break;
await waitForReadComplete();
continue;
}

final completion = completionQueue.removeAt(0);
final startOffset = completion.startOffset;
final chunk = completion.chunk;
if (chunk == null) break;
if (chunk.isEmpty) {
throw SftpError('Unexpected empty data chunk before EOF');
}

final remaining = length - (startOffset - offset);
final outputChunk = chunk.length <= remaining
? chunk
: Uint8List.sublistView(chunk, 0, remaining);
await destination.setPosition(startOffset);
await destination.writeFrom(outputChunk);

bytesWritten += outputChunk.length;
onProgress?.call(bytesWritten);
scheduleReads();
}

if (bytesWritten != length) {
throw SftpError(
'Incomplete download: received $bytesWritten of $length bytes',
);
}

return bytesWritten;
}

/// Reads at most [length] bytes from the file starting at [offset]. If
/// [length] is null, reads until end of the file.
/// Use [read] if you want to stream large file in chunks.
Expand Down Expand Up @@ -801,3 +958,11 @@ class SftpHandsake {
@override
String toString() => 'SftpHandsake($version, $extensions)';
}

/// Tracks a pending SFTP read completion for [SftpFile.downloadToRandomAccess].
class _ReadCompletion {
_ReadCompletion(this.startOffset, this.chunk);

final int startOffset;
final Uint8List? chunk;
}
65 changes: 54 additions & 11 deletions test/src/sftp/sftp_client_protocol_test.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';

import 'package:dartssh2/src/message/msg_channel.dart';
Expand Down Expand Up @@ -407,31 +408,73 @@ void main() {
harness.dispose();
});

test(
'rename falls back to SSH_FXP_RENAME when extension version is mismatched',
test('downloadToRandomAccess writes out-of-order chunks at offset',
() async {
final harness = _SftpHarness();
await harness.nextOutgoingPacket();
harness.sendResponsePacket(SftpVersionPacket(3));
await harness.client.handshake;

final fileFuture = harness.client.open('/tmp/file');
final open = SftpOpenPacket.decode(await harness.nextOutgoingPacket());
harness.sendResponsePacket(
SftpVersionPacket(3, {'posix-rename@openssh.com': '2'}),
SftpHandlePacket(open.requestId, Uint8List.fromList([1, 2, 3])),
);
await harness.client.handshake;
final file = await fileFuture;

final renameFuture = harness.client.rename('/tmp/a', '/tmp/b');
final packet = await harness.nextOutgoingPacket();
final rename = SftpRenamePacket.decode(packet);
expect(rename.oldPath, '/tmp/a');
expect(rename.newPath, '/tmp/b');
final tempDir = await Directory.systemTemp.createTemp('sftp_ra_test');
final tempFile = File('${tempDir.path}/ra.bin');
final raf = await tempFile.open(mode: FileMode.write);

final downloadFuture = file.downloadToRandomAccess(
raf,
length: 12,
chunkSize: 4,
maxPendingRequests: 2,
);

// activeReadLimit starts at 1, so only read1 is issued initially.
final read1 = SftpReadPacket.decode(await harness.nextOutgoingPacket());
expect(read1.offset, 0);
// Replying to read1 bumps activeReadLimit to 2, which schedules read2
// and read3 together (both fit within the limit now).
harness.sendResponsePacket(
SftpDataPacket(read1.requestId, Uint8List.fromList('ABCD'.codeUnits)),
);
final read2 = SftpReadPacket.decode(await harness.nextOutgoingPacket());
final read3 = SftpReadPacket.decode(await harness.nextOutgoingPacket());
expect(read2.offset, 4);
expect(read3.offset, 8);

// Reply out of order: read3 before read2. downloadToRandomAccess must
// write each chunk at its own offset regardless of arrival order.
harness.sendResponsePacket(
SftpDataPacket(read3.requestId, Uint8List.fromList('IJKL'.codeUnits)),
);
harness.sendResponsePacket(
SftpDataPacket(read2.requestId, Uint8List.fromList('EFGH'.codeUnits)),
);

final bytes = await downloadFuture;
expect(bytes, 12);

final closeFuture = file.close();
final close = SftpClosePacket.decode(await harness.nextOutgoingPacket());
harness.sendResponsePacket(
SftpStatusPacket(
requestId: rename.requestId,
requestId: close.requestId,
code: SftpStatusCode.ok,
message: 'ok',
),
);
await closeFuture;

await renameFuture;
await raf.close();

final contents = await tempFile.readAsBytes();
expect(contents, Uint8List.fromList('ABCDEFGHIJKL'.codeUnits));

await tempDir.delete(recursive: true);
harness.dispose();
});
});
Expand Down
Loading