43 changes: 42 additions & 1 deletion docs/architecture/index.rst
Original file line number Diff line number Diff line change
@@ -132,6 +132,48 @@ In Kolibri, on the ``FacilityDataset`` model, we generate the certificate as a f
There's flexibility in the application layer for determining the validity of a root certificate, and it's specified on a per-profile basis. For the ``facilitydata`` profile, Kolibri leverages its ``auth`` models for this.


Streaming architecture
----------------------

Morango includes a streaming architecture for memory-efficient processing of sync data. Implemented in the ``morango.sync.stream`` module, it provides a modular, ETL-like pipeline pattern that processes data records one by one, significantly reducing memory overhead compared to batch processing approaches.

The streaming architecture is built around several core concepts:

**Stream modules**
Abstract base classes that form the foundation of the streaming pipeline:

- ``Source``: The starting point of a pipeline that yields data items
- ``PipelineModule``: Transform-like modules that process data items
- ``Sink``: Terminal modules that consume data items without yielding further output
- ``ReaderModule``: Modules that can be connected to other modules via the ``pipe()`` method
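
The contract these base classes describe can be sketched in a few lines of self-contained Python (an illustration only; the real classes in ``morango.sync.stream`` carry more machinery, and the ``Add`` module below is a hypothetical example):

```python
class ReaderModule:
    """Anything that can be read from and connected onward via pipe()."""

    def __init__(self):
        self.upstream = None

    def __iter__(self):
        raise NotImplementedError

    def pipe(self, module):
        module.upstream = self  # directed flow: self -> module
        return module           # return the downstream module so calls chain

    def end(self, sink):
        sink.upstream = self
        return sink.run()


class Source(ReaderModule):
    """Pipeline starting point: yields items from an iterable."""

    def __init__(self, items):
        super().__init__()
        self.items = items

    def __iter__(self):
        yield from self.items


class PipelineModule(ReaderModule):
    """Transform-like module: applies process() to each upstream item."""

    def process(self, item):
        return item

    def __iter__(self):
        for item in self.upstream:
            yield self.process(item)


class Sink:
    """Terminal consumer: drains its upstream and yields nothing further."""

    def __init__(self):
        self.upstream = None

    def run(self):
        return [item for item in self.upstream]


class Add(PipelineModule):
    """Hypothetical transform used only for this demonstration."""

    def __init__(self, n):
        super().__init__()
        self.n = n

    def process(self, item):
        return item + self.n


result = Source([1, 2, 3]).pipe(Add(1)).pipe(Add(10)).end(Sink())
# result == [12, 13, 14]
```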

**Pipeline composition**
Modules are connected using a fluent interface via the ``pipe()`` method, creating a directed flow of data:

.. code-block:: python

source.pipe(transform1).pipe(transform2).end(sink)

**Key pipeline modules**
Several specialized pipeline modules are provided:

- ``Transform``: Applies a 1:1 transformation to each item
- ``FlatMap``: Maps each item to zero or more output items
- ``Buffer``: Collects items into fixed-size chunks for batch operations
- ``Unbuffer``: Flattens chunks back into individual items
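
The semantics of these modules can be sketched with plain generator functions (a self-contained illustration; the real modules are classes in ``morango.sync.stream``):

```python
from itertools import islice


def transform(items, fn):
    """Transform: applies a 1:1 mapping to each item."""
    for item in items:
        yield fn(item)


def flat_map(items, fn):
    """FlatMap: maps each item to zero or more output items."""
    for item in items:
        yield from fn(item)


def buffer(items, size):
    """Buffer: collects items into fixed-size chunks for batch operations."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def unbuffer(chunks):
    """Unbuffer: flattens chunks back into individual items."""
    for chunk in chunks:
        yield from chunk


chunks = list(buffer(range(5), 2))           # [[0, 1], [2, 3], [4]]
items = list(unbuffer(buffer(range(5), 2)))  # [0, 1, 2, 3, 4]
```

Because everything is a generator, pairing ``buffer`` with ``unbuffer`` lets a pipeline batch a database operation in the middle of the stream and then return to item-at-a-time processing.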

**Serialization pipeline**
The serialization process uses this streaming architecture through the ``serialize_into_store()`` function, which constructs a pipeline that:

1. Reads dirty app models from the database (``AppModelSource``)
2. Buffers records for efficient database lookups (``Buffer``)
3. Looks up corresponding store records (``StoreLookup``)
4. Updates store records with new data (``StoreUpdate``)
5. Buffers by model type for efficient bulk operations (``ModelPartitionBuffer``)
6. Writes changes to the database (``WriteSink``)
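
The steps above can be sketched end-to-end with generator stand-ins (the names mirror the modules listed, but the real classes and their signatures live in ``morango.sync.stream``; ``ModelPartitionBuffer`` is omitted here for brevity):

```python
def app_model_source(dirty_models):
    """Stand-in for AppModelSource: lazily yields dirty app-model records."""
    yield from dirty_models


def buffer(records, size=500):
    """Stand-in for Buffer: fixed-size chunks so lookups can be batched."""
    chunk = []
    for record in records:
        chunk.append(record)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def store_lookup(chunks, store):
    """Stand-in for StoreLookup: pairs each record with its store record."""
    for chunk in chunks:
        yield [(record, store.get(record["id"])) for record in chunk]


def store_update(chunks):
    """Stand-in for StoreUpdate: merges new data over any existing record."""
    for chunk in chunks:
        yield [{**(existing or {}), **record} for record, existing in chunk]


def write_sink(chunks, store):
    """Stand-in for WriteSink: bulk-writes each chunk and counts records."""
    written = 0
    for chunk in chunks:
        for record in chunk:
            store[record["id"]] = record
        written += len(chunk)
    return written


store = {}
dirty = ({"id": i, "payload": i} for i in range(1050))
written = write_sink(
    store_update(store_lookup(buffer(app_model_source(dirty)), store)),
    store,
)
# written == 1050, with at most 500 records materialized at any moment
```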

This streaming approach ensures that memory usage remains constant regardless of dataset size, making Morango suitable for large-scale deployments with limited resources.

Session controller, contexts, and operations
--------------------------------------------

@@ -142,4 +184,3 @@ A unidirectional sync has several stages: ``INITIALIZING``, ``SERIALIZING``, ``Q
.. image:: ./session-controller-seq.png

The list of operations for each stage is configured through Django settings. The configuration key for each stage follows the pattern ``MORANGO_%STAGE%_OPERATIONS``, so the list/tuple of operations for the ``QUEUING`` stage is read from the ``MORANGO_QUEUING_OPERATIONS`` configuration value. Built-in operations implement a callable ``BaseOperation`` class by overriding a ``handle`` method. The ``BaseOperation`` class supports raising an ``AssertionError`` to defer responsibility to the next operation.
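
The deferral contract can be illustrated with a self-contained sketch (the real ``BaseOperation`` lives in ``morango.sync.operations`` and has more machinery; ``CustomQueuingOperation`` and ``run_operations`` are hypothetical names for this example):

```python
class BaseOperation:
    """Callable operation: a raised AssertionError defers to the next one."""

    def __call__(self, context):
        try:
            return self.handle(context)
        except AssertionError:
            # deferring: signal the caller to try the next configured operation
            return None

    def handle(self, context):
        raise NotImplementedError


class CustomQueuingOperation(BaseOperation):
    def handle(self, context):
        # defer unless this operation actually applies to the context
        assert context.get("stage") == "queuing"
        return "queued"


def run_operations(operations, context):
    """Mirrors trying each configured operation for a stage, in order."""
    for operation in operations:
        result = operation(context)
        if result is not None:
            return result
```

With this contract, a list such as a hypothetical ``MORANGO_QUEUING_OPERATIONS`` setting can hold several operations, each claiming only the contexts it knows how to handle.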

3 changes: 1 addition & 2 deletions docs/syncing/index.rst
Expand Up @@ -15,7 +15,7 @@ Process

Syncing is the actual exchange of data in a sync session. The general steps for syncing data are:

1. **Serialization** - serializing data that is associated with Django models in the Application layer, and storing it in JSON format in a record in the Store
1. **Serialization** - serializing data that is associated with Django models in the Application layer, and storing it in JSON format in a record in the Store. This step uses a streaming architecture that pushes records one by one through a modular pipeline, keeping memory usage constant regardless of dataset size.
2. **Queuing/Buffering** - storing serialized records and their modification history to a separate Buffers data structure
3. **Transfer/chunking of data** - the actual transfer of data over a request/response cycle in chunks of 500 records at a time
4. **Dequeuing** - merging the data received in the receiving buffers to the receiving store and record-max counter
@@ -70,4 +70,3 @@ For a push or pull sync lifecycle, the order of the fired signals would be as fo
7) Dequeuing started
8) Dequeuing completed
9) Session completed

4 changes: 4 additions & 0 deletions morango/models/core.py
Expand Up @@ -773,6 +773,10 @@ class RecordMaxCounter(AbstractCounter):

store_model = models.ForeignKey(Store, on_delete=models.CASCADE)

@property
def unique_key(self):
return f"{self.instance_id}:{self.store_model_id}"

class Meta:
unique_together = ("store_model", "instance_id")

10 changes: 10 additions & 0 deletions morango/registry.py
Expand Up @@ -5,7 +5,9 @@
import inspect
import sys
from collections import OrderedDict
from typing import Generator

from django.db.models import QuerySet
from django.db.models.fields.related import ForeignKey

from morango.constants import transfer_stages
@@ -82,6 +84,14 @@ def get_models(self, profile):
self.check_models_ready(profile)
return list(self.profile_models.get(profile, {}).values())

def get_model_querysets(self, profile) -> Generator[QuerySet, None, None]:
"""
Method intended for future enhancement: iterate over models and their
querysets in an order that is aware of FK dependencies.
"""
for model in self.get_models(profile):
yield model.syncing_objects.all()

def _insert_model_in_dependency_order(self, model, profile):
# When we add models to be synced, we need to make sure
# that models that depend on other models are synced AFTER
26 changes: 7 additions & 19 deletions morango/sync/controller.py
@@ -6,47 +6,35 @@
from morango.constants import transfer_statuses
from morango.registry import session_middleware
from morango.sync.operations import _deserialize_from_store
from morango.sync.operations import _serialize_into_store
from morango.sync.operations import OperationLogger
from morango.sync.stream.serialize import serialize_into_store
from morango.sync.utils import SyncSignalGroup
from morango.utils import _assert


logger = logging.getLogger(__name__)


def _self_referential_fk(klass_model):
"""
Return whether this model has a self ref FK, and the name for the field
"""
for f in klass_model._meta.concrete_fields:
if f.related_model:
if issubclass(klass_model, f.related_model):
return f.attname
return None


class MorangoProfileController(object):
def __init__(self, profile):
_assert(profile, "profile needs to be defined.")
self.profile = profile

def serialize_into_store(self, filter=None):
def serialize_into_store(self, sync_filter=None):
"""
Takes data from app layer and serializes the models into the store.
"""
with OperationLogger("Serializing records", "Serialization complete"):
_serialize_into_store(self.profile, filter=filter)
serialize_into_store(self.profile, sync_filter=sync_filter)

def deserialize_from_store(self, skip_erroring=False, filter=None):
def deserialize_from_store(self, skip_erroring=False, sync_filter=None):
"""
Takes data from the store and integrates into the application.
"""
with OperationLogger("Deserializing records", "Deserialization complete"):
# we first serialize to avoid deserialization merge conflicts
_serialize_into_store(self.profile, filter=filter)
serialize_into_store(self.profile, sync_filter=sync_filter)
_deserialize_from_store(
self.profile, filter=filter, skip_erroring=skip_erroring
self.profile, filter=sync_filter, skip_erroring=skip_erroring
)

def create_network_connection(self, base_url, **kwargs):
@@ -217,7 +205,7 @@ def proceed_to_and_wait_for(
if tries >= max_interval_tries:
sleep(max_interval)
else:
sleep(0.3 * (2 ** tries - 1))
sleep(0.3 * (2**tries - 1))
result = self.proceed_to(target_stage, context=context)
tries += 1
if callable(callback):
39 changes: 39 additions & 0 deletions morango/sync/db.py
@@ -0,0 +1,39 @@
import logging
from contextlib import contextmanager

from django.db import connection
from django.db import transaction

from morango.sync.backends.utils import load_backend
from morango.sync.utils import lock_partitions


logger = logging.getLogger(__name__)

DBBackend = load_backend(connection)


@contextmanager
def begin_transaction(sync_filter, isolated=False, shared_lock=False):
"""
Starts a transaction, sets the transaction isolation level to repeatable read, and locks
affected partitions

:param sync_filter: The filter for filtering applicable records of the sync
:type sync_filter: morango.models.certificates.Filter|None
:param isolated: Whether to alter the transaction isolation to repeatable-read
:type isolated: bool
:param shared_lock: Whether the advisory lock should be exclusive or shared
:type shared_lock: bool
"""
if isolated:
# when isolation is requested, we modify the transaction isolation of the connection for the
# duration of the transaction
with DBBackend._set_transaction_repeatable_read():
with transaction.atomic(savepoint=False):
lock_partitions(DBBackend, sync_filter=sync_filter, shared=shared_lock)
yield
else:
with transaction.atomic():
lock_partitions(DBBackend, sync_filter=sync_filter, shared=shared_lock)
yield
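
The ``begin_transaction`` contract can be illustrated with a self-contained sketch that records the order of events (the stubs below stand in for Django's ``transaction.atomic`` and the backend lock helpers; the ``repeatable_read`` stub and event log are illustrative only):

```python
from contextlib import contextmanager

events = []


@contextmanager
def atomic(savepoint=True):
    # stand-in for django.db.transaction.atomic
    events.append("begin")
    yield
    events.append("commit")


@contextmanager
def repeatable_read():
    # stand-in for DBBackend._set_transaction_repeatable_read
    events.append("set repeatable read")
    yield
    events.append("restore isolation")


def lock_partitions(sync_filter, shared=False):
    # stand-in for morango.sync.utils.lock_partitions
    events.append("lock %s (%s)" % (sync_filter, "shared" if shared else "exclusive"))


@contextmanager
def begin_transaction(sync_filter, isolated=False, shared_lock=False):
    if isolated:
        # isolation change wraps the transaction, as in the real implementation
        with repeatable_read():
            with atomic(savepoint=False):
                lock_partitions(sync_filter, shared=shared_lock)
                yield
    else:
        with atomic():
            lock_partitions(sync_filter, shared=shared_lock)
            yield


with begin_transaction("abc123:*", isolated=True):
    events.append("sync work")
```

The event ordering shows why the isolation change must wrap the transaction: it applies to the connection before the transaction begins and is restored only after commit.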