Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions example/main.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import 'package:flow_queue/flow_queue.dart';
import 'package:flutter/material.dart';

void main() async {
WidgetsFlutterBinding.ensureInitialized();

final queue = FlowQueue('post_queue');

await queue.init();

runApp(MyApp(queue));
}

class MyApp extends StatelessWidget {
final FlowQueue queue;

const MyApp(this.queue, {super.key});

@override
Widget build(BuildContext context) {
return MaterialApp(
home: HomePage(queue: queue),
);
}
}

class HomePage extends StatelessWidget {
final FlowQueue queue;

const HomePage({
super.key,
required this.queue,
});

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('Flow Queue'),
),
body: Center(
child: ElevatedButton(
onPressed: () async {
final id = await queue.add(
processName: 'upload_post',
priority: QueuePriority.high,
function: () async {
await Future<void>.delayed(
const Duration(seconds: 5),
);

debugPrint('Uploaded');
},
);

queue.listen(id).listen((task) {
debugPrint(task.state.name);
});
},
child: const Text('Upload'),
),
),
);
}
}
181 changes: 181 additions & 0 deletions lib/core/flow_queue.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import 'dart:async';

import 'package:sqflite/sqflite.dart';

import '../database/database_service.dart';
import '../database/queue_queries.dart';
import '../enums/queue_priority.dart';
import '../enums/queue_state.dart';
import '../models/queue_task.dart';
import '../services/id_service.dart';
import '../streams/queue_stream.dart';
import '../utils/priority_mapper.dart';
import 'queue_worker.dart';
import 'task_executor.dart';

/// Lightweight persistent SQLite-backed queue engine.
class FlowQueue {
/// SQLite table used by this queue instance.
late final String tableName = DatabaseService.normalizeTableName(_tableName);

final String _tableName;

Database? _database;

final QueueWorker _worker = QueueWorker();
final TaskExecutor _executor = TaskExecutor();
final Map<String, Future<void> Function()> _memoryFunctions = {};

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Architectural Concern: Memory Leak & Persistence Limitations

  1. Memory Leak: The _memoryFunctions map stores task closures indefinitely. As more tasks are added and executed, this map grows without bound, preventing garbage collection of any captured variables in the closures.
  2. Persistence Limitations: Since Dart closures cannot be serialized to SQLite, any pending tasks in the database will lose their executable functions when the app restarts. This means the queue cannot truly recover and execute pending tasks across app sessions.

Recommended Long-Term Solution

Instead of passing closures directly to add(), implement a registry-based pattern where task executors are registered globally on startup by processName:

queue.registerExecutor('upload_post', (task) async {
  // task execution logic
});

This allows the queue to look up the executor by processName (which is persisted in SQLite) upon app restart, enabling true persistence and eliminating the memory leak of keeping individual closures in memory.


/// Creates a queue that persists tasks in [tableName].
FlowQueue(String tableName) : _tableName = tableName;

/// Opens the SQLite database and prepares this queue table.
Future<void> init() async {
_database = await DatabaseService.init(tableName);
}
Comment on lines +23 to +35

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Improvement: Guard Concurrent Initialization

If init() is called multiple times concurrently (or if multiple operations trigger _ensureInitialized() before the first initialization completes), it can lead to redundant database initialization calls. We can guard this by caching the initialization future.

  Database? _database;
  Future<void>? _initFuture;

  final QueueWorker _worker = QueueWorker();
  final TaskExecutor _executor = TaskExecutor();
  final Map<String, Future<void> Function()> _memoryFunctions = {};

  /// Creates a queue that persists tasks in [tableName].
  FlowQueue(String tableName) : _tableName = tableName;

  /// Opens the SQLite database and prepares this queue table.
  Future<void> init() async {
    _initFuture ??= _doInit();
    await _initFuture;
  }

  Future<void> _doInit() async {
    _database = await DatabaseService.init(tableName);
  }


/// Adds a task to the queue and starts processing pending work.
Future<String> add({
required String processName,
required QueuePriority priority,
required Future<void> Function() function,
}) async {
final database = await _ensureInitialized();
final id = IdService.generate();

_memoryFunctions[id] = function;

await database.insert(
tableName,
{
'process_id': id,
'parent_process_id': null,
'process_name': processName,
'state': QueueState.pending.name,
'retry_count': 0,
'priority': getPriorityValue(priority),
'created_at': DateTime.now().millisecondsSinceEpoch,
},
);

unawaited(_startProcessing());

return id;
}

/// Returns the persisted task for [processId].
Future<QueueTask> getTask(String processId) async {
final database = await _ensureInitialized();
final result = await database.query(
tableName,
where: 'process_id = ?',
whereArgs: [processId],
limit: 1,
);

if (result.isEmpty) {
throw StateError('Queue task not found: $processId');
}

return QueueTask.fromMap(result.first);
}

/// Returns the current state for [processId].
Future<QueueState> getState(String processId) async {
final task = await getTask(processId);

return task.state;
}

/// Streams state updates for [processId].
Stream<QueueTask> listen(String processId) {
return QueueStream.controller.stream.where(
(task) => task.processId == processId,
);
}

/// Creates a new pending task from an existing task.
Future<String> retry(String processId) async {
final database = await _ensureInitialized();
final oldTask = await getTask(processId);
final newId = IdService.generate();
final function = _memoryFunctions[processId];

if (function != null) {
_memoryFunctions[newId] = function;
}

await database.insert(
tableName,
{
'process_id': newId,
'parent_process_id': processId,
'process_name': oldTask.processName,
'state': QueueState.pending.name,
'retry_count': oldTask.retryCount + 1,
'priority': getPriorityValue(oldTask.priority),
'created_at': DateTime.now().millisecondsSinceEpoch,
},
);

unawaited(_startProcessing());

return newId;
}

Future<void> _startProcessing() async {
final database = await _ensureInitialized();

await _worker.start(() async {
Comment on lines +126 to +129
while (true) {
final result = await database.rawQuery(
QueueQueries.fetchNextTask(tableName),
);
Comment on lines +126 to +133

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Tasks Stuck in inProgress After App Crash

If the app crashes or is terminated while a task is running, that task's state remains inProgress in the SQLite database. Since QueueQueries.fetchNextTask only queries for tasks with state = 'pending', any interrupted tasks will be stuck in inProgress forever and never retried or resumed.

Recommendation

On queue initialization (e.g., in init()), reset any tasks with inProgress state back to pending (or mark them as failed so they can be retried) to ensure the queue can recover from unexpected app terminations.

Comment on lines +131 to +133

if (result.isEmpty) {
break;
}

final task = result.first;
final processId = task['process_id'].toString();

await _setState(processId, QueueState.inProgress);

try {
final function = _memoryFunctions[processId];

if (function != null) {
await _executor.execute(function);
}

await _setState(processId, QueueState.success);
Comment on lines +147 to +151

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not mark tasks successful without a callback

When a pending row was persisted by an earlier app/session, _memoryFunctions is empty for that process_id; the next call that starts processing, such as adding a new task, will fetch that old pending row, skip execution because function == null, and still persist success. That silently reports work as completed even though the callback never ran, which breaks the persistent-queue behavior and retrying persisted tasks after restart.

Useful? React with 👍 / 👎.

Comment on lines +145 to +151
} catch (_) {
await _setState(processId, QueueState.failed);
}
}
});
}

Future<void> _setState(String processId, QueueState state) async {
final database = await _ensureInitialized();

await database.update(
tableName,
{'state': state.name},
where: 'process_id = ?',
whereArgs: [processId],
);

QueueStream.controller.add(await getTask(processId));
}
Comment on lines +126 to +170

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Correctness & Performance Optimization in Task Processing

  1. Correctness Bug: If the app restarts, any pending tasks in the database will be processed. However, since _memoryFunctions is empty on startup, function will be null. Currently, the code silently skips execution and marks the task as success. This is a critical correctness issue because tasks will be falsely reported as completed without ever running. We should throw an error if the function is missing so the task is marked as failed (or handled appropriately).
  2. Performance Overhead: In _setState, the code queries the database again via getTask(processId) just to emit the updated task to the stream. Since state updates happen twice per task (from pending to inProgress, and then to success/failed), this doubles the read load on SQLite.

Solution

We can convert the database query result to a QueueTask immediately, update its state in memory, and pass it to an optimized _updateTaskState method. This avoids the extra database read and ensures that missing functions throw an error.

  Future<void> _startProcessing() async {
    final database = await _ensureInitialized();

    await _worker.start(() async {
      while (true) {
        final result = await database.rawQuery(
          QueueQueries.fetchNextTask(tableName),
        );

        if (result.isEmpty) {
          break;
        }

        final task = QueueTask.fromMap(result.first);

        await _updateTaskState(task, QueueState.inProgress);

        try {
          final function = _memoryFunctions[task.processId];

          if (function == null) {
            throw StateError('No executable function registered for task ${task.processId}');
          }

          await _executor.execute(function);
          await _updateTaskState(task, QueueState.success);
        } catch (_) {
          await _updateTaskState(task, QueueState.failed);
        }
      }
    });
  }

  Future<void> _updateTaskState(QueueTask task, QueueState newState) async {
    final database = await _ensureInitialized();

    await database.update(
      tableName,
      {'state': newState.name},
      where: 'process_id = ?',
      whereArgs: [task.processId],
    );

    final updatedTask = QueueTask(
      processId: task.processId,
      parentProcessId: task.parentProcessId,
      processName: task.processName,
      state: newState,
      retryCount: task.retryCount,
      priority: task.priority,
      createdAt: task.createdAt,
    );

    QueueStream.controller.add(updatedTask);
  }


Future<Database> _ensureInitialized() async {
if (_database != null) {
return _database!;
}

await init();

return _database!;
}
}
30 changes: 30 additions & 0 deletions lib/core/queue_worker.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/// Ensures queue processing runs on a single sequential worker.
class QueueWorker {
bool _isRunning = false;
bool _shouldRunAgain = false;

/// Whether the worker is currently processing tasks.
bool get isRunning => _isRunning;

/// Starts the worker if it is not already running.
///
/// When start is requested while the worker is already active, one extra pass
/// is queued so tasks added near the end of the current pass are not stranded.
Future<void> start(Future<void> Function() callback) async {
if (_isRunning) {
_shouldRunAgain = true;
return;
}

_isRunning = true;

try {
do {
_shouldRunAgain = false;
await callback();
} while (_shouldRunAgain);
} finally {
_isRunning = false;
}
}
Comment on lines +13 to +29

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Issue: Leaked _shouldRunAgain State on Callback Failure

If the callback() throws an exception, the loop terminates immediately and the worker stops. However, if _shouldRunAgain was set to true concurrently, it is never reset to false. The next time start() is called, it will run an extra redundant pass even if no concurrent requests occurred during that run.

Solution

Reset _shouldRunAgain to false in the finally block to ensure clean state transitions.

  Future<void> start(Future<void> Function() callback) async {
    if (_isRunning) {
      _shouldRunAgain = true;
      return;
    }

    _isRunning = true;

    try {
      do {
        _shouldRunAgain = false;
        await callback();
      } while (_shouldRunAgain);
    } finally {
      _isRunning = false;
      _shouldRunAgain = false;
    }
  }

}
7 changes: 7 additions & 0 deletions lib/core/task_executor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/// Executes an in-memory queue callback.
class TaskExecutor {
/// Runs [function] and lets callers handle any thrown errors.
Future<void> execute(Future<void> Function() function) {
return function();
}
}
52 changes: 52 additions & 0 deletions lib/database/database_service.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import 'package:path/path.dart';
import 'package:sqflite/sqflite.dart';

/// Opens and prepares the SQLite database used by flow queues.
class DatabaseService {
static Database? _database;

/// Opens the database and ensures [tableName] exists.
static Future<Database> init(String tableName) async {
final normalizedTableName = normalizeTableName(tableName);

_database ??= await openDatabase(
join(await getDatabasesPath(), 'flow_queue.db'),
version: 1,
);
Comment on lines +6 to +15

await _database!.execute('''
CREATE TABLE IF NOT EXISTS $normalizedTableName (
process_id TEXT PRIMARY KEY,
parent_process_id TEXT,
process_name TEXT NOT NULL,
state TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
priority INTEGER NOT NULL,
created_at INTEGER NOT NULL
)
''');

return _database!;
}
Comment on lines +5 to +30

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Race Condition in Concurrent Database Initialization

Since DatabaseService._database is a static variable, if multiple FlowQueue instances or concurrent operations call DatabaseService.init() at the same time when the database is not yet opened, both will execute openDatabase concurrently. This can lead to multiple open connections to the same SQLite file, database locked errors, or database corruption.

Solution

We should use a static Future<Database>? _initFuture to synchronize concurrent initialization requests and ensure openDatabase is called exactly once.

class DatabaseService {
  static Database? _database;
  static Future<Database>? _initFuture;

  /// Opens the database and ensures [tableName] exists.
  static Future<Database> init(String tableName) async {
    final normalizedTableName = normalizeTableName(tableName);

    if (_database != null) {
      await _createTable(_database!, normalizedTableName);
      return _database!;
    }

    _initFuture ??= openDatabase(
      join(await getDatabasesPath(), 'flow_queue.db'),
      version: 1,
    );

    _database = await _initFuture;
    await _createTable(_database!, normalizedTableName);
    return _database!;
  }

  static Future<void> _createTable(Database db, String tableName) async {
    await db.execute('''
      CREATE TABLE IF NOT EXISTS $tableName (
        process_id TEXT PRIMARY KEY,
        parent_process_id TEXT,
        process_name TEXT NOT NULL,
        state TEXT NOT NULL,
        retry_count INTEGER DEFAULT 0,
        priority INTEGER NOT NULL,
        created_at INTEGER NOT NULL
      )
    ''');
  }


/// Returns a validated SQLite table identifier for [tableName].
static String normalizeTableName(String tableName) {
final trimmed = tableName.trim();

if (trimmed.isEmpty) {
throw ArgumentError.value(tableName, 'tableName', 'Must not be empty.');
}

final validIdentifier = RegExp(r'^[A-Za-z_][A-Za-z0-9_]*$');

if (!validIdentifier.hasMatch(trimmed)) {
throw ArgumentError.value(
tableName,
'tableName',
'Must be a valid SQLite identifier containing only letters, numbers, and underscores, and must not start with a number.',
);
}

return trimmed;
}
}
13 changes: 13 additions & 0 deletions lib/database/queue_queries.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/// SQL query builders used by the queue database layer.
class QueueQueries {
/// Returns a query that fetches the highest-priority pending task using FIFO
/// ordering when priorities match.
static String fetchNextTask(String tableName) {
return '''
SELECT * FROM $tableName
WHERE state = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
''';
}
}
11 changes: 11 additions & 0 deletions lib/enums/queue_priority.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/// Priority levels used when selecting the next pending task.
enum QueuePriority {
/// Lowest priority, selected after moderate and high priority tasks.
defaultPriority,

/// Medium priority, selected after high priority tasks.
moderate,

/// Highest priority, selected before moderate and default tasks.
high,
}
14 changes: 14 additions & 0 deletions lib/enums/queue_state.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/// The lifecycle state of a queued task.
enum QueueState {
/// The task is waiting to be executed.
pending,

/// The task is currently being executed by the queue worker.
inProgress,

/// The task completed successfully.
success,

/// The task failed while executing.
failed,
}
11 changes: 6 additions & 5 deletions lib/flow_queue.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// A Calculator.
class Calculator {
/// Returns [value] plus 1.
int addOne(int value) => value + 1;
}
library flow_queue;

export 'core/flow_queue.dart';
export 'enums/queue_priority.dart';
export 'enums/queue_state.dart';
export 'models/queue_task.dart';
Loading