Implement DeserializeTask and StoreModelSource for streaming deserialization #317

@bjester

Current behavior

The _deserialize_from_store function in morango/sync/operations.py is monolithic and mixes multiple concerns. The serialization side has already been migrated to a streaming architecture built from Source > Transform > Sink modules in morango/sync/stream/serialize.py, with AppModelSource and SerializeTask as its foundational components.

Desired behavior

The deserialization pipeline should have equivalent foundational components: a DeserializeTask carrier class that holds context as a store model flows through the pipeline, and a StoreModelSource that yields tasks for dirty store models matching a sync filter.

Deliverables

A new file morango/sync/stream/deserialize.py should contain:

DeserializeTask carrier class:

  • hold a reference to a Store model instance
  • provide a model property that resolves the syncable model class from the store's profile and model_name via syncable_models
  • track the deserialized app model once processed
  • track an FK cache for reuse during deserialization
  • track any errors encountered during processing
  • provide a has_errors boolean property
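As a rough illustration of the carrier class described above, here is a minimal sketch. It runs without Django: `StubRegistry` and `StubStore` are hypothetical stand-ins for `syncable_models` and the `Store` model, and the `get_model(profile, model_name)` lookup, the attribute names, and the `add_error` helper are assumptions rather than the required design.

```python
from dataclasses import dataclass


class StubRegistry:
    """Hypothetical stand-in for morango.registry.syncable_models."""

    def __init__(self):
        self._models = {}

    def register(self, profile, model_name, model_class):
        self._models[(profile, model_name)] = model_class

    def get_model(self, profile, model_name):
        # Assumed lookup signature; check the real registry before relying on it.
        return self._models[(profile, model_name)]


syncable_models = StubRegistry()


@dataclass
class StubStore:
    """Hypothetical stand-in exposing only the Store fields the task reads."""

    id: str
    profile: str
    model_name: str


class DeserializeTask:
    """Carries one store model, plus processing state, through the pipeline."""

    __slots__ = ("store_model", "app_model", "fk_cache", "errors")

    def __init__(self, store_model):
        self.store_model = store_model
        self.app_model = None  # set once the store model has been deserialized
        self.fk_cache = {}     # reusable FK lookups gathered during deserialization
        self.errors = []       # exceptions encountered while processing

    @property
    def model(self):
        """Resolve the syncable model class from the store's profile and model_name."""
        return syncable_models.get_model(
            self.store_model.profile, self.store_model.model_name
        )

    def add_error(self, error):
        self.errors.append(error)

    @property
    def has_errors(self):
        return bool(self.errors)
```

Keeping `model` as a property means the class lookup is deferred until a downstream stage actually needs it.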

StoreModelSource pipeline source:

  • accept the same constructor parameters as AppModelSource: profile, sync_filter, dirty_only, and partition_order
  • iterate through sync filter partitions in the specified order (asc/desc), yielding Q conditions for each prefix
  • query Store models (not app models) filtered by profile, partition, and dirty_bit
  • exclude store records that already have a deserialization_error set
  • track seen store IDs to avoid yielding duplicates when partitions overlap
  • yield DeserializeTask objects for each matching store model
  • use .iterator() for memory-efficient streaming
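The iteration rules above can be sketched without a database. This is only an in-memory approximation: `InMemoryStoreModelSource`, `StubStore`, and the minimal `DeserializeTask` are hypothetical, list scans with predicates stand in for the `Q` conditions and `.iterator()` queries the real source would use, and reading "asc/desc" as sorted prefix order is an assumption, as is the `dirty_only=True` default.

```python
from dataclasses import dataclass


@dataclass
class StubStore:
    """Hypothetical stand-in for the Store model fields used by the source."""

    id: str
    profile: str
    partition: str
    dirty_bit: bool = True
    deserialization_error: str = ""


@dataclass
class DeserializeTask:
    """Minimal carrier; the full class would hold more state."""

    store_model: StubStore


class InMemoryStoreModelSource:
    def __init__(self, records, profile, sync_filter, dirty_only=True,
                 partition_order="asc"):
        if partition_order not in ("asc", "desc"):
            raise ValueError("partition_order must be 'asc' or 'desc'")
        self.records = records
        self.profile = profile
        self.sync_filter = sync_filter  # iterable of partition prefixes
        self.dirty_only = dirty_only
        self.partition_order = partition_order

    def prefix_conditions(self):
        """Yield one predicate per prefix, in the requested order."""
        prefixes = sorted(self.sync_filter, reverse=self.partition_order == "desc")
        for prefix in prefixes:
            # Bind prefix as a default argument so each predicate keeps its own value.
            yield lambda record, prefix=prefix: record.partition.startswith(prefix)

    def __iter__(self):
        seen = set()  # store IDs already yielded, in case prefixes overlap
        for matches_prefix in self.prefix_conditions():
            for record in self.records:
                if record.profile != self.profile or not matches_prefix(record):
                    continue
                if self.dirty_only and not record.dirty_bit:
                    continue
                if record.deserialization_error:
                    continue  # skip records that already failed to deserialize
                if record.id in seen:
                    continue
                seen.add(record.id)
                yield DeserializeTask(record)


RECORDS = [
    StubStore("1", "facilitydata", "a:x"),
    StubStore("2", "facilitydata", "ab:y"),  # matches both "a" and "ab"
    StubStore("3", "facilitydata", "ab:z", dirty_bit=False),
    StubStore("4", "facilitydata", "a:w", deserialization_error="boom"),
    StubStore("5", "otherprofile", "a:q"),
]
```

Iterating `InMemoryStoreModelSource(RECORDS, "facilitydata", ["ab", "a"])` visits each eligible record once even though record "2" matches both prefixes.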

Notes

  • StoreModelSource should mirror the structure of AppModelSource in morango/sync/stream/serialize.py - same constructor signature, same prefix_conditions() pattern, same duplicate-prevention approach
  • The source queries Store models (the serialized data), whereas AppModelSource queries application models - this is the key difference
  • Import Store from morango.models.core, Source from morango.sync.stream.core, syncable_models from morango.registry
  • Unit tests should be added for both classes
  • Tests should verify: partition ordering (asc/desc), dirty_bit filtering, duplicate prevention, deserialization_error exclusion
  • This is mostly new code; the existing deserialization process should not be modified. However, since the new source class will share some features with the serialization AppModelSource, it could be worthwhile to consolidate the shared parts into a common base class
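If the shared base class mentioned in the last note is pursued, one possible shape is sketched below. The names `PartitionedSource`, `fetch`, and `DictBackedSource` are hypothetical, and the split between base class and subclass is just one option: the base owns prefix ordering and duplicate prevention, while each subclass decides what to query for a given prefix.

```python
from abc import ABC, abstractmethod


class PartitionedSource(ABC):
    """Hypothetical base: shared constructor, prefix ordering, and de-duplication."""

    def __init__(self, profile, sync_filter, dirty_only=True, partition_order="asc"):
        if partition_order not in ("asc", "desc"):
            raise ValueError("partition_order must be 'asc' or 'desc'")
        self.profile = profile
        self.sync_filter = sync_filter
        self.dirty_only = dirty_only
        self.partition_order = partition_order

    def ordered_prefixes(self):
        return sorted(self.sync_filter, reverse=self.partition_order == "desc")

    @abstractmethod
    def fetch(self, prefix):
        """Return an iterable of (unique_id, item) pairs for one partition prefix."""

    def __iter__(self):
        seen = set()
        for prefix in self.ordered_prefixes():
            for unique_id, item in self.fetch(prefix):
                if unique_id not in seen:
                    seen.add(unique_id)
                    yield item


class DictBackedSource(PartitionedSource):
    """Toy subclass reading from a {id: partition} dict instead of a database."""

    def __init__(self, rows, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rows = rows

    def fetch(self, prefix):
        for unique_id, partition in self.rows.items():
            if partition.startswith(prefix):
                yield unique_id, unique_id
```

In this arrangement, AppModelSource and StoreModelSource would each implement only `fetch`, so the ordering and duplicate-prevention logic would live in one place.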

Value add

Provides the foundational building block for the streaming deserialization pipeline, matching the pattern already established for serialization. Enables modular, testable components instead of a monolithic function.

Possible tradeoffs

  • The deserialization_error="" filter is hardcoded; it could be made configurable in the future if records that previously failed need to be retried
  • The _seen set grows with the number of store models yielded, which should be acceptable for typical sync sizes but is worth monitoring for very large datasets

AI Usage

This issue was created with AI assistance under my guidance. I reviewed and directed all content.
