Current behavior
The _deserialize_from_store function in morango/sync/operations.py handles deserialization in a monolithic function that mixes multiple concerns. The serialization side has already been migrated to a streaming architecture using Source > Transform > Sink modules in morango/sync/stream/serialize.py, with AppModelSource and SerializeTask as foundational components.
Desired behavior
The deserialization pipeline should have equivalent foundational components: a DeserializeTask carrier class that holds context as a store model flows through the pipeline, and a StoreModelSource that yields tasks for dirty store models matching a sync filter.
Deliverables
A new file morango/sync/stream/deserialize.py should contain:
DeserializeTask carrier class:
- hold a reference to a Store model instance
- provide a model property that resolves the syncable model class from the store's profile and model_name via syncable_models
- track the deserialized app model once processed
- track an FK cache for reuse during deserialization
- track any errors encountered during processing
- provide a has_errors boolean property
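A minimal sketch of the carrier, assuming a dataclass is acceptable. It does not import morango, so it runs on its own. StoreStub, SyncableModelRegistryStub and Facility are hypothetical stand-ins for Store, syncable_models and a syncable model. The get_model(profile, model_name) lookup is an assumption about the registry API and should be checked against morango.registry.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


class SyncableModelRegistryStub:
    """Hypothetical stand-in for morango.registry.syncable_models."""

    def __init__(self, models: Dict[str, Dict[str, type]]):
        self._models = models

    def get_model(self, profile: str, model_name: str) -> type:
        # Assumed lookup: profile -> model_name -> syncable model class
        return self._models[profile][model_name]


class Facility:
    """Placeholder syncable model class (hypothetical)."""


# Module-level registry, mirroring how syncable_models would be imported
syncable_models = SyncableModelRegistryStub({"facilitydata": {"facility": Facility}})


@dataclass
class StoreStub:
    """Hypothetical stand-in for a Store record (only the fields used here)."""
    id: str
    profile: str
    model_name: str


@dataclass
class DeserializeTask:
    """Carries one store record, and what happens to it, through the pipeline."""
    store_model: StoreStub
    app_model: Optional[Any] = None  # set once the record has been deserialized
    fk_cache: Dict[str, Any] = field(default_factory=dict)  # FK lookups kept for reuse
    errors: List[Exception] = field(default_factory=list)  # problems hit while processing

    @property
    def model(self) -> type:
        # Resolve the syncable model class from the store's profile and model_name
        return syncable_models.get_model(self.store_model.profile, self.store_model.model_name)

    @property
    def has_errors(self) -> bool:
        return bool(self.errors)
```

In this sketch, a transform stage would set task.app_model on success or append to task.errors on failure. A later stage can then branch on task.has_errors without inspecting the error list itself.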
StoreModelSource pipeline source:
- accept the same constructor parameters as AppModelSource: profile, sync_filter, dirty_only, and partition_order
- iterate through sync filter partitions in the specified order (asc/desc), yielding Q conditions for each prefix
- query Store models (not app models) filtered by profile, partition, and dirty_bit
- exclude store records that already have a deserialization_error set
- track seen store IDs to avoid yielding duplicates when partitions overlap
- yield DeserializeTask objects for each matching store model
- use .iterator() for memory-efficient streaming
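The source behaviour listed above can be sketched in plain Python. This illustrates the idea under stated assumptions and is not the AppModelSource code:

- A list of hypothetical StoreRow records stands in for the Store queryset.
- The keyword-only rows argument exists only for this stand-in.
- The constructor defaults are guesses.
- Partitions are assumed to be ordered by a plain string sort.
- Prefix matching is assumed to behave like a partition__startswith lookup.

In the real class, each prefix would become a Q condition evaluated with .iterator().

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Set


@dataclass
class StoreRow:
    """Hypothetical in-memory stand-in for a Store record."""
    id: str
    profile: str
    partition: str
    dirty_bit: bool = True
    deserialization_error: str = ""


@dataclass
class DeserializeTask:
    """Trimmed-down carrier; only the store record matters for this sketch."""
    store_model: StoreRow


class StoreModelSource:
    """In-memory sketch: a list of StoreRow objects replaces the Store queryset."""

    def __init__(self, profile, sync_filter, dirty_only=True, partition_order="asc",
                 *, rows: Iterable[StoreRow] = ()):
        self.profile = profile
        self.sync_filter = sync_filter
        self.dirty_only = dirty_only
        self.partition_order = partition_order
        self.rows = list(rows)  # stand-in only; the real source would query Store
        self._seen: Set[str] = set()  # store IDs already yielded

    def prefixes(self) -> List[str]:
        # Assumed ordering: plain string sort, reversed for "desc"
        return sorted(self.sync_filter, reverse=self.partition_order == "desc")

    def _matches(self, row: StoreRow, prefix: str) -> bool:
        return (
            row.profile == self.profile
            and row.partition.startswith(prefix)  # like partition__startswith
            and (row.dirty_bit or not self.dirty_only)
            and row.deserialization_error == ""  # skip records that already failed
        )

    def __iter__(self) -> Iterator[DeserializeTask]:
        for prefix in self.prefixes():
            # The real source would stream a filtered queryset with .iterator()
            for row in self.rows:
                if row.id in self._seen or not self._matches(row, prefix):
                    continue
                self._seen.add(row.id)  # overlapping prefixes must not yield twice
                yield DeserializeTask(row)
```

Because _seen lives on the instance, each sync pass in this sketch needs a fresh source. A second iteration over the same instance yields nothing.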
Notes
- StoreModelSource should mirror the structure of AppModelSource in morango/sync/stream/serialize.py: same constructor signature, same prefix_conditions() pattern, same duplicate-prevention approach
- The source queries Store models (the serialized data), whereas AppModelSource queries application models; this is the key difference
- Import Store from morango.models.core, Source from morango.sync.stream.core, and syncable_models from morango.registry
- Unit tests should be added for both classes
- Tests should verify partition ordering (asc/desc), dirty_bit filtering, duplicate prevention, and deserialization_error exclusion
- This is mostly new code: the existing deserialization process should not be modified. However, since the new source class will share some features with the serialization AppModelSource, it could be worthwhile to consolidate them into a shared base class
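Below is one possible shape for the shared base class suggested above, offered as a hypothetical sketch. PartitionedSource, rows_for_prefix(), make_task(), row_id() and InMemoryStoreSource are invented names, not existing morango APIs. The base owns the common constructor, prefix ordering and duplicate prevention. AppModelSource and StoreModelSource would then differ only in which rows they fetch per prefix and which task they wrap each row in. To keep the block standalone, the base does not subclass Source, and rows are plain dicts.

```python
from typing import Any, Iterable, Iterator, List, Set


class PartitionedSource:
    """Hypothetical base: shared constructor, prefix ordering and de-duplication."""

    def __init__(self, profile, sync_filter, dirty_only=True, partition_order="asc"):
        self.profile = profile
        self.sync_filter = sync_filter
        self.dirty_only = dirty_only
        self.partition_order = partition_order
        self._seen: Set[Any] = set()

    def prefixes(self) -> List[str]:
        # Same ordering rule for app-model and store-model sources
        return sorted(self.sync_filter, reverse=self.partition_order == "desc")

    def rows_for_prefix(self, prefix: str) -> Iterable[Any]:
        # Subclass hook: in Django this would be a filtered queryset's .iterator()
        raise NotImplementedError

    def make_task(self, row: Any) -> Any:
        # Subclass hook: wrap the row in a SerializeTask or DeserializeTask
        raise NotImplementedError

    def row_id(self, row: Any) -> Any:
        return row["id"]

    def __iter__(self) -> Iterator[Any]:
        for prefix in self.prefixes():
            for row in self.rows_for_prefix(prefix):
                key = self.row_id(row)
                if key in self._seen:  # already yielded via an overlapping prefix
                    continue
                self._seen.add(key)
                yield self.make_task(row)


class InMemoryStoreSource(PartitionedSource):
    """Store-side subclass over plain dicts standing in for Store rows."""

    def __init__(self, profile, sync_filter, dirty_only=True, partition_order="asc",
                 *, rows: Iterable[dict] = ()):
        super().__init__(profile, sync_filter, dirty_only, partition_order)
        self.rows = list(rows)

    def rows_for_prefix(self, prefix):
        for row in self.rows:
            if (
                row["profile"] == self.profile
                and row["partition"].startswith(prefix)
                and (row["dirty_bit"] or not self.dirty_only)
                and row["deserialization_error"] == ""
            ):
                yield row

    def make_task(self, row):
        # Placeholder task; the real subclass would build a DeserializeTask
        return ("deserialize", row["id"])
```

With this split, the ordering and duplicate-prevention tests could be written once against the base class. The Store-specific tests would then only need to cover dirty_bit filtering and deserialization_error exclusion.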
Value add
Provides the foundational building block for the streaming deserialization pipeline, matching the pattern already established for serialization. Enables modular, testable components instead of a monolithic function.
Possible tradeoffs
- The deserialization_error="" filter is hardcoded; it could be made configurable in future if there's a need to retry erroring records
- The _seen set grows with the number of store models; this should be acceptable for typical sync sizes but is worth monitoring for very large datasets
AI Usage
This issue was created with AI assistance under my guidance. I reviewed and directed all content.