Implement Phase 1 FlowQueue engine#1
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c40c8b8a1f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if (function != null) { | ||
| await _executor.execute(function); | ||
| } | ||
|
|
||
| await _setState(processId, QueueState.success); |
There was a problem hiding this comment.
Do not mark tasks successful without a callback
When a pending row was persisted by an earlier app/session, _memoryFunctions is empty for that process_id; the next call that starts processing, such as adding a new task, will fetch that old pending row, skip execution because function == null, and still persist success. That silently reports work as completed even though the callback never ran, which breaks the persistent-queue behavior and retrying persisted tasks after restart.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Code Review
This pull request introduces FlowQueue, a lightweight, persistent SQLite-backed queue engine for Flutter, along with an example application and unit tests. The review highlights several critical issues and improvement opportunities: a correctness bug where tasks with missing memory functions on restart are falsely marked as successful; a database read overhead in state updates; a potential race condition during concurrent database initialization; architectural concerns regarding memory leaks and persistence limitations of closures; tasks getting stuck in the inProgress state after an app crash; and a state leak in QueueWorker upon callback failures. Implementing the suggested registry-based pattern, synchronization futures, and state resets will significantly improve the reliability, performance, and robustness of the queue.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| Future<void> _startProcessing() async { | ||
| final database = await _ensureInitialized(); | ||
|
|
||
| await _worker.start(() async { | ||
| while (true) { | ||
| final result = await database.rawQuery( | ||
| QueueQueries.fetchNextTask(tableName), | ||
| ); | ||
|
|
||
| if (result.isEmpty) { | ||
| break; | ||
| } | ||
|
|
||
| final task = result.first; | ||
| final processId = task['process_id'].toString(); | ||
|
|
||
| await _setState(processId, QueueState.inProgress); | ||
|
|
||
| try { | ||
| final function = _memoryFunctions[processId]; | ||
|
|
||
| if (function != null) { | ||
| await _executor.execute(function); | ||
| } | ||
|
|
||
| await _setState(processId, QueueState.success); | ||
| } catch (_) { | ||
| await _setState(processId, QueueState.failed); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| Future<void> _setState(String processId, QueueState state) async { | ||
| final database = await _ensureInitialized(); | ||
|
|
||
| await database.update( | ||
| tableName, | ||
| {'state': state.name}, | ||
| where: 'process_id = ?', | ||
| whereArgs: [processId], | ||
| ); | ||
|
|
||
| QueueStream.controller.add(await getTask(processId)); | ||
| } |
There was a problem hiding this comment.
Issue: Correctness & Performance Optimization in Task Processing
- Correctness Bug: If the app restarts, any pending tasks in the database will be processed. However, since
_memoryFunctionsis empty on startup,functionwill benull. Currently, the code silently skips execution and marks the task assuccess. This is a critical correctness issue because tasks will be falsely reported as completed without ever running. We should throw an error if the function is missing so the task is marked asfailed(or handled appropriately). - Performance Overhead: In
_setState, the code queries the database again viagetTask(processId)just to emit the updated task to the stream. Since state updates happen twice per task (frompendingtoinProgress, and then tosuccess/failed), this doubles the read load on SQLite.
Solution
We can convert the database query result to a QueueTask immediately, update its state in memory, and pass it to an optimized _updateTaskState method. This avoids the extra database read and ensures that missing functions throw an error.
Future<void> _startProcessing() async {
final database = await _ensureInitialized();
await _worker.start(() async {
while (true) {
final result = await database.rawQuery(
QueueQueries.fetchNextTask(tableName),
);
if (result.isEmpty) {
break;
}
final task = QueueTask.fromMap(result.first);
await _updateTaskState(task, QueueState.inProgress);
try {
final function = _memoryFunctions[task.processId];
if (function == null) {
throw StateError('No executable function registered for task ${task.processId}');
}
await _executor.execute(function);
await _updateTaskState(task, QueueState.success);
} catch (_) {
await _updateTaskState(task, QueueState.failed);
}
}
});
}
Future<void> _updateTaskState(QueueTask task, QueueState newState) async {
final database = await _ensureInitialized();
await database.update(
tableName,
{'state': newState.name},
where: 'process_id = ?',
whereArgs: [task.processId],
);
final updatedTask = QueueTask(
processId: task.processId,
parentProcessId: task.parentProcessId,
processName: task.processName,
state: newState,
retryCount: task.retryCount,
priority: task.priority,
createdAt: task.createdAt,
);
QueueStream.controller.add(updatedTask);
}| class DatabaseService { | ||
| static Database? _database; | ||
|
|
||
| /// Opens the database and ensures [tableName] exists. | ||
| static Future<Database> init(String tableName) async { | ||
| final normalizedTableName = normalizeTableName(tableName); | ||
|
|
||
| _database ??= await openDatabase( | ||
| join(await getDatabasesPath(), 'flow_queue.db'), | ||
| version: 1, | ||
| ); | ||
|
|
||
| await _database!.execute(''' | ||
| CREATE TABLE IF NOT EXISTS $normalizedTableName ( | ||
| process_id TEXT PRIMARY KEY, | ||
| parent_process_id TEXT, | ||
| process_name TEXT NOT NULL, | ||
| state TEXT NOT NULL, | ||
| retry_count INTEGER DEFAULT 0, | ||
| priority INTEGER NOT NULL, | ||
| created_at INTEGER NOT NULL | ||
| ) | ||
| '''); | ||
|
|
||
| return _database!; | ||
| } |
There was a problem hiding this comment.
Issue: Race Condition in Concurrent Database Initialization
Since DatabaseService._database is a static variable, if multiple FlowQueue instances or concurrent operations call DatabaseService.init() at the same time when the database is not yet opened, both will execute openDatabase concurrently. This can lead to multiple open connections to the same SQLite file, database locked errors, or database corruption.
Solution
We should use a static Future<Database>? _initFuture to synchronize concurrent initialization requests and ensure openDatabase is called exactly once.
class DatabaseService {
static Database? _database;
static Future<Database>? _initFuture;
/// Opens the database and ensures [tableName] exists.
static Future<Database> init(String tableName) async {
final normalizedTableName = normalizeTableName(tableName);
if (_database != null) {
await _createTable(_database!, normalizedTableName);
return _database!;
}
_initFuture ??= openDatabase(
join(await getDatabasesPath(), 'flow_queue.db'),
version: 1,
);
_database = await _initFuture;
await _createTable(_database!, normalizedTableName);
return _database!;
}
static Future<void> _createTable(Database db, String tableName) async {
await db.execute('''
CREATE TABLE IF NOT EXISTS $tableName (
process_id TEXT PRIMARY KEY,
parent_process_id TEXT,
process_name TEXT NOT NULL,
state TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
priority INTEGER NOT NULL,
created_at INTEGER NOT NULL
)
''');
}|
|
||
| final QueueWorker _worker = QueueWorker(); | ||
| final TaskExecutor _executor = TaskExecutor(); | ||
| final Map<String, Future<void> Function()> _memoryFunctions = {}; |
There was a problem hiding this comment.
Architectural Concern: Memory Leak & Persistence Limitations
- Memory Leak: The
_memoryFunctionsmap stores task closures indefinitely. As more tasks are added and executed, this map grows without bound, preventing garbage collection of any captured variables in the closures. - Persistence Limitations: Since Dart closures cannot be serialized to SQLite, any pending tasks in the database will lose their executable functions when the app restarts. This means the queue cannot truly recover and execute pending tasks across app sessions.
Recommended Long-Term Solution
Instead of passing closures directly to add(), implement a registry-based pattern where task executors are registered globally on startup by processName:
queue.registerExecutor('upload_post', (task) async {
// task execution logic
});This allows the queue to look up the executor by processName (which is persisted in SQLite) upon app restart, enabling true persistence and eliminating the memory leak of keeping individual closures in memory.
| Future<void> _startProcessing() async { | ||
| final database = await _ensureInitialized(); | ||
|
|
||
| await _worker.start(() async { | ||
| while (true) { | ||
| final result = await database.rawQuery( | ||
| QueueQueries.fetchNextTask(tableName), | ||
| ); |
There was a problem hiding this comment.
Issue: Tasks Stuck in inProgress After App Crash
If the app crashes or is terminated while a task is running, that task's state remains inProgress in the SQLite database. Since QueueQueries.fetchNextTask only queries for tasks with state = 'pending', any interrupted tasks will be stuck in inProgress forever and never retried or resumed.
Recommendation
On queue initialization (e.g., in init()), reset any tasks with inProgress state back to pending (or mark them as failed so they can be retried) to ensure the queue can recover from unexpected app terminations.
| Database? _database; | ||
|
|
||
| final QueueWorker _worker = QueueWorker(); | ||
| final TaskExecutor _executor = TaskExecutor(); | ||
| final Map<String, Future<void> Function()> _memoryFunctions = {}; | ||
|
|
||
| /// Creates a queue that persists tasks in [tableName]. | ||
| FlowQueue(String tableName) : _tableName = tableName; | ||
|
|
||
| /// Opens the SQLite database and prepares this queue table. | ||
| Future<void> init() async { | ||
| _database = await DatabaseService.init(tableName); | ||
| } |
There was a problem hiding this comment.
Improvement: Guard Concurrent Initialization
If init() is called multiple times concurrently (or if multiple operations trigger _ensureInitialized() before the first initialization completes), it can lead to redundant database initialization calls. We can guard this by caching the initialization future.
Database? _database;
Future<void>? _initFuture;
final QueueWorker _worker = QueueWorker();
final TaskExecutor _executor = TaskExecutor();
final Map<String, Future<void> Function()> _memoryFunctions = {};
/// Creates a queue that persists tasks in [tableName].
FlowQueue(String tableName) : _tableName = tableName;
/// Opens the SQLite database and prepares this queue table.
Future<void> init() async {
_initFuture ??= _doInit();
await _initFuture;
}
Future<void> _doInit() async {
_database = await DatabaseService.init(tableName);
}| Future<void> start(Future<void> Function() callback) async { | ||
| if (_isRunning) { | ||
| _shouldRunAgain = true; | ||
| return; | ||
| } | ||
|
|
||
| _isRunning = true; | ||
|
|
||
| try { | ||
| do { | ||
| _shouldRunAgain = false; | ||
| await callback(); | ||
| } while (_shouldRunAgain); | ||
| } finally { | ||
| _isRunning = false; | ||
| } | ||
| } |
There was a problem hiding this comment.
Issue: Leaked _shouldRunAgain State on Callback Failure
If the callback() throws an exception, the loop terminates immediately and the worker stops. However, if _shouldRunAgain was set to true concurrently, it is never reset to false. The next time start() is called, it will run an extra redundant pass even if no concurrent requests occurred during that run.
Solution
Reset _shouldRunAgain to false in the finally block to ensure clean state transitions.
Future<void> start(Future<void> Function() callback) async {
if (_isRunning) {
_shouldRunAgain = true;
return;
}
_isRunning = true;
try {
do {
_shouldRunAgain = false;
await callback();
} while (_shouldRunAgain);
} finally {
_isRunning = false;
_shouldRunAgain = false;
}
}There was a problem hiding this comment.
Pull request overview
This PR replaces the starter template code with a Phase 1 implementation of a lightweight, SQLite-backed task queue for Flutter. It introduces the core FlowQueue engine, persistence layer, task model + enums, and a basic example app, along with initial unit tests for task (de)serialization.
Changes:
- Added a SQLite persistence layer (DB init + per-queue tables) and query helpers to select next pending task by
priority DESC, created_at ASC. - Implemented the
FlowQueueengine withadd,getTask,getState,listen,retry, and sequential processing via a single worker. - Added supporting models/enums/utilities (task serialization, priority mapping, UUID generation, state stream) plus an example app and basic serialization tests.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| test/flow_queue_test.dart | Replaced template test with QueueTask.toMap/fromMap tests. |
| pubspec.yaml | Added sqflite, path, and uuid dependencies for persistence and IDs. |
| lib/flow_queue.dart | Converted package entrypoint to export the new public API surface. |
| lib/core/flow_queue.dart | Added the main queue engine: persistence, APIs, and sequential worker-driven processing. |
| lib/core/queue_worker.dart | Added single-worker runner to prevent overlapping processing loops per instance. |
| lib/core/task_executor.dart | Added thin execution wrapper for task callbacks. |
| lib/database/database_service.dart | Added DB open + table creation and table-name normalization helper. |
| lib/database/queue_queries.dart | Added SQL builder for selecting the next pending task by priority/FIFO. |
| lib/models/queue_task.dart | Added persisted task model with SQLite serialization and state mapping. |
| lib/enums/queue_state.dart | Added task lifecycle states used by the engine and persistence. |
| lib/enums/queue_priority.dart | Added priority levels used in selection ordering. |
| lib/utils/priority_mapper.dart | Added mapping between enum priority and persisted integer values. |
| lib/services/id_service.dart | Added UUID v4 ID generation for tasks. |
| lib/streams/queue_stream.dart | Added shared broadcast stream controller for task state updates. |
| example/main.dart | Added a minimal Flutter example that enqueues and listens for task updates. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| final function = _memoryFunctions[processId]; | ||
|
|
||
| if (function != null) { | ||
| await _executor.execute(function); | ||
| } | ||
|
|
||
| await _setState(processId, QueueState.success); |
| final result = await database.rawQuery( | ||
| QueueQueries.fetchNextTask(tableName), | ||
| ); |
| final database = await _ensureInitialized(); | ||
| final id = IdService.generate(); | ||
|
|
||
| _memoryFunctions[id] = function; |
| class QueueStream { | ||
| static final StreamController<QueueTask> controller = | ||
| StreamController<QueueTask>.broadcast(); |
| static Database? _database; | ||
|
|
||
| /// Opens the database and ensures [tableName] exists. | ||
| static Future<Database> init(String tableName) async { | ||
| final normalizedTableName = normalizeTableName(tableName); | ||
|
|
||
| _database ??= await openDatabase( | ||
| join(await getDatabasesPath(), 'flow_queue.db'), | ||
| version: 1, | ||
| ); |
| Future<void> _startProcessing() async { | ||
| final database = await _ensureInitialized(); | ||
|
|
||
| await _worker.start(() async { |
| }, | ||
| ); | ||
|
|
||
| unawaited(_startProcessing()); |
| }, | ||
| ); | ||
|
|
||
| unawaited(_startProcessing()); |
Motivation
Description
FlowQueueengine inlib/core/flow_queue.dartwith lazy database initialization,add,getTask,getState,listen,retry, and sequential processing driven by_startProcessingandQueueWorker.lib/database/database_service.dartandlib/database/queue_queries.dartto create per-queue tables and fetch the next pending task ordered bypriority DESC, created_at ASC.QueueTaskserialization (toMap/fromMap) inlib/models/queue_task.dart,QueueStateandQueuePriorityenums,priority_mapper,IdService, and a broadcastQueueStreamfor state updates.TaskExecutor,QueueWorker), an example app atexample/main.dart, updatedpubspec.yamlto includesqflite,path, anduuid, and added unit tests intest/flow_queue_test.dartfor task (de)serialization.Testing
git diff --checkto validate the working tree and found no issues.dart formatbut thedartexecutable is not available in the environment so formatting could not be run here.flutter pub getandflutter testbut theflutterexecutable is not available in the environment so unit tests were not executed here.test/flow_queue_test.dartwhich verifiesQueueTask.toMap()andQueueTask.fromMap()and can be run in a local environment with Flutter installed.Codex Task