Skip to content

Implement Phase 1 FlowQueue engine#1

Merged
Harsh4114 merged 1 commit into
mainfrom
codex/build-lightweight-flutter-queue-engine
Jun 4, 2026
Merged

Implement Phase 1 FlowQueue engine#1
Harsh4114 merged 1 commit into
mainfrom
codex/build-lightweight-flutter-queue-engine

Conversation

@Harsh4114

Copy link
Copy Markdown
Owner

Motivation

  • Provide a lightweight, persistent Flutter queue engine that stores tasks in SQLite, executes them sequentially, supports priority + FIFO ordering, persists queue state, exposes a small API, supports retry-by-new-task, and streams state updates.

Description

  • Implemented the core FlowQueue engine in lib/core/flow_queue.dart with lazy database initialization, add, getTask, getState, listen, retry, and sequential processing driven by _startProcessing and QueueWorker.
  • Added the database layer lib/database/database_service.dart and lib/database/queue_queries.dart to create per-queue tables and fetch the next pending task ordered by priority DESC, created_at ASC.
  • Added models, enums and helpers: QueueTask serialization (toMap/fromMap) in lib/models/queue_task.dart, QueueState and QueuePriority enums, priority_mapper, IdService, and a broadcast QueueStream for state updates.
  • Added lightweight execution scaffolding (TaskExecutor, QueueWorker), an example app at example/main.dart, updated pubspec.yaml to include sqflite, path, and uuid, and added unit tests in test/flow_queue_test.dart for task (de)serialization.

Testing

  • Ran git diff --check to validate the working tree and found no issues.
  • Attempted to run dart format but the dart executable is not available in the environment so formatting could not be run here.
  • Attempted flutter pub get and flutter test but the flutter executable is not available in the environment so unit tests were not executed here.
  • The repository includes test/flow_queue_test.dart which verifies QueueTask.toMap() and QueueTask.fromMap() and can be run in a local environment with Flutter installed.

Codex Task

Copilot AI review requested due to automatic review settings June 4, 2026 11:44
@Harsh4114 Harsh4114 merged commit a3aacda into main Jun 4, 2026
1 check passed

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c40c8b8a1f

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread lib/core/flow_queue.dart
Comment on lines +147 to +151
if (function != null) {
await _executor.execute(function);
}

await _setState(processId, QueueState.success);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not mark tasks successful without a callback

When a pending row was persisted by an earlier app/session, _memoryFunctions is empty for that process_id; the next call that starts processing, such as adding a new task, will fetch that old pending row, skip execution because function == null, and still persist success. That silently reports work as completed even though the callback never ran, which breaks the persistent-queue behavior and retrying persisted tasks after restart.

Useful? React with 👍 / 👎.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces FlowQueue, a lightweight, persistent SQLite-backed queue engine for Flutter, along with an example application and unit tests. The review highlights several critical issues and improvement opportunities: a correctness bug where tasks with missing memory functions on restart are falsely marked as successful; a database read overhead in state updates; a potential race condition during concurrent database initialization; architectural concerns regarding memory leaks and persistence limitations of closures; tasks getting stuck in the inProgress state after an app crash; and a state leak in QueueWorker upon callback failures. Implementing the suggested registry-based pattern, synchronization futures, and state resets will significantly improve the reliability, performance, and robustness of the queue.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread lib/core/flow_queue.dart
Comment on lines +126 to +170
Future<void> _startProcessing() async {
final database = await _ensureInitialized();

await _worker.start(() async {
while (true) {
final result = await database.rawQuery(
QueueQueries.fetchNextTask(tableName),
);

if (result.isEmpty) {
break;
}

final task = result.first;
final processId = task['process_id'].toString();

await _setState(processId, QueueState.inProgress);

try {
final function = _memoryFunctions[processId];

if (function != null) {
await _executor.execute(function);
}

await _setState(processId, QueueState.success);
} catch (_) {
await _setState(processId, QueueState.failed);
}
}
});
}

Future<void> _setState(String processId, QueueState state) async {
final database = await _ensureInitialized();

await database.update(
tableName,
{'state': state.name},
where: 'process_id = ?',
whereArgs: [processId],
);

QueueStream.controller.add(await getTask(processId));
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Correctness & Performance Optimization in Task Processing

  1. Correctness Bug: If the app restarts, any pending tasks in the database will be processed. However, since _memoryFunctions is empty on startup, function will be null. Currently, the code silently skips execution and marks the task as success. This is a critical correctness issue because tasks will be falsely reported as completed without ever running. We should throw an error if the function is missing so the task is marked as failed (or handled appropriately).
  2. Performance Overhead: In _setState, the code queries the database again via getTask(processId) just to emit the updated task to the stream. Since state updates happen twice per task (from pending to inProgress, and then to success/failed), this doubles the read load on SQLite.

Solution

We can convert the database query result to a QueueTask immediately, update its state in memory, and pass it to an optimized _updateTaskState method. This avoids the extra database read and ensures that missing functions throw an error.

  Future<void> _startProcessing() async {
    final database = await _ensureInitialized();

    await _worker.start(() async {
      while (true) {
        final result = await database.rawQuery(
          QueueQueries.fetchNextTask(tableName),
        );

        if (result.isEmpty) {
          break;
        }

        final task = QueueTask.fromMap(result.first);

        await _updateTaskState(task, QueueState.inProgress);

        try {
          final function = _memoryFunctions[task.processId];

          if (function == null) {
            throw StateError('No executable function registered for task ${task.processId}');
          }

          await _executor.execute(function);
          await _updateTaskState(task, QueueState.success);
        } catch (_) {
          await _updateTaskState(task, QueueState.failed);
        }
      }
    });
  }

  Future<void> _updateTaskState(QueueTask task, QueueState newState) async {
    final database = await _ensureInitialized();

    await database.update(
      tableName,
      {'state': newState.name},
      where: 'process_id = ?',
      whereArgs: [task.processId],
    );

    final updatedTask = QueueTask(
      processId: task.processId,
      parentProcessId: task.parentProcessId,
      processName: task.processName,
      state: newState,
      retryCount: task.retryCount,
      priority: task.priority,
      createdAt: task.createdAt,
    );

    QueueStream.controller.add(updatedTask);
  }

Comment on lines +5 to +30
class DatabaseService {
static Database? _database;

/// Opens the database and ensures [tableName] exists.
static Future<Database> init(String tableName) async {
final normalizedTableName = normalizeTableName(tableName);

_database ??= await openDatabase(
join(await getDatabasesPath(), 'flow_queue.db'),
version: 1,
);

await _database!.execute('''
CREATE TABLE IF NOT EXISTS $normalizedTableName (
process_id TEXT PRIMARY KEY,
parent_process_id TEXT,
process_name TEXT NOT NULL,
state TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
priority INTEGER NOT NULL,
created_at INTEGER NOT NULL
)
''');

return _database!;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Race Condition in Concurrent Database Initialization

Since DatabaseService._database is a static variable, if multiple FlowQueue instances or concurrent operations call DatabaseService.init() at the same time when the database is not yet opened, both will execute openDatabase concurrently. This can lead to multiple open connections to the same SQLite file, database locked errors, or database corruption.

Solution

We should use a static Future<Database>? _initFuture to synchronize concurrent initialization requests and ensure openDatabase is called exactly once.

class DatabaseService {
  static Database? _database;
  static Future<Database>? _initFuture;

  /// Opens the database and ensures [tableName] exists.
  static Future<Database> init(String tableName) async {
    final normalizedTableName = normalizeTableName(tableName);

    if (_database != null) {
      await _createTable(_database!, normalizedTableName);
      return _database!;
    }

    _initFuture ??= openDatabase(
      join(await getDatabasesPath(), 'flow_queue.db'),
      version: 1,
    );

    _database = await _initFuture;
    await _createTable(_database!, normalizedTableName);
    return _database!;
  }

  static Future<void> _createTable(Database db, String tableName) async {
    await db.execute('''
      CREATE TABLE IF NOT EXISTS $tableName (
        process_id TEXT PRIMARY KEY,
        parent_process_id TEXT,
        process_name TEXT NOT NULL,
        state TEXT NOT NULL,
        retry_count INTEGER DEFAULT 0,
        priority INTEGER NOT NULL,
        created_at INTEGER NOT NULL
      )
    ''');
  }

Comment thread lib/core/flow_queue.dart

final QueueWorker _worker = QueueWorker();
final TaskExecutor _executor = TaskExecutor();
final Map<String, Future<void> Function()> _memoryFunctions = {};

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Architectural Concern: Memory Leak & Persistence Limitations

  1. Memory Leak: The _memoryFunctions map stores task closures indefinitely. As more tasks are added and executed, this map grows without bound, preventing garbage collection of any captured variables in the closures.
  2. Persistence Limitations: Since Dart closures cannot be serialized to SQLite, any pending tasks in the database will lose their executable functions when the app restarts. This means the queue cannot truly recover and execute pending tasks across app sessions.

Recommended Long-Term Solution

Instead of passing closures directly to add(), implement a registry-based pattern where task executors are registered globally on startup by processName:

queue.registerExecutor('upload_post', (task) async {
  // task execution logic
});

This allows the queue to look up the executor by processName (which is persisted in SQLite) upon app restart, enabling true persistence and eliminating the memory leak of keeping individual closures in memory.

Comment thread lib/core/flow_queue.dart
Comment on lines +126 to +133
Future<void> _startProcessing() async {
final database = await _ensureInitialized();

await _worker.start(() async {
while (true) {
final result = await database.rawQuery(
QueueQueries.fetchNextTask(tableName),
);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Tasks Stuck in inProgress After App Crash

If the app crashes or is terminated while a task is running, that task's state remains inProgress in the SQLite database. Since QueueQueries.fetchNextTask only queries for tasks with state = 'pending', any interrupted tasks will be stuck in inProgress forever and never retried or resumed.

Recommendation

On queue initialization (e.g., in init()), reset any tasks with inProgress state back to pending (or mark them as failed so they can be retried) to ensure the queue can recover from unexpected app terminations.

Comment thread lib/core/flow_queue.dart
Comment on lines +23 to +35
Database? _database;

final QueueWorker _worker = QueueWorker();
final TaskExecutor _executor = TaskExecutor();
final Map<String, Future<void> Function()> _memoryFunctions = {};

/// Creates a queue that persists tasks in [tableName].
FlowQueue(String tableName) : _tableName = tableName;

/// Opens the SQLite database and prepares this queue table.
Future<void> init() async {
_database = await DatabaseService.init(tableName);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Improvement: Guard Concurrent Initialization

If init() is called multiple times concurrently (or if multiple operations trigger _ensureInitialized() before the first initialization completes), it can lead to redundant database initialization calls. We can guard this by caching the initialization future.

  Database? _database;
  Future<void>? _initFuture;

  final QueueWorker _worker = QueueWorker();
  final TaskExecutor _executor = TaskExecutor();
  final Map<String, Future<void> Function()> _memoryFunctions = {};

  /// Creates a queue that persists tasks in [tableName].
  FlowQueue(String tableName) : _tableName = tableName;

  /// Opens the SQLite database and prepares this queue table.
  Future<void> init() async {
    _initFuture ??= _doInit();
    await _initFuture;
  }

  Future<void> _doInit() async {
    _database = await DatabaseService.init(tableName);
  }

Comment on lines +13 to +29
Future<void> start(Future<void> Function() callback) async {
if (_isRunning) {
_shouldRunAgain = true;
return;
}

_isRunning = true;

try {
do {
_shouldRunAgain = false;
await callback();
} while (_shouldRunAgain);
} finally {
_isRunning = false;
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Issue: Leaked _shouldRunAgain State on Callback Failure

If the callback() throws an exception, the loop terminates immediately and the worker stops. However, if _shouldRunAgain was set to true concurrently, it is never reset to false. The next time start() is called, it will run an extra redundant pass even if no concurrent requests occurred during that run.

Solution

Reset _shouldRunAgain to false in the finally block to ensure clean state transitions.

  Future<void> start(Future<void> Function() callback) async {
    if (_isRunning) {
      _shouldRunAgain = true;
      return;
    }

    _isRunning = true;

    try {
      do {
        _shouldRunAgain = false;
        await callback();
      } while (_shouldRunAgain);
    } finally {
      _isRunning = false;
      _shouldRunAgain = false;
    }
  }

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR replaces the starter template code with a Phase 1 implementation of a lightweight, SQLite-backed task queue for Flutter. It introduces the core FlowQueue engine, persistence layer, task model + enums, and a basic example app, along with initial unit tests for task (de)serialization.

Changes:

  • Added a SQLite persistence layer (DB init + per-queue tables) and query helpers to select next pending task by priority DESC, created_at ASC.
  • Implemented the FlowQueue engine with add, getTask, getState, listen, retry, and sequential processing via a single worker.
  • Added supporting models/enums/utilities (task serialization, priority mapping, UUID generation, state stream) plus an example app and basic serialization tests.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
test/flow_queue_test.dart Replaced template test with QueueTask.toMap/fromMap tests.
pubspec.yaml Added sqflite, path, and uuid dependencies for persistence and IDs.
lib/flow_queue.dart Converted package entrypoint to export the new public API surface.
lib/core/flow_queue.dart Added the main queue engine: persistence, APIs, and sequential worker-driven processing.
lib/core/queue_worker.dart Added single-worker runner to prevent overlapping processing loops per instance.
lib/core/task_executor.dart Added thin execution wrapper for task callbacks.
lib/database/database_service.dart Added DB open + table creation and table-name normalization helper.
lib/database/queue_queries.dart Added SQL builder for selecting the next pending task by priority/FIFO.
lib/models/queue_task.dart Added persisted task model with SQLite serialization and state mapping.
lib/enums/queue_state.dart Added task lifecycle states used by the engine and persistence.
lib/enums/queue_priority.dart Added priority levels used in selection ordering.
lib/utils/priority_mapper.dart Added mapping between enum priority and persisted integer values.
lib/services/id_service.dart Added UUID v4 ID generation for tasks.
lib/streams/queue_stream.dart Added shared broadcast stream controller for task state updates.
example/main.dart Added a minimal Flutter example that enqueues and listens for task updates.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread lib/core/flow_queue.dart
Comment on lines +145 to +151
final function = _memoryFunctions[processId];

if (function != null) {
await _executor.execute(function);
}

await _setState(processId, QueueState.success);
Comment thread lib/core/flow_queue.dart
Comment on lines +131 to +133
final result = await database.rawQuery(
QueueQueries.fetchNextTask(tableName),
);
Comment thread lib/core/flow_queue.dart
final database = await _ensureInitialized();
final id = IdService.generate();

_memoryFunctions[id] = function;
Comment on lines +6 to +8
class QueueStream {
static final StreamController<QueueTask> controller =
StreamController<QueueTask>.broadcast();
Comment on lines +6 to +15
static Database? _database;

/// Opens the database and ensures [tableName] exists.
static Future<Database> init(String tableName) async {
final normalizedTableName = normalizeTableName(tableName);

_database ??= await openDatabase(
join(await getDatabasesPath(), 'flow_queue.db'),
version: 1,
);
Comment thread lib/core/flow_queue.dart
Comment on lines +126 to +129
Future<void> _startProcessing() async {
final database = await _ensureInitialized();

await _worker.start(() async {
Comment thread lib/core/flow_queue.dart
},
);

unawaited(_startProcessing());
Comment thread lib/core/flow_queue.dart
},
);

unawaited(_startProcessing());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants