From c40c8b8a1f382d6c17f490059d96f51578d57a4b Mon Sep 17 00:00:00 2001 From: Harsh Kumar <71206223+Harsh4114@users.noreply.github.com> Date: Thu, 4 Jun 2026 17:14:25 +0530 Subject: [PATCH] Implement phase 1 flow queue --- example/main.dart | 65 +++++++++++ lib/core/flow_queue.dart | 181 +++++++++++++++++++++++++++++ lib/core/queue_worker.dart | 30 +++++ lib/core/task_executor.dart | 7 ++ lib/database/database_service.dart | 52 +++++++++ lib/database/queue_queries.dart | 13 +++ lib/enums/queue_priority.dart | 11 ++ lib/enums/queue_state.dart | 14 +++ lib/flow_queue.dart | 11 +- lib/models/queue_task.dart | 77 ++++++++++++ lib/services/id_service.dart | 11 ++ lib/streams/queue_stream.dart | 9 ++ lib/utils/priority_mapper.dart | 25 ++++ pubspec.yaml | 3 + test/flow_queue_test.dart | 48 ++++++-- 15 files changed, 545 insertions(+), 12 deletions(-) create mode 100644 example/main.dart create mode 100644 lib/core/flow_queue.dart create mode 100644 lib/core/queue_worker.dart create mode 100644 lib/core/task_executor.dart create mode 100644 lib/database/database_service.dart create mode 100644 lib/database/queue_queries.dart create mode 100644 lib/enums/queue_priority.dart create mode 100644 lib/enums/queue_state.dart create mode 100644 lib/models/queue_task.dart create mode 100644 lib/services/id_service.dart create mode 100644 lib/streams/queue_stream.dart create mode 100644 lib/utils/priority_mapper.dart diff --git a/example/main.dart b/example/main.dart new file mode 100644 index 0000000..a06a4ea --- /dev/null +++ b/example/main.dart @@ -0,0 +1,65 @@ +import 'package:flow_queue/flow_queue.dart'; +import 'package:flutter/material.dart'; + +void main() async { + WidgetsFlutterBinding.ensureInitialized(); + + final queue = FlowQueue('post_queue'); + + await queue.init(); + + runApp(MyApp(queue)); +} + +class MyApp extends StatelessWidget { + final FlowQueue queue; + + const MyApp(this.queue, {super.key}); + + @override + Widget build(BuildContext context) { + return MaterialApp( + home: HomePage(queue: queue), + ); + } +} + +class HomePage extends StatelessWidget { + final FlowQueue queue; + + const HomePage({ + super.key, + required this.queue, + }); + + @override + Widget build(BuildContext context) { + return Scaffold( + appBar: AppBar( + title: const Text('Flow Queue'), + ), + body: Center( + child: ElevatedButton( + onPressed: () async { + final id = await queue.add( + processName: 'upload_post', + priority: QueuePriority.high, + function: () async { + await Future.delayed( + const Duration(seconds: 5), + ); + + debugPrint('Uploaded'); + }, + ); + + queue.listen(id).listen((task) { + debugPrint(task.state.name); + }); + }, + child: const Text('Upload'), + ), + ), + ); + } +} diff --git a/lib/core/flow_queue.dart b/lib/core/flow_queue.dart new file mode 100644 index 0000000..f265a47 --- /dev/null +++ b/lib/core/flow_queue.dart @@ -0,0 +1,181 @@ +import 'dart:async'; + +import 'package:sqflite/sqflite.dart'; + +import '../database/database_service.dart'; +import '../database/queue_queries.dart'; +import '../enums/queue_priority.dart'; +import '../enums/queue_state.dart'; +import '../models/queue_task.dart'; +import '../services/id_service.dart'; +import '../streams/queue_stream.dart'; +import '../utils/priority_mapper.dart'; +import 'queue_worker.dart'; +import 'task_executor.dart'; + +/// Lightweight persistent SQLite-backed queue engine. +class FlowQueue { + /// SQLite table used by this queue instance. + late final String tableName = DatabaseService.normalizeTableName(_tableName); + + final String _tableName; + + Database? _database; + + final QueueWorker _worker = QueueWorker(); + final TaskExecutor _executor = TaskExecutor(); + final Map Function()> _memoryFunctions = {}; + + /// Creates a queue that persists tasks in [tableName]. + FlowQueue(String tableName) : _tableName = tableName; + + /// Opens the SQLite database and prepares this queue table. + Future init() async { + _database = await DatabaseService.init(tableName); + } + + /// Adds a task to the queue and starts processing pending work. + Future add({ + required String processName, + required QueuePriority priority, + required Future Function() function, + }) async { + final database = await _ensureInitialized(); + final id = IdService.generate(); + + _memoryFunctions[id] = function; + + await database.insert( + tableName, + { + 'process_id': id, + 'parent_process_id': null, + 'process_name': processName, + 'state': QueueState.pending.name, + 'retry_count': 0, + 'priority': getPriorityValue(priority), + 'created_at': DateTime.now().millisecondsSinceEpoch, + }, + ); + + unawaited(_startProcessing()); + + return id; + } + + /// Returns the persisted task for [processId]. + Future getTask(String processId) async { + final database = await _ensureInitialized(); + final result = await database.query( + tableName, + where: 'process_id = ?', + whereArgs: [processId], + limit: 1, + ); + + if (result.isEmpty) { + throw StateError('Queue task not found: $processId'); + } + + return QueueTask.fromMap(result.first); + } + + /// Returns the current state for [processId]. + Future getState(String processId) async { + final task = await getTask(processId); + + return task.state; + } + + /// Streams state updates for [processId]. + Stream listen(String processId) { + return QueueStream.controller.stream.where( + (task) => task.processId == processId, + ); + } + + /// Creates a new pending task from an existing task. + Future retry(String processId) async { + final database = await _ensureInitialized(); + final oldTask = await getTask(processId); + final newId = IdService.generate(); + final function = _memoryFunctions[processId]; + + if (function != null) { + _memoryFunctions[newId] = function; + } + + await database.insert( + tableName, + { + 'process_id': newId, + 'parent_process_id': processId, + 'process_name': oldTask.processName, + 'state': QueueState.pending.name, + 'retry_count': oldTask.retryCount + 1, + 'priority': getPriorityValue(oldTask.priority), + 'created_at': DateTime.now().millisecondsSinceEpoch, + }, + ); + + unawaited(_startProcessing()); + + return newId; + } + + Future _startProcessing() async { + final database = await _ensureInitialized(); + + await _worker.start(() async { + while (true) { + final result = await database.rawQuery( + QueueQueries.fetchNextTask(tableName), + ); + + if (result.isEmpty) { + break; + } + + final task = result.first; + final processId = task['process_id'].toString(); + + await _setState(processId, QueueState.inProgress); + + try { + final function = _memoryFunctions[processId]; + + if (function != null) { + await _executor.execute(function); + } + + await _setState(processId, QueueState.success); + } catch (_) { + await _setState(processId, QueueState.failed); + } + } + }); + } + + Future _setState(String processId, QueueState state) async { + final database = await _ensureInitialized(); + + await database.update( + tableName, + {'state': state.name}, + where: 'process_id = ?', + whereArgs: [processId], + ); + + QueueStream.controller.add(await getTask(processId)); + } + + Future _ensureInitialized() async { + if (_database != null) { + return _database!; + } + + await init(); + + return _database!; + } +} diff --git a/lib/core/queue_worker.dart b/lib/core/queue_worker.dart new file mode 100644 index 0000000..f8ebc20 --- /dev/null +++ b/lib/core/queue_worker.dart @@ -0,0 +1,30 @@ +/// Ensures queue processing runs on a single sequential worker. +class QueueWorker { + bool _isRunning = false; + bool _shouldRunAgain = false; + + /// Whether the worker is currently processing tasks. + bool get isRunning => _isRunning; + + /// Starts the worker if it is not already running. + /// + /// When start is requested while the worker is already active, one extra pass + /// is queued so tasks added near the end of the current pass are not stranded. + Future start(Future Function() callback) async { + if (_isRunning) { + _shouldRunAgain = true; + return; + } + + _isRunning = true; + + try { + do { + _shouldRunAgain = false; + await callback(); + } while (_shouldRunAgain); + } finally { + _isRunning = false; + } + } +} diff --git a/lib/core/task_executor.dart b/lib/core/task_executor.dart new file mode 100644 index 0000000..5b3fa56 --- /dev/null +++ b/lib/core/task_executor.dart @@ -0,0 +1,7 @@ +/// Executes an in-memory queue callback. +class TaskExecutor { + /// Runs [function] and lets callers handle any thrown errors. + Future execute(Future Function() function) { + return function(); + } +} diff --git a/lib/database/database_service.dart b/lib/database/database_service.dart new file mode 100644 index 0000000..43a13e4 --- /dev/null +++ b/lib/database/database_service.dart @@ -0,0 +1,52 @@ +import 'package:path/path.dart'; +import 'package:sqflite/sqflite.dart'; + +/// Opens and prepares the SQLite database used by flow queues. +class DatabaseService { + static Database? _database; + + /// Opens the database and ensures [tableName] exists. + static Future init(String tableName) async { + final normalizedTableName = normalizeTableName(tableName); + + _database ??= await openDatabase( + join(await getDatabasesPath(), 'flow_queue.db'), + version: 1, + ); + + await _database!.execute(''' + CREATE TABLE IF NOT EXISTS $normalizedTableName ( + process_id TEXT PRIMARY KEY, + parent_process_id TEXT, + process_name TEXT NOT NULL, + state TEXT NOT NULL, + retry_count INTEGER DEFAULT 0, + priority INTEGER NOT NULL, + created_at INTEGER NOT NULL + ) + '''); + + return _database!; + } + + /// Returns a validated SQLite table identifier for [tableName]. + static String normalizeTableName(String tableName) { + final trimmed = tableName.trim(); + + if (trimmed.isEmpty) { + throw ArgumentError.value(tableName, 'tableName', 'Must not be empty.'); + } + + final validIdentifier = RegExp(r'^[A-Za-z_][A-Za-z0-9_]*$'); + + if (!validIdentifier.hasMatch(trimmed)) { + throw ArgumentError.value( + tableName, + 'tableName', + 'Must be a valid SQLite identifier containing only letters, numbers, and underscores, and must not start with a number.', + ); + } + + return trimmed; + } +} diff --git a/lib/database/queue_queries.dart b/lib/database/queue_queries.dart new file mode 100644 index 0000000..d2ceeed --- /dev/null +++ b/lib/database/queue_queries.dart @@ -0,0 +1,13 @@ +/// SQL query builders used by the queue database layer. +class QueueQueries { + /// Returns a query that fetches the highest-priority pending task using FIFO + /// ordering when priorities match. + static String fetchNextTask(String tableName) { + return ''' + SELECT * FROM $tableName + WHERE state = 'pending' + ORDER BY priority DESC, created_at ASC + LIMIT 1 + '''; + } +} diff --git a/lib/enums/queue_priority.dart b/lib/enums/queue_priority.dart new file mode 100644 index 0000000..61a7019 --- /dev/null +++ b/lib/enums/queue_priority.dart @@ -0,0 +1,11 @@ +/// Priority levels used when selecting the next pending task. +enum QueuePriority { + /// Lowest priority, selected after moderate and high priority tasks. + defaultPriority, + + /// Medium priority, selected after high priority tasks. + moderate, + + /// Highest priority, selected before moderate and default tasks. + high, +} diff --git a/lib/enums/queue_state.dart b/lib/enums/queue_state.dart new file mode 100644 index 0000000..ef4b923 --- /dev/null +++ b/lib/enums/queue_state.dart @@ -0,0 +1,14 @@ +/// The lifecycle state of a queued task. +enum QueueState { + /// The task is waiting to be executed. + pending, + + /// The task is currently being executed by the queue worker. + inProgress, + + /// The task completed successfully. + success, + + /// The task failed while executing. + failed, +} diff --git a/lib/flow_queue.dart b/lib/flow_queue.dart index 298576d..6ec59ff 100644 --- a/lib/flow_queue.dart +++ b/lib/flow_queue.dart @@ -1,5 +1,6 @@ -/// A Calculator. -class Calculator { - /// Returns [value] plus 1. - int addOne(int value) => value + 1; -} +library flow_queue; + +export 'core/flow_queue.dart'; +export 'enums/queue_priority.dart'; +export 'enums/queue_state.dart'; +export 'models/queue_task.dart'; diff --git a/lib/models/queue_task.dart b/lib/models/queue_task.dart new file mode 100644 index 0000000..f76ae56 --- /dev/null +++ b/lib/models/queue_task.dart @@ -0,0 +1,77 @@ +import '../enums/queue_priority.dart'; +import '../enums/queue_state.dart'; +import '../utils/priority_mapper.dart'; + +/// A persisted queue task record. +class QueueTask { + /// Unique identifier for this task. + final String processId; + + /// The task id that created this task via retry, if any. + final String? parentProcessId; + + /// Developer-provided task name. + final String processName; + + /// Current task state. + final QueueState state; + + /// Number of retry generations before this task. + final int retryCount; + + /// Task priority. + final QueuePriority priority; + + /// Creation timestamp in milliseconds since epoch. + final int createdAt; + + /// Creates a queue task model. + const QueueTask({ + required this.processId, + required this.parentProcessId, + required this.processName, + required this.state, + required this.retryCount, + required this.priority, + required this.createdAt, + }); + + /// Creates a [QueueTask] from a SQLite row. + factory QueueTask.fromMap(Map map) { + return QueueTask( + processId: map['process_id'].toString(), + parentProcessId: map['parent_process_id']?.toString(), + processName: map['process_name'].toString(), + state: _mapState(map['state'].toString()), + retryCount: map['retry_count'] as int, + priority: mapPriorityValue(map['priority'] as int), + createdAt: map['created_at'] as int, + ); + } + + /// Converts this task to a SQLite row map. + Map toMap() { + return { + 'process_id': processId, + 'parent_process_id': parentProcessId, + 'process_name': processName, + 'state': state.name, + 'retry_count': retryCount, + 'priority': getPriorityValue(priority), + 'created_at': createdAt, + }; + } + + static QueueState _mapState(String state) { + switch (state) { + case 'pending': + return QueueState.pending; + case 'inProgress': + return QueueState.inProgress; + case 'success': + return QueueState.success; + default: + return QueueState.failed; + } + } +} diff --git a/lib/services/id_service.dart b/lib/services/id_service.dart new file mode 100644 index 0000000..3de3652 --- /dev/null +++ b/lib/services/id_service.dart @@ -0,0 +1,11 @@ +import 'package:uuid/uuid.dart'; + +/// Generates unique ids for queue tasks. +class IdService { + static final Uuid _uuid = const Uuid(); + + /// Generates a v4 UUID string. + static String generate() { + return _uuid.v4(); + } +} diff --git a/lib/streams/queue_stream.dart b/lib/streams/queue_stream.dart new file mode 100644 index 0000000..1a1c93e --- /dev/null +++ b/lib/streams/queue_stream.dart @@ -0,0 +1,9 @@ +import 'dart:async'; + +import '../models/queue_task.dart'; + +/// Shared broadcast stream for queue state changes. +class QueueStream { + static final StreamController controller = + StreamController.broadcast(); +} diff --git a/lib/utils/priority_mapper.dart b/lib/utils/priority_mapper.dart new file mode 100644 index 0000000..cda715e --- /dev/null +++ b/lib/utils/priority_mapper.dart @@ -0,0 +1,25 @@ +import '../enums/queue_priority.dart'; + +/// Converts a [QueuePriority] into its persisted numeric value. +int getPriorityValue(QueuePriority priority) { + switch (priority) { + case QueuePriority.high: + return 3; + case QueuePriority.moderate: + return 2; + case QueuePriority.defaultPriority: + return 1; + } +} + +/// Converts a persisted priority value into a [QueuePriority]. +QueuePriority mapPriorityValue(int value) { + switch (value) { + case 3: + return QueuePriority.high; + case 2: + return QueuePriority.moderate; + default: + return QueuePriority.defaultPriority; + } +} diff --git a/pubspec.yaml b/pubspec.yaml index b589460..5b16fb6 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -10,6 +10,9 @@ environment: dependencies: flutter: sdk: flutter + sqflite: ^2.3.0 + path: ^1.9.0 + uuid: ^4.4.0 dev_dependencies: flutter_test: diff --git a/test/flow_queue_test.dart b/test/flow_queue_test.dart index 44fab55..58264f1 100644 --- a/test/flow_queue_test.dart +++ b/test/flow_queue_test.dart @@ -1,12 +1,46 @@ -import 'package:flutter_test/flutter_test.dart'; - import 'package:flow_queue/flow_queue.dart'; +import 'package:flutter_test/flutter_test.dart'; void main() { - test('adds one to input values', () { - final calculator = Calculator(); - expect(calculator.addOne(2), 3); - expect(calculator.addOne(-7), -6); - expect(calculator.addOne(0), 1); + test('QueueTask serializes to map', () { + final task = QueueTask( + processId: 'task-id', + parentProcessId: null, + processName: 'upload_post', + state: QueueState.pending, + retryCount: 0, + priority: QueuePriority.high, + createdAt: 123, + ); + + expect(task.toMap(), { + 'process_id': 'task-id', + 'parent_process_id': null, + 'process_name': 'upload_post', + 'state': 'pending', + 'retry_count': 0, + 'priority': 3, + 'created_at': 123, + }); + }); + + test('QueueTask deserializes from map', () { + final task = QueueTask.fromMap({ + 'process_id': 'retry-id', + 'parent_process_id': 'task-id', + 'process_name': 'upload_post', + 'state': 'failed', + 'retry_count': 1, + 'priority': 2, + 'created_at': 456, + }); + + expect(task.processId, 'retry-id'); + expect(task.parentProcessId, 'task-id'); + expect(task.processName, 'upload_post'); + expect(task.state, QueueState.failed); + expect(task.retryCount, 1); + expect(task.priority, QueuePriority.moderate); + expect(task.createdAt, 456); }); }