Source code for morango.models.core

import functools
import json
import logging
import uuid
from collections import defaultdict
from collections import namedtuple
from functools import reduce

from django.core import exceptions
from django.db import connection
from django.db import models
from django.db import router
from django.db import transaction
from django.db.models import F
from django.db.models import Func
from django.db.models import Max
from django.db.models import Q
from django.db.models import signals
from django.db.models import TextField
from django.db.models import Value
from django.db.models.deletion import Collector
from django.db.models.expressions import CombinedExpression
from django.db.models.fields.related import ForeignKey
from django.db.models.functions import Cast
from django.utils import timezone
from django.utils.functional import cached_property

from morango import proquint
from morango.constants import transfer_stages
from morango.constants import transfer_statuses
from morango.errors import InvalidMorangoSourceId
from morango.models.certificates import Certificate
from morango.models.certificates import Filter
from morango.models.fields.uuids import sha2_uuid
from morango.models.fields.uuids import UUIDField
from morango.models.fields.uuids import UUIDModelMixin
from morango.models.fsic_utils import remove_redundant_instance_counters
from morango.models.manager import SyncableModelManager
from morango.models.utils import get_0_4_system_parameters
from morango.models.utils import get_0_5_mac_address
from morango.models.utils import get_0_5_system_id
from morango.registry import syncable_models
from morango.utils import _assert
from morango.utils import SETTINGS

logger = logging.getLogger(__name__)


class DatabaseIDManager(models.Manager):
    """
    We override ``models.Manager`` in order to wrap the creation of a new database ID model
    within a transaction. When a new database ID model is created, we set the ``current``
    flag of all previously created models to False.
    """

    def create(self, **kwargs):

        # do within transaction so we always have a current database ID
        with transaction.atomic():
            # set current flag to false for all database_id models
            DatabaseIDModel.objects.update(current=False)

            return super(DatabaseIDManager, self).create(**kwargs)


class DatabaseIDModel(UUIDModelMixin):
    """
    Model to be used for tracking database ids.
    """

    uuid_input_fields = "RANDOM"

    objects = DatabaseIDManager()

    current = models.BooleanField(default=True)
    date_generated = models.DateTimeField(default=timezone.now)
    initial_instance_id = models.CharField(max_length=32, blank=True)
    def save(self, *args, **kwargs):
        # do within transaction so we always have a current database ID
        with transaction.atomic():
            # set current flag to false for all database_id models
            if not self.id and self.current:
                DatabaseIDModel.objects.update(current=False)

            super(DatabaseIDModel, self).save(*args, **kwargs)
    @classmethod
    def get_or_create_current_database_id(cls):
        with transaction.atomic():
            try:
                return cls.objects.get(current=True)
            except cls.DoesNotExist:
                return cls.objects.create()
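
# Usage sketch (illustrative addition, not part of the original module): there is
# always exactly one current database ID; the hypothetical helper below fetches
# it, creating it on first use.
def _example_current_database_id():
    return DatabaseIDModel.get_or_create_current_database_id().id
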
class InstanceIDModel(models.Model):
    """
    ``InstanceIDModel`` is used to track what the current ID of this Morango instance is,
    based on system properties. If those system properties change, the ID used to track
    the morango instance also changes. During the serialization phase, we associate the
    current instance ID, as well as its counter, with all the records that were serialized
    at the time.
    """

    _cached_instance = None

    uuid_input_fields = (
        "platform",
        "hostname",
        "sysversion",
        "node_id",
        "database_id",
        "db_path",
    )

    id = UUIDField(max_length=32, primary_key=True, editable=False)
    platform = models.TextField()
    hostname = models.TextField()
    sysversion = models.TextField()
    node_id = models.CharField(max_length=20, blank=True)
    database = models.ForeignKey(DatabaseIDModel, on_delete=models.CASCADE)
    counter = models.IntegerField(default=0)
    current = models.BooleanField(default=True)
    db_path = models.CharField(max_length=1000)
    system_id = models.CharField(max_length=100, blank=True)

    @property
    def instance_info(self):
        """
        Getter to access custom instance info defined in settings

        :return: dict
        """
        return SETTINGS.MORANGO_INSTANCE_INFO
    @classmethod
    @transaction.atomic
    def get_or_create_current_instance(cls, clear_cache=False):
        """Get the instance model corresponding to the current system, or create a new
        one if the system is new or its properties have changed (e.g. new MAC address)."""

        if clear_cache:
            cls._cached_instance = None

        if cls._cached_instance:
            instance = cls._cached_instance
            # make sure we have the latest counter value and "current" flag
            try:
                instance.refresh_from_db(fields=["counter", "current"])
                # only use cached instance if it's still marked as current, otherwise skip
                if instance.current:
                    return cls._cached_instance, False
            except InstanceIDModel.DoesNotExist:
                # instance does not exist, so skip here so we create a new one
                pass

        with transaction.atomic():
            # check if a matching legacy instance ID is already current, and don't mess with it
            kwargs = get_0_4_system_parameters(
                database_id=DatabaseIDModel.get_or_create_current_database_id().id
            )
            try:
                instance = InstanceIDModel.objects.get(current=True, **kwargs)
                cls._cached_instance = instance
                return instance, False
            except InstanceIDModel.DoesNotExist:
                pass

            # calculate the new ID based on system ID and MAC address
            kwargs["system_id"] = get_0_5_system_id()
            kwargs["node_id"] = get_0_5_mac_address()
            kwargs["id"] = sha2_uuid(
                kwargs["database_id"], kwargs["system_id"], kwargs["node_id"]
            )
            kwargs["current"] = True

            # ensure we only ever have 1 current instance ID
            InstanceIDModel.objects.filter(current=True).exclude(
                id=kwargs["id"]
            ).update(current=False)

            # create the model, or get existing if one already exists with this ID
            instance, created = InstanceIDModel.objects.update_or_create(
                id=kwargs["id"], defaults=kwargs
            )
            cls._cached_instance = instance

            return instance, created
    @classmethod
    @transaction.atomic
    def get_current_instance_and_increment_counter(cls):
        instance, _ = cls.get_or_create_current_instance()
        cls.objects.filter(id=instance.id).update(counter=F("counter") + 1)
        instance.refresh_from_db(fields=["counter"])
        return instance
    def get_proquint(self):
        return proquint.from_int(int(self.id[:8], 16))
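
# Usage sketch (illustrative addition): how calling code typically obtains the
# current instance and bumps its counter when serializing records. The helper
# name below is hypothetical.
def _example_instance_id_usage():
    # fetch (or create) the InstanceIDModel row matching this system's properties
    instance, created = InstanceIDModel.get_or_create_current_instance()
    # atomically increment the counter, e.g. once per serialization pass
    instance = InstanceIDModel.get_current_instance_and_increment_counter()
    # proquint: a short, pronounceable rendering of the first 8 hex chars of the ID
    return instance.get_proquint(), instance.counter
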
class SyncSession(models.Model):
    """
    ``SyncSession`` holds metadata for a sync session which keeps track of initial
    settings and the current transfer happening for this sync session.
    """

    id = UUIDField(primary_key=True)

    # track when the session started and the last time there was activity for this session
    start_timestamp = models.DateTimeField(default=timezone.now)
    last_activity_timestamp = models.DateTimeField(blank=True)
    active = models.BooleanField(default=True)

    # track whether this device is acting as the server for the sync session
    is_server = models.BooleanField(default=False)

    # track the certificates being used by each side for this session
    client_certificate = models.ForeignKey(
        Certificate,
        blank=True,
        null=True,
        related_name="syncsessions_client",
        on_delete=models.CASCADE,
    )
    server_certificate = models.ForeignKey(
        Certificate,
        blank=True,
        null=True,
        related_name="syncsessions_server",
        on_delete=models.CASCADE,
    )

    # track the morango profile this sync session is happening for
    profile = models.CharField(max_length=40)

    # information about the connection over which this sync session is happening
    connection_kind = models.CharField(
        max_length=10, choices=[("network", "Network"), ("disk", "Disk")]
    )
    connection_path = models.CharField(
        max_length=1000
    )  # file path if kind=disk, and base URL of server if kind=network

    # for network connections, keep track of the IPs on either end
    client_ip = models.CharField(max_length=100, blank=True)
    server_ip = models.CharField(max_length=100, blank=True)

    # track the instance IDs for convenient querying, as well as serialized copies of
    # the client and server instance model fields for debugging/tracking purposes
    client_instance_id = models.UUIDField(null=True, blank=True)
    client_instance_json = models.TextField(default="{}")
    server_instance_id = models.UUIDField(null=True, blank=True)
    server_instance_json = models.TextField(default="{}")

    # used to store other data we may need to know about this sync session
    extra_fields = models.TextField(default="{}")

    # system process ID for ensuring same sync session does not run in parallel
    process_id = models.IntegerField(blank=True, null=True)

    @cached_property
    def client_instance_data(self):
        return json.loads(self.client_instance_json)

    @cached_property
    def server_instance_data(self):
        return json.loads(self.server_instance_json)
class TransferSession(models.Model):
    """
    ``TransferSession`` holds metadata that is related to a specific transfer
    (push/pull) session between 2 morango instances.
    """

    id = UUIDField(primary_key=True)
    filter = (
        models.TextField()
    )  # partition/filter to know what subset of data is to be synced
    push = models.BooleanField()  # is session pushing or pulling data?
    active = models.BooleanField(default=True)  # is this transfer session still active?
    records_transferred = models.IntegerField(
        default=0
    )  # track how many records have already been transferred
    records_total = models.IntegerField(
        blank=True, null=True
    )  # total number of records to be synced across in this transfer
    bytes_sent = models.BigIntegerField(default=0, null=True, blank=True)
    bytes_received = models.BigIntegerField(default=0, null=True, blank=True)
    sync_session = models.ForeignKey(SyncSession, on_delete=models.CASCADE)

    # track when the transfer session started and the last time there was activity on it
    start_timestamp = models.DateTimeField(default=timezone.now)
    last_activity_timestamp = models.DateTimeField(blank=True)

    # we keep track of FSICs for both client and server
    client_fsic = models.TextField(blank=True, default="{}")
    server_fsic = models.TextField(blank=True, default="{}")

    # stages and stage status of transfer session
    transfer_stage = models.CharField(
        max_length=20,
        choices=transfer_stages.CHOICES,
        blank=True,
        null=True,
    )
    transfer_stage_status = models.CharField(
        max_length=20,
        choices=transfer_statuses.CHOICES,
        blank=True,
        null=True,
    )

    @property
    def pull(self):
        """Getter for the `not push` condition, which otherwise adds complexity to conditional statements"""
        return not self.push
    def get_filter(self):
        return Filter(self.filter)
    def update_state(self, stage=None, stage_status=None):
        """
        :type stage: morango.constants.transfer_stages.*|None
        :type stage_status: morango.constants.transfer_statuses.*|None
        """
        if stage is not None:
            if self.transfer_stage and transfer_stages.stage(
                self.transfer_stage
            ) > transfer_stages.stage(stage):
                raise ValueError(
                    "Update stage is behind current stage | current={}, new={}".format(
                        self.transfer_stage, stage
                    )
                )
            self.transfer_stage = stage
        if stage_status is not None:
            self.transfer_stage_status = stage_status
        if stage is not None or stage_status is not None:
            self.last_activity_timestamp = timezone.now()
            self.save()
            self.sync_session.last_activity_timestamp = timezone.now()
            self.sync_session.save()
    def delete_buffers(self):
        """
        Deletes `Buffer` and `RecordMaxCounterBuffer` model records by executing SQL
        directly against the database for better performance
        """
        with connection.cursor() as cursor:
            cursor.execute(
                "DELETE FROM morango_buffer WHERE transfer_session_id = %s", (self.id,)
            )
            cursor.execute(
                "DELETE FROM morango_recordmaxcounterbuffer WHERE transfer_session_id = %s",
                (self.id,),
            )
    def get_touched_record_ids_for_model(self, model):
        if isinstance(model, SyncableModel) or (
            isinstance(model, type) and issubclass(model, SyncableModel)
        ):
            model = model.morango_model_name
        _assert(isinstance(model, str), "Model must resolve to string")
        return Store.objects.filter(
            model_name=model, last_transfer_session_id=self.id
        ).values_list("id", flat=True)
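
# Usage sketch (illustrative addition): advancing a transfer through its stages
# with `update_state`; moving a stage backwards raises ValueError. This assumes
# the QUEUING stage and STARTED/COMPLETED statuses defined in morango.constants.
def _example_advance_transfer(transfer_session):
    transfer_session.update_state(
        stage=transfer_stages.QUEUING, stage_status=transfer_statuses.STARTED
    )
    # ... do the queuing work for this transfer ...
    transfer_session.update_state(stage_status=transfer_statuses.COMPLETED)
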
class DeletedModels(models.Model):
    """
    ``DeletedModels`` helps us keep track of models that are deleted prior
    to serialization.
    """

    id = UUIDField(primary_key=True)
    profile = models.CharField(max_length=40)
class HardDeletedModels(models.Model):
    """
    ``HardDeletedModels`` helps us keep track of models where all their data
    must be purged (`serialized` is nullified).
    """

    id = UUIDField(primary_key=True)
    profile = models.CharField(max_length=40)
class AbstractStore(models.Model):
    """
    Base abstract model for storing serialized data.

    Inherited by both ``Store`` and ``Buffer``.
    """

    profile = models.CharField(max_length=40)
    serialized = models.TextField(blank=True)
    deleted = models.BooleanField(default=False)
    # flag to let other devices know to purge this data
    hard_deleted = models.BooleanField(default=False)

    # ID of last InstanceIDModel and its corresponding counter at time of serialization
    last_saved_instance = UUIDField()
    last_saved_counter = models.IntegerField()

    # fields used to compute UUIDs, filter data, and load data into the correct app models
    partition = models.TextField()
    source_id = models.CharField(max_length=96)
    model_name = models.CharField(max_length=40)

    # conflicting data that needs merge conflict resolution
    conflicting_serialized_data = models.TextField(blank=True)

    _self_ref_fk = models.CharField(max_length=32, blank=True)

    class Meta:
        abstract = True


class StoreQueryset(models.QuerySet):
    def char_ids_list(self):
        return (
            self.annotate(id_cast=Cast("id", TextField()))
            # remove dashes from char uuid
            .annotate(
                fixed_id=Func(
                    F("id_cast"),
                    Value("-"),
                    Value(""),
                    function="replace",
                    output_field=TextField(),
                )
            )
            # return as list
            .values_list("fixed_id", flat=True)
        )


class StoreManager(models.Manager):
    def get_queryset(self):
        return StoreQueryset(self.model, using=self._db)
class Store(AbstractStore):
    """
    ``Store`` is the concrete model where serialized data is persisted, along with
    metadata about counters and history.
    """

    id = UUIDField(primary_key=True)

    # used to know which store records need to be deserialized into the app layer models
    dirty_bit = models.BooleanField(default=False)

    deserialization_error = models.TextField(blank=True)

    last_transfer_session_id = UUIDField(
        blank=True, null=True, default=None, db_index=True
    )

    objects = StoreManager()

    class Meta:
        indexes = [
            models.Index(fields=["partition"], name="idx_morango_store_partition"),
        ]

    def _deserialize_store_model(self, fk_cache, defer_fks=False):  # noqa: C901
        """
        When deserializing a store model, we look at the deleted flags to know if we
        should delete the app model. Upon loading the app model in memory, we validate
        the app model's fields; if any errors occur, we follow foreign key relationships
        to see if the related model has been deleted, and propagate that deletion to the
        target app model. We return:
        None => if the model was deleted successfully
        model => if the model validates successfully
        """
        deferred_fks = {}
        klass_model = syncable_models.get_model(self.profile, self.model_name)
        # if store model marked as deleted, attempt to delete in app layer
        if self.deleted:
            # Don't differentiate between deletion and hard deletion here,
            # as we don't want to add additional tracking for models in either case,
            # just to actually delete them.
            # Import here to avoid circular import, as the utils module
            # imports core models.
            from morango.sync.utils import mute_signals

            with mute_signals(signals.post_delete):
                klass_model.objects.filter(id=self.id).delete()
            return None, deferred_fks
        else:
            # load model into memory
            app_model = klass_model.deserialize(json.loads(self.serialized))
            app_model._morango_source_id = self.source_id
            app_model._morango_partition = self.partition
            app_model._morango_dirty_bit = False

            try:
                # validate and return the model
                if defer_fks:
                    deferred_fks = app_model.deferred_clean_fields()
                else:
                    app_model.cached_clean_fields(fk_cache)
                return app_model, deferred_fks
            except (exceptions.ValidationError, exceptions.ObjectDoesNotExist) as e:
                logger.warning(
                    "Error deserializing instance of {model} with id {id}: {error}".format(
                        model=klass_model.__name__, id=app_model.id, error=e
                    )
                )
                if not defer_fks and isinstance(e, exceptions.ObjectDoesNotExist):
                    # check FKs in store to see if any of those models were deleted
                    # or hard_deleted, to propagate to this model
                    fk_ids = [
                        getattr(app_model, field.attname)
                        for field in app_model._meta.fields
                        if isinstance(field, ForeignKey)
                    ]
                    for fk_id in fk_ids:
                        try:
                            st_model = Store.objects.get(id=fk_id)
                            if st_model.deleted:
                                # if hard deleted, propagate to store model
                                if st_model.hard_deleted:
                                    app_model._update_hard_deleted_models()
                                app_model._update_deleted_models()
                                return None, {}
                        except Store.DoesNotExist:
                            pass
                # if we got here, it means the validation error wasn't handled by
                # propagating deletion, so re-raise it
                raise e
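
# Usage sketch (illustrative addition, hypothetical helper): how dirty Store
# records might be deserialized back into app models and saved without re-setting
# the dirty bit, per `_deserialize_store_model` above.
def _example_deserialize_dirty(profile):
    fk_cache = {}  # shared FK-lookup cache, see SyncableModel.cached_clean_fields
    for store_model in Store.objects.filter(profile=profile, dirty_bit=True):
        app_model, _ = store_model._deserialize_store_model(fk_cache)
        if app_model is not None:  # None means the record was a deletion
            app_model.save(update_dirty_bit_to=False)
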
class Buffer(AbstractStore):
    """
    ``Buffer`` is where records from the internal store are queued up temporarily,
    before being sent to another morango instance, or stored while being received
    from another instance, before dequeuing into the local store.
    """

    transfer_session = models.ForeignKey(TransferSession, on_delete=models.CASCADE)
    model_uuid = UUIDField()

    class Meta:
        unique_together = ("transfer_session", "model_uuid")
    def rmcb_list(self):
        return RecordMaxCounterBuffer.objects.filter(
            model_uuid=self.model_uuid, transfer_session_id=self.transfer_session_id
        )
class AbstractCounter(models.Model):
    """
    Abstract class which shares fields across multiple counter models.
    """

    # the UUID of the morango instance for which we're tracking the counter
    instance_id = UUIDField()
    # the counter of the morango instance at the time of serialization or merge conflict resolution
    counter = models.IntegerField()

    class Meta:
        abstract = True


class ValueStartsWithField(CombinedExpression):
    """
    Django expression that is essentially a reversed `startswith` comparison: it checks
    that a parameter value starts with the value of a table field. This also prevents
    Django from adding unnecessary SQL for the expression.
    """

    def __init__(self, value, field):
        """
        {value} LIKE {field} || '%'

        :param value: A str, the value compared against the field
        :param field: A str, the name of the field the value must start with
        """
        # we don't use `Concat` for appending the `%` because it also adds unnecessary SQL
        super(ValueStartsWithField, self).__init__(
            Value(value, output_field=models.CharField()),
            "LIKE",
            CombinedExpression(
                F(field), "||", Value("%", output_field=models.CharField())
            ),
            output_field=models.BooleanField(),
        )
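
# Illustrative sketch: the expression renders roughly as `%s LIKE partition || '%'`,
# i.e. it matches rows whose `partition` field is a prefix of the given filter value.
# The helper name below is hypothetical.
def _example_prefix_partitions(filt):
    return DatabaseMaxCounter.objects.annotate(
        filter_matches=ValueStartsWithField(filt, "partition")
    ).filter(filter_matches=True)
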
class DatabaseMaxCounter(AbstractCounter):
    """
    ``DatabaseMaxCounter`` is used to keep track of what data this database already has
    across all instances for a particular partition prefix. Whenever 2 morango instances
    sync with each other, we keep track of those partition prefixes from the filters,
    as well as the maximum counter we received for each instance during the sync session.
    """

    partition = models.CharField(max_length=128, default="")

    class Meta:
        unique_together = ("instance_id", "partition")
    @classmethod
    @transaction.atomic
    def update_fsics(cls, fsics, sync_filter, v2_format=False):
        internal_fsic = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
            sync_filter, v2_format=v2_format
        )
        if v2_format:
            internal_fsic = internal_fsic["sub"]
            fsics = fsics["sub"]
            for part, insts in fsics.items():
                for inst, counter in insts.items():
                    if internal_fsic.get(part, {}).get(inst, 0) < counter:
                        DatabaseMaxCounter.objects.update_or_create(
                            instance_id=inst,
                            partition=part,
                            defaults={"counter": counter},
                        )
        else:
            updated_fsic = {}
            for key, value in fsics.items():
                if key in internal_fsic:
                    # if same instance id, update fsic with larger value
                    if fsics[key] > internal_fsic[key]:
                        updated_fsic[key] = fsics[key]
                else:
                    # if instance id is not present, add it to updated fsics
                    updated_fsic[key] = fsics[key]

            # load database max counters
            for (key, value) in updated_fsic.items():
                for f in sync_filter:
                    DatabaseMaxCounter.objects.update_or_create(
                        instance_id=key, partition=f, defaults={"counter": value}
                    )
    @classmethod
    def get_instance_counters_for_partitions(cls, partitions, is_producer=False):
        queryset = cls.objects.filter(partition__in=partitions)

        # filter out instance_ids that are no longer relevant, if settings don't prevent it,
        # because these queries can be somewhat intensive
        if not SETTINGS.MORANGO_DISABLE_FSIC_REDUCTION:
            if is_producer:
                # for the producing side, we only need to use instance_ids
                # that are still on Store records
                matching_instances = Store.objects.filter(
                    partition__startswith=models.OuterRef("partition"),
                    last_saved_instance=models.OuterRef("instance_id"),
                ).values("last_saved_instance")
            else:
                # for the receiving side, we include all instances
                # that still exist in the RecordMaxCounters
                matching_instances = RecordMaxCounter.objects.filter(
                    store_model__partition__startswith=models.OuterRef("partition"),
                    instance_id=models.OuterRef("instance_id"),
                ).values("instance_id")

            # manually add EXISTS clause to avoid Django bug which puts `= TRUE` comparison
            # that kills postgres performance
            exists = models.Exists(matching_instances).resolve_expression(
                query=queryset.query, allow_joins=True, reuse=None
            )
            queryset.query.where.add(exists, "AND")

        queryset = queryset.values_list("partition", "instance_id", "counter")

        counters = {}
        for partition, instance_id, counter in queryset:
            if partition not in counters:
                counters[partition] = {}
            counters[partition].update({instance_id: counter})
        return counters
    @classmethod
    def calculate_filter_specific_instance_counters(
        cls,
        filters,
        is_producer=False,
        v2_format=False,
    ):
        if v2_format:
            queryset = cls.objects.all()

            # get the DMC records with partitions that fall under the filter prefixes
            sub_condition = functools.reduce(
                lambda x, y: x | y,
                [Q(partition__startswith=prefix) for prefix in filters],
            )
            sub_partitions = set(
                queryset.filter(sub_condition)
                .values_list("partition", flat=True)
                .distinct()
            )

            # get the DMC records with partitions that are prefixes of the filters
            super_partitions = set()
            for filt in filters:
                qs = (
                    queryset.annotate(
                        filter_matches=ValueStartsWithField(filt, "partition")
                    )
                    .filter(filter_matches=True)
                    .exclude(partition=filt)
                )
                super_partitions.update(
                    qs.values_list("partition", flat=True).distinct()
                )

            # get the instance counters for the partitions, also filtering out old
            # unnecessary instance_ids
            super_fsics = cls.get_instance_counters_for_partitions(
                super_partitions, is_producer=is_producer
            )
            sub_fsics = cls.get_instance_counters_for_partitions(
                sub_partitions, is_producer=is_producer
            )

            raw_fsic = {
                "super": super_fsics,
                "sub": sub_fsics,
            }

            # remove instance counters on partitions that are redundant to counters on super partitions
            remove_redundant_instance_counters(raw_fsic)

            return raw_fsic
        else:
            queryset = cls.objects.all()

            per_filter_max = []
            for filt in filters:
                # {filt} LIKE partition || '%'
                qs = queryset.annotate(
                    filter_matches=ValueStartsWithField(filt, "partition")
                )
                qs = qs.filter(filter_matches=True)
                filt_maxes = qs.values("instance_id").annotate(maxval=Max("counter"))
                per_filter_max.append(
                    {dmc["instance_id"]: dmc["maxval"] for dmc in filt_maxes}
                )

            instance_id_lists = [maxes.keys() for maxes in per_filter_max]
            all_instance_ids = reduce(set.union, instance_id_lists, set())

            if is_producer:
                # when we're sending, we want to make sure we include everything
                result = {
                    instance_id: max([d.get(instance_id, 0) for d in per_filter_max])
                    for instance_id in all_instance_ids
                }
            else:
                # when we're receiving, we don't want to overpromise on what we have
                result = {
                    instance_id: min([d.get(instance_id, 0) for d in per_filter_max])
                    for instance_id in reduce(
                        set.intersection, instance_id_lists, all_instance_ids
                    )
                }

            return result
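
# Worked sketch (illustrative addition, hypothetical helper): the producer/receiver
# asymmetry above, on small in-memory dicts rather than real DMC rows.
def _example_fsic_semantics():
    per_filter_max = [{"X": 7, "Y": 3}, {"X": 5}]  # one dict per sync filter
    all_ids = set().union(*(d.keys() for d in per_filter_max))
    # producer: max across filters, so everything we have gets promised
    producer = {i: max(d.get(i, 0) for d in per_filter_max) for i in all_ids}
    # receiver: min across the intersection, so we never overpromise;
    # "Y" drops out because it is absent under the second filter
    common = set.intersection(*(set(d.keys()) for d in per_filter_max))
    receiver = {i: min(d.get(i, 0) for d in per_filter_max) for i in common}
    return producer, receiver  # ({"X": 7, "Y": 3}, {"X": 5})
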
class RecordMaxCounter(AbstractCounter):
    """
    ``RecordMaxCounter`` keeps track of the maximum counter each serialized record
    has been saved at, for each instance that has modified it. This is used to
    determine fast-forwards and merge conflicts during the sync process.
    """

    store_model = models.ForeignKey(Store, on_delete=models.CASCADE)

    class Meta:
        unique_together = ("store_model", "instance_id")
class RecordMaxCounterBuffer(AbstractCounter):
    """
    ``RecordMaxCounterBuffer`` is where combinations of instance ID and counters
    (from ``RecordMaxCounter``) are stored temporarily, until they are sent or
    received by another morango instance.
    """

    transfer_session = models.ForeignKey(TransferSession, on_delete=models.CASCADE)
    model_uuid = UUIDField(db_index=True)
ForeignKeyReference = namedtuple(
    "ForeignKeyReference", ["from_field", "from_pk", "to_pk"]
)
class SyncableModel(UUIDModelMixin):
    """
    ``SyncableModel`` is the base model class for syncing. Other models inherit from
    this class if they want to make their data syncable across devices.
    """

    # constant value to insert into partition strings in place of current model's ID,
    # as needed (to avoid circularity)
    ID_PLACEHOLDER = "${id}"

    _morango_internal_fields_not_to_serialize = ("_morango_dirty_bit",)
    morango_model_dependencies = ()
    morango_fields_not_to_serialize = ()
    morango_profile = None

    # morango specific field used for tracking model changes
    _morango_dirty_bit = models.BooleanField(default=True, editable=False)
    # morango specific field used to store random uuid or unique together fields
    _morango_source_id = models.CharField(max_length=96, editable=False)
    # morango specific field used to store the partition on the model
    _morango_partition = models.CharField(max_length=128, editable=False)

    objects = SyncableModelManager()
    class Meta:
        abstract = True
    def _update_deleted_models(self):
        DeletedModels.objects.update_or_create(
            defaults={"id": self.id, "profile": self.morango_profile}, id=self.id
        )

    def _update_hard_deleted_models(self):
        HardDeletedModels.objects.update_or_create(
            defaults={"id": self.id, "profile": self.morango_profile}, id=self.id
        )
    def save(self, update_dirty_bit_to=True, *args, **kwargs):
        if update_dirty_bit_to is None:
            pass  # don't do anything with the dirty bit
        elif update_dirty_bit_to:
            self._morango_dirty_bit = True
        elif not update_dirty_bit_to:
            self._morango_dirty_bit = False
        super(SyncableModel, self).save(*args, **kwargs)
    def delete(
        self, using=None, keep_parents=False, hard_delete=False, *args, **kwargs
    ):
        using = using or router.db_for_write(self.__class__, instance=self)
        _assert(
            self._get_pk_val() is not None,
            "%s object can't be deleted because its %s attribute is set to None."
            % (self._meta.object_name, self._meta.pk.attname),
        )
        collector = Collector(using=using)
        collector.collect([self], keep_parents=keep_parents)
        with transaction.atomic():
            if hard_delete:
                # set hard deletion for all related models
                for model, instances in collector.data.items():
                    if issubclass(model, SyncableModel):
                        for obj in instances:
                            obj._update_hard_deleted_models()
            return collector.delete()
    def cached_clean_fields(self, fk_lookup_cache):
        """
        Immediately validates all fields, but uses a cache for foreign key (FK) lookups
        to reduce repeated queries for many records with the same FK

        :param fk_lookup_cache: A dictionary to use as a cache to prevent querying the
            database if a FK exists in the cache, having already been validated
        """
        excluded_fields = []
        fk_fields = [
            field for field in self._meta.fields if isinstance(field, models.ForeignKey)
        ]

        for f in fk_fields:
            raw_value = getattr(self, f.attname)
            key = "{id}_{db_table}".format(
                db_table=f.related_model._meta.db_table, id=raw_value
            )
            try:
                fk_lookup_cache[key]
                excluded_fields.append(f.name)
            except KeyError:
                try:
                    f.validate(raw_value, self)
                except exceptions.ValidationError:
                    pass
                else:
                    fk_lookup_cache[key] = 1
                    excluded_fields.append(f.name)

        self.clean_fields(exclude=excluded_fields)

        # after cleaning, we can confidently set ourselves in the fk_lookup_cache
        self_key = "{id}_{db_table}".format(
            db_table=self._meta.db_table,
            id=self.id,
        )
        fk_lookup_cache[self_key] = 1
    def deferred_clean_fields(self):
        """
        Calls `.clean_fields()` but excludes all foreign key fields, and instead returns
        them as a dictionary for deferred batch processing

        :return: A dictionary containing lists of `ForeignKeyReference`s, keyed by the
            name of the model being referenced by the FK
        """
        excluded_fields = []
        deferred_fks = defaultdict(list)
        for field in self._meta.fields:
            if not isinstance(field, models.ForeignKey):
                continue

            # by not excluding the field if it's null, the default validation logic will apply
            # and should raise a ValidationError if the FK field is not nullable
            if getattr(self, field.attname) is None:
                continue

            excluded_fields.append(field.name)
            deferred_fks[field.related_model._meta.verbose_name].append(
                ForeignKeyReference(
                    from_field=field.attname,
                    from_pk=self.pk,
                    to_pk=getattr(self, field.attname),
                )
            )
        self.clean_fields(exclude=excluded_fields)
        return deferred_fks
    def serialize(self):
        """All concrete fields of the ``SyncableModel`` subclass, except for those
        specifically blacklisted, are returned in a dict."""
        # NOTE: code adapted from https://github.com/django/django/blob/master/django/forms/models.py#L75
        opts = self._meta

        data = {}
        for f in opts.concrete_fields:
            if f.attname in self.morango_fields_not_to_serialize:
                continue
            if f.attname in self._morango_internal_fields_not_to_serialize:
                continue
            if getattr(f, "morango_serialize_to_string", False):
                data[f.attname] = f.value_to_string(self)
            else:
                data[f.attname] = f.value_from_object(self)
        return data
    @classmethod
    def deserialize(cls, dict_model):
        """Returns an unsaved class object based on the valid properties passed in."""
        kwargs = {}
        for f in cls._meta.concrete_fields:
            if f.attname in dict_model:
                if getattr(f, "morango_serialize_to_string", False):
                    kwargs[f.attname] = f.to_python(dict_model[f.attname])
                else:
                    kwargs[f.attname] = dict_model[f.attname]
        return cls(**kwargs)
    @classmethod
    def merge_conflict(cls, current, push):
        return push
    def calculate_source_id(self):
        """Should return a string that uniquely defines the model instance,
        or `None` for a random uuid."""
        raise NotImplementedError(
            "You must define a 'calculate_source_id' method on models that inherit from SyncableModel."
        )
    def calculate_partition(self):
        """Should return a string specifying this model instance's partition, using
        `self.ID_PLACEHOLDER` in place of its own ID, if needed."""
        raise NotImplementedError(
            "You must define a 'calculate_partition' method on models that inherit from SyncableModel."
        )
    @staticmethod
    def compute_namespaced_id(partition_value, source_id_value, model_name):
        return sha2_uuid(partition_value, source_id_value, model_name)
    def calculate_uuid(self):
        if not self._morango_source_id:
            self._morango_source_id = self.calculate_source_id()
            if self._morango_source_id is None:
                self._morango_source_id = uuid.uuid4().hex
            elif self._morango_source_id == "":
                # an empty string is undefined behavior; given the dynamic nature of
                # calculating the source ID, it could be an unintentional bug, so we
                # raise an error to strictly enforce this
                raise InvalidMorangoSourceId(
                    "{}.calculate_source_id() returned empty string".format(
                        self.__class__.__name__
                    )
                )

        namespaced_id = self.compute_namespaced_id(
            self.calculate_partition(), self._morango_source_id, self.morango_model_name
        )
        self._morango_partition = self.calculate_partition().replace(
            self.ID_PLACEHOLDER, namespaced_id
        )
        return namespaced_id
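
# Illustrative sketch (hypothetical model, not part of morango): a minimal concrete
# subclass. `MyClassroom`, its fields, and the partition scheme are assumptions for
# the example; kept as comments since a concrete model needs a real Django app.
#
#     class MyClassroom(SyncableModel):
#         morango_model_name = "myclassroom"
#         morango_profile = "myapp"
#
#         name = models.CharField(max_length=100)
#         facility_id = UUIDField()
#
#         def calculate_source_id(self):
#             # returning None here would mean "use a random uuid4 hex"
#             return "{}:{}".format(self.facility_id, self.name)
#
#         def calculate_partition(self):
#             # scope records under their facility, so filters like
#             # "{facility_id}:" can select them during a sync
#             return "{}:classrooms".format(self.facility_id)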