177 lines
5.9 KiB
Python
177 lines
5.9 KiB
Python
import logging
|
|
from django import forms
|
|
from django.apps import apps
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.db.models.manager import Manager
|
|
|
|
from .. import converters
|
|
|
|
logger = logging.getLogger(__name__)
|
|
app_config = apps.get_containing_app_config(__package__)
|
|
|
|
|
|
class ChainedForm(forms.Form):
|
|
_form_title = None
|
|
_initial_form_name = None
|
|
_next_form_name = None
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self._session_data = dict()
|
|
self._request = kwargs.pop('request', None)
|
|
self.load_session_data()
|
|
|
|
if 'instance' in kwargs:
|
|
if hasattr(self, 'load_from_instance'):
|
|
self._session_data.update(self.load_from_instance(kwargs.pop('instance')))
|
|
else:
|
|
raise TypeError('Keyword argument \'instance\' requires method \'load_from_instance\'')
|
|
|
|
if 'initial' not in kwargs:
|
|
kwargs['initial'] = self._session_data
|
|
else:
|
|
kwargs['initial'].update(self._session_data)
|
|
|
|
super(ChainedForm, self).__init__(*args, **kwargs)
|
|
|
|
self.proceed_session_data(self._session_data)
|
|
|
|
def _serialize_value(self, value):
|
|
return converters.Iso8601Serializer.serialize(value, ignore_unsupported_input=True)
|
|
|
|
def _deserialize_value(self, value):
|
|
return converters.Iso8601Serializer.deserialize(value, ignore_unsupported_input=True)
|
|
|
|
def _post_clean(self):
|
|
self._session_data.update(self.cleaned_data)
|
|
self.save_session_data()
|
|
|
|
@property
|
|
def form_name(self):
|
|
return self.__class__.__name__
|
|
|
|
@property
|
|
def form_title(self):
|
|
return self.__class__.get_form_title()
|
|
|
|
@property
|
|
def initial_form_name(self):
|
|
return self.__class__.get_initial_form_name()
|
|
|
|
@property
|
|
def next_form_name(self):
|
|
return self.__class__.get_next_form_name()
|
|
|
|
@property
|
|
def session_data(self):
|
|
return self._session_data
|
|
|
|
@classmethod
|
|
def get_form_name(cls):
|
|
return cls.__name__
|
|
|
|
@classmethod
|
|
def get_form_title(cls):
|
|
if cls._form_title is not None:
|
|
return cls._form_title
|
|
n = cls.get_form_name()
|
|
if n.endswith('Form'):
|
|
n = n[:-len('Form')]
|
|
return n
|
|
|
|
@classmethod
|
|
def get_initial_form_name(cls):
|
|
return cls._initial_form_name or cls.__name__
|
|
|
|
@classmethod
|
|
def get_next_form_name(cls):
|
|
return cls._next_form_name
|
|
|
|
def get_initial_for_field(self, field, field_name):
|
|
value = super(ChainedForm, self).get_initial_for_field(field, field_name)
|
|
if value is None:
|
|
form_name = self.__class__.__name__
|
|
form_initials = app_config.settings.form_initials
|
|
if form_name in form_initials and field_name in form_initials[form_name]:
|
|
value = app_config.settings.form_initials[form_name][field_name].get_value(self._session_data)
|
|
return value
|
|
|
|
def load_session_data(self, data=None, request=None):
|
|
if data is None:
|
|
if request is None:
|
|
if self._request is None:
|
|
logger.warning('ChainedForm.load_session_data(): Cannot access request.')
|
|
return
|
|
request = self._request
|
|
if not hasattr(request, 'session'):
|
|
logger.warning('ChainedForm.load_session_data(): Cannot access session.')
|
|
return
|
|
session_var_name = '{}_chained_form_session_data'.format(request.resolver_match.url_name)
|
|
data = request.session.get(session_var_name, dict())
|
|
for k in data:
|
|
self._session_data[k] = self._deserialize_value(data[k])
|
|
|
|
def save_session_data(self, request=None):
|
|
if request is None:
|
|
if self._request is None:
|
|
logger.warning('ChainedForm.save_session_data(): Cannot access request.')
|
|
return
|
|
request = self._request
|
|
if not hasattr(request, 'session'):
|
|
logger.warning('ChainedForm.save_session_data(): Cannot access session.')
|
|
return
|
|
|
|
session_var_name = '{}_chained_form_session_data'.format(request.resolver_match.url_name)
|
|
session_data = dict()
|
|
for k in self._session_data:
|
|
session_data[k] = self._serialize_value(self._session_data[k])
|
|
request.session[session_var_name] = session_data
|
|
return session_data
|
|
|
|
def proceed_session_data(self, session_data):
|
|
pass
|
|
|
|
def flush_session_data(self):
|
|
if self._request is not None and hasattr(self._request, 'session'):
|
|
session_var_name = '{}_chained_form_session_data'.format(self._request.resolver_match.url_name)
|
|
if session_var_name in self._request.session:
|
|
del self._request.session[session_var_name]
|
|
self._session_data = dict()
|
|
|
|
|
|
class ModelMixin(object):
|
|
_model = None
|
|
|
|
def _get_model(self):
|
|
if not self._model:
|
|
raise ImproperlyConfigured('{cls} is missing a Model.'
|
|
' Define {cls}._model'.format(cls=self.__class__.__name__))
|
|
return self._model
|
|
|
|
def _get_instance_kwargs(self):
|
|
return self.cleaned_data
|
|
|
|
def get_instance(self):
|
|
model = self._get_model()
|
|
kwargs1 = self._get_instance_kwargs()
|
|
kwargs2 = dict()
|
|
for attr in kwargs1:
|
|
if hasattr(model, attr):
|
|
kwargs2[attr] = kwargs1[attr]
|
|
return model(**kwargs2)
|
|
|
|
def load_from_instance(self, instance):
|
|
model = self._get_model()
|
|
if not isinstance(instance, model):
|
|
raise TypeError('Expected %s' % model.__class__.__name__)
|
|
data = {}
|
|
for field in instance._meta.get_fields():
|
|
v = getattr(instance, field.name)
|
|
if v is None:
|
|
continue
|
|
if isinstance(v, Manager):
|
|
continue
|
|
data[field.name] = getattr(instance, field.name)
|
|
self.is_bound = True
|
|
self.data = data
|
|
return data
|