-
Notifications
You must be signed in to change notification settings - Fork 0
Implement Phase 1 FlowQueue engine #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| import 'package:flow_queue/flow_queue.dart'; | ||
| import 'package:flutter/material.dart'; | ||
|
|
||
| void main() async { | ||
| WidgetsFlutterBinding.ensureInitialized(); | ||
|
|
||
| final queue = FlowQueue('post_queue'); | ||
|
|
||
| await queue.init(); | ||
|
|
||
| runApp(MyApp(queue)); | ||
| } | ||
|
|
||
| class MyApp extends StatelessWidget { | ||
| final FlowQueue queue; | ||
|
|
||
| const MyApp(this.queue, {super.key}); | ||
|
|
||
| @override | ||
| Widget build(BuildContext context) { | ||
| return MaterialApp( | ||
| home: HomePage(queue: queue), | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| class HomePage extends StatelessWidget { | ||
| final FlowQueue queue; | ||
|
|
||
| const HomePage({ | ||
| super.key, | ||
| required this.queue, | ||
| }); | ||
|
|
||
| @override | ||
| Widget build(BuildContext context) { | ||
| return Scaffold( | ||
| appBar: AppBar( | ||
| title: const Text('Flow Queue'), | ||
| ), | ||
| body: Center( | ||
| child: ElevatedButton( | ||
| onPressed: () async { | ||
| final id = await queue.add( | ||
| processName: 'upload_post', | ||
| priority: QueuePriority.high, | ||
| function: () async { | ||
| await Future<void>.delayed( | ||
| const Duration(seconds: 5), | ||
| ); | ||
|
|
||
| debugPrint('Uploaded'); | ||
| }, | ||
| ); | ||
|
|
||
| queue.listen(id).listen((task) { | ||
| debugPrint(task.state.name); | ||
| }); | ||
| }, | ||
| child: const Text('Upload'), | ||
| ), | ||
| ), | ||
| ); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| import 'dart:async'; | ||
|
|
||
| import 'package:sqflite/sqflite.dart'; | ||
|
|
||
| import '../database/database_service.dart'; | ||
| import '../database/queue_queries.dart'; | ||
| import '../enums/queue_priority.dart'; | ||
| import '../enums/queue_state.dart'; | ||
| import '../models/queue_task.dart'; | ||
| import '../services/id_service.dart'; | ||
| import '../streams/queue_stream.dart'; | ||
| import '../utils/priority_mapper.dart'; | ||
| import 'queue_worker.dart'; | ||
| import 'task_executor.dart'; | ||
|
|
||
| /// Lightweight persistent SQLite-backed queue engine. | ||
| class FlowQueue { | ||
| /// SQLite table used by this queue instance. | ||
| late final String tableName = DatabaseService.normalizeTableName(_tableName); | ||
|
|
||
| final String _tableName; | ||
|
|
||
| Database? _database; | ||
|
|
||
| final QueueWorker _worker = QueueWorker(); | ||
| final TaskExecutor _executor = TaskExecutor(); | ||
| final Map<String, Future<void> Function()> _memoryFunctions = {}; | ||
|
|
||
| /// Creates a queue that persists tasks in [tableName]. | ||
| FlowQueue(String tableName) : _tableName = tableName; | ||
|
|
||
| /// Opens the SQLite database and prepares this queue table. | ||
| Future<void> init() async { | ||
| _database = await DatabaseService.init(tableName); | ||
| } | ||
|
Comment on lines
+23
to
+35
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improvement: Guard Concurrent InitializationIf Database? _database;
Future<void>? _initFuture;
final QueueWorker _worker = QueueWorker();
final TaskExecutor _executor = TaskExecutor();
final Map<String, Future<void> Function()> _memoryFunctions = {};
/// Creates a queue that persists tasks in [tableName].
FlowQueue(String tableName) : _tableName = tableName;
/// Opens the SQLite database and prepares this queue table.
Future<void> init() async {
_initFuture ??= _doInit();
await _initFuture;
}
Future<void> _doInit() async {
_database = await DatabaseService.init(tableName);
} |
||
|
|
||
| /// Adds a task to the queue and starts processing pending work. | ||
| Future<String> add({ | ||
| required String processName, | ||
| required QueuePriority priority, | ||
| required Future<void> Function() function, | ||
| }) async { | ||
| final database = await _ensureInitialized(); | ||
| final id = IdService.generate(); | ||
|
|
||
| _memoryFunctions[id] = function; | ||
|
|
||
|
|
||
| await database.insert( | ||
| tableName, | ||
| { | ||
| 'process_id': id, | ||
| 'parent_process_id': null, | ||
| 'process_name': processName, | ||
| 'state': QueueState.pending.name, | ||
| 'retry_count': 0, | ||
| 'priority': getPriorityValue(priority), | ||
| 'created_at': DateTime.now().millisecondsSinceEpoch, | ||
| }, | ||
| ); | ||
|
|
||
| unawaited(_startProcessing()); | ||
|
|
||
|
|
||
| return id; | ||
| } | ||
|
|
||
| /// Returns the persisted task for [processId]. | ||
| Future<QueueTask> getTask(String processId) async { | ||
| final database = await _ensureInitialized(); | ||
| final result = await database.query( | ||
| tableName, | ||
| where: 'process_id = ?', | ||
| whereArgs: [processId], | ||
| limit: 1, | ||
| ); | ||
|
|
||
| if (result.isEmpty) { | ||
| throw StateError('Queue task not found: $processId'); | ||
| } | ||
|
|
||
| return QueueTask.fromMap(result.first); | ||
| } | ||
|
|
||
| /// Returns the current state for [processId]. | ||
| Future<QueueState> getState(String processId) async { | ||
| final task = await getTask(processId); | ||
|
|
||
| return task.state; | ||
| } | ||
|
|
||
| /// Streams state updates for [processId]. | ||
| Stream<QueueTask> listen(String processId) { | ||
| return QueueStream.controller.stream.where( | ||
| (task) => task.processId == processId, | ||
| ); | ||
| } | ||
|
|
||
| /// Creates a new pending task from an existing task. | ||
| Future<String> retry(String processId) async { | ||
| final database = await _ensureInitialized(); | ||
| final oldTask = await getTask(processId); | ||
| final newId = IdService.generate(); | ||
| final function = _memoryFunctions[processId]; | ||
|
|
||
| if (function != null) { | ||
| _memoryFunctions[newId] = function; | ||
| } | ||
|
|
||
| await database.insert( | ||
| tableName, | ||
| { | ||
| 'process_id': newId, | ||
| 'parent_process_id': processId, | ||
| 'process_name': oldTask.processName, | ||
| 'state': QueueState.pending.name, | ||
| 'retry_count': oldTask.retryCount + 1, | ||
| 'priority': getPriorityValue(oldTask.priority), | ||
| 'created_at': DateTime.now().millisecondsSinceEpoch, | ||
| }, | ||
| ); | ||
|
|
||
| unawaited(_startProcessing()); | ||
|
|
||
|
|
||
| return newId; | ||
| } | ||
|
|
||
| Future<void> _startProcessing() async { | ||
| final database = await _ensureInitialized(); | ||
|
|
||
| await _worker.start(() async { | ||
|
Comment on lines
+126
to
+129
|
||
| while (true) { | ||
| final result = await database.rawQuery( | ||
| QueueQueries.fetchNextTask(tableName), | ||
| ); | ||
|
Comment on lines
+126
to
+133
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Tasks Stuck in
|
||
|
|
||
| if (result.isEmpty) { | ||
| break; | ||
| } | ||
|
|
||
| final task = result.first; | ||
| final processId = task['process_id'].toString(); | ||
|
|
||
| await _setState(processId, QueueState.inProgress); | ||
|
|
||
| try { | ||
| final function = _memoryFunctions[processId]; | ||
|
|
||
| if (function != null) { | ||
| await _executor.execute(function); | ||
| } | ||
|
|
||
| await _setState(processId, QueueState.success); | ||
|
Comment on lines
+147
to
+151
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a pending row was persisted by an earlier app/session, Useful? React with 👍 / 👎.
Comment on lines
+145
to
+151
|
||
| } catch (_) { | ||
| await _setState(processId, QueueState.failed); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| Future<void> _setState(String processId, QueueState state) async { | ||
| final database = await _ensureInitialized(); | ||
|
|
||
| await database.update( | ||
| tableName, | ||
| {'state': state.name}, | ||
| where: 'process_id = ?', | ||
| whereArgs: [processId], | ||
| ); | ||
|
|
||
| QueueStream.controller.add(await getTask(processId)); | ||
| } | ||
|
Comment on lines
+126
to
+170
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Correctness & Performance Optimization in Task Processing
SolutionWe can convert the database query result to a Future<void> _startProcessing() async {
final database = await _ensureInitialized();
await _worker.start(() async {
while (true) {
final result = await database.rawQuery(
QueueQueries.fetchNextTask(tableName),
);
if (result.isEmpty) {
break;
}
final task = QueueTask.fromMap(result.first);
await _updateTaskState(task, QueueState.inProgress);
try {
final function = _memoryFunctions[task.processId];
if (function == null) {
throw StateError('No executable function registered for task ${task.processId}');
}
await _executor.execute(function);
await _updateTaskState(task, QueueState.success);
} catch (_) {
await _updateTaskState(task, QueueState.failed);
}
}
});
}
Future<void> _updateTaskState(QueueTask task, QueueState newState) async {
final database = await _ensureInitialized();
await database.update(
tableName,
{'state': newState.name},
where: 'process_id = ?',
whereArgs: [task.processId],
);
final updatedTask = QueueTask(
processId: task.processId,
parentProcessId: task.parentProcessId,
processName: task.processName,
state: newState,
retryCount: task.retryCount,
priority: task.priority,
createdAt: task.createdAt,
);
QueueStream.controller.add(updatedTask);
} |
||
|
|
||
| Future<Database> _ensureInitialized() async { | ||
| if (_database != null) { | ||
| return _database!; | ||
| } | ||
|
|
||
| await init(); | ||
|
|
||
| return _database!; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| /// Ensures queue processing runs on a single sequential worker. | ||
| class QueueWorker { | ||
| bool _isRunning = false; | ||
| bool _shouldRunAgain = false; | ||
|
|
||
| /// Whether the worker is currently processing tasks. | ||
| bool get isRunning => _isRunning; | ||
|
|
||
| /// Starts the worker if it is not already running. | ||
| /// | ||
| /// When start is requested while the worker is already active, one extra pass | ||
| /// is queued so tasks added near the end of the current pass are not stranded. | ||
| Future<void> start(Future<void> Function() callback) async { | ||
| if (_isRunning) { | ||
| _shouldRunAgain = true; | ||
| return; | ||
| } | ||
|
|
||
| _isRunning = true; | ||
|
|
||
| try { | ||
| do { | ||
| _shouldRunAgain = false; | ||
| await callback(); | ||
| } while (_shouldRunAgain); | ||
| } finally { | ||
| _isRunning = false; | ||
| } | ||
| } | ||
|
Comment on lines
+13
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Leaked
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| /// Executes an in-memory queue callback. | ||
| class TaskExecutor { | ||
| /// Runs [function] and lets callers handle any thrown errors. | ||
| Future<void> execute(Future<void> Function() function) { | ||
| return function(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| import 'package:path/path.dart'; | ||
| import 'package:sqflite/sqflite.dart'; | ||
|
|
||
| /// Opens and prepares the SQLite database used by flow queues. | ||
| class DatabaseService { | ||
| static Database? _database; | ||
|
|
||
| /// Opens the database and ensures [tableName] exists. | ||
| static Future<Database> init(String tableName) async { | ||
| final normalizedTableName = normalizeTableName(tableName); | ||
|
|
||
| _database ??= await openDatabase( | ||
| join(await getDatabasesPath(), 'flow_queue.db'), | ||
| version: 1, | ||
| ); | ||
|
Comment on lines
+6
to
+15
|
||
|
|
||
| await _database!.execute(''' | ||
| CREATE TABLE IF NOT EXISTS $normalizedTableName ( | ||
| process_id TEXT PRIMARY KEY, | ||
| parent_process_id TEXT, | ||
| process_name TEXT NOT NULL, | ||
| state TEXT NOT NULL, | ||
| retry_count INTEGER DEFAULT 0, | ||
| priority INTEGER NOT NULL, | ||
| created_at INTEGER NOT NULL | ||
| ) | ||
| '''); | ||
|
|
||
| return _database!; | ||
| } | ||
|
Comment on lines
+5
to
+30
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Race Condition in Concurrent Database InitializationSince SolutionWe should use a static class DatabaseService {
static Database? _database;
static Future<Database>? _initFuture;
/// Opens the database and ensures [tableName] exists.
static Future<Database> init(String tableName) async {
final normalizedTableName = normalizeTableName(tableName);
if (_database != null) {
await _createTable(_database!, normalizedTableName);
return _database!;
}
_initFuture ??= openDatabase(
join(await getDatabasesPath(), 'flow_queue.db'),
version: 1,
);
_database = await _initFuture;
await _createTable(_database!, normalizedTableName);
return _database!;
}
static Future<void> _createTable(Database db, String tableName) async {
await db.execute('''
CREATE TABLE IF NOT EXISTS $tableName (
process_id TEXT PRIMARY KEY,
parent_process_id TEXT,
process_name TEXT NOT NULL,
state TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
priority INTEGER NOT NULL,
created_at INTEGER NOT NULL
)
''');
} |
||
|
|
||
| /// Returns a validated SQLite table identifier for [tableName]. | ||
| static String normalizeTableName(String tableName) { | ||
| final trimmed = tableName.trim(); | ||
|
|
||
| if (trimmed.isEmpty) { | ||
| throw ArgumentError.value(tableName, 'tableName', 'Must not be empty.'); | ||
| } | ||
|
|
||
| final validIdentifier = RegExp(r'^[A-Za-z_][A-Za-z0-9_]*$'); | ||
|
|
||
| if (!validIdentifier.hasMatch(trimmed)) { | ||
| throw ArgumentError.value( | ||
| tableName, | ||
| 'tableName', | ||
| 'Must be a valid SQLite identifier containing only letters, numbers, and underscores, and must not start with a number.', | ||
| ); | ||
| } | ||
|
|
||
| return trimmed; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| /// SQL query builders used by the queue database layer. | ||
| class QueueQueries { | ||
| /// Returns a query that fetches the highest-priority pending task using FIFO | ||
| /// ordering when priorities match. | ||
| static String fetchNextTask(String tableName) { | ||
| return ''' | ||
| SELECT * FROM $tableName | ||
| WHERE state = 'pending' | ||
| ORDER BY priority DESC, created_at ASC | ||
| LIMIT 1 | ||
| '''; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| /// Priority levels used when selecting the next pending task. | ||
| enum QueuePriority { | ||
| /// Lowest priority, selected after moderate and high priority tasks. | ||
| defaultPriority, | ||
|
|
||
| /// Medium priority, selected after high priority tasks. | ||
| moderate, | ||
|
|
||
| /// Highest priority, selected before moderate and default tasks. | ||
| high, | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| /// The lifecycle state of a queued task. | ||
| enum QueueState { | ||
| /// The task is waiting to be executed. | ||
| pending, | ||
|
|
||
| /// The task is currently being executed by the queue worker. | ||
| inProgress, | ||
|
|
||
| /// The task completed successfully. | ||
| success, | ||
|
|
||
| /// The task failed while executing. | ||
| failed, | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| /// A Calculator. | ||
| class Calculator { | ||
| /// Returns [value] plus 1. | ||
| int addOne(int value) => value + 1; | ||
| } | ||
| library flow_queue; | ||
|
|
||
| export 'core/flow_queue.dart'; | ||
| export 'enums/queue_priority.dart'; | ||
| export 'enums/queue_state.dart'; | ||
| export 'models/queue_task.dart'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Architectural Concern: Memory Leak & Persistence Limitations
_memoryFunctionsmap stores task closures indefinitely. As more tasks are added and executed, this map grows without bound, preventing garbage collection of any captured variables in the closures.Recommended Long-Term Solution
Instead of passing closures directly to
add(), implement a registry-based pattern where task executors are registered globally on startup byprocessName:This allows the queue to look up the executor by
processName(which is persisted in SQLite) upon app restart, enabling true persistence and eliminating the memory leak of keeping individual closures in memory.