197 lines
7.8 KiB
Python
197 lines
7.8 KiB
Python
import datetime
|
|
import json
|
|
from django.apps import apps
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.models import Group
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.utils.termcolors import colorize
|
|
|
|
from dav_events.models import Event, EventStatus, OneClickAction
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = 'Populate database with test data.'
|
|
|
|
def _substitute_data_vars(self, data_dict, vars_dict):
|
|
for k in data_dict:
|
|
try:
|
|
data_dict[k] = data_dict[k].format(**vars_dict)
|
|
except AttributeError:
|
|
pass
|
|
return data_dict
|
|
|
|
def _get_groups_for_role(self, role_name):
|
|
settings = self._app_settings
|
|
var_name = 'groups_{}'.format(role_name)
|
|
group_names = getattr(settings, var_name)
|
|
return group_names
|
|
|
|
def _get_group_for_role(self, role_name):
|
|
group_names = self._get_groups_for_role(role_name)
|
|
group_name = group_names[0]
|
|
return group_name
|
|
|
|
def _grant_role_to_user(self, user, role_name):
|
|
group_name = self._get_group_for_role(role_name)
|
|
group = Group.objects.get(name=group_name)
|
|
user.groups.add(group)
|
|
self.stdout.write(self.style.SUCCESS(
|
|
'Added user \'{}\' to group \'{}\''.format(user, group)
|
|
))
|
|
|
|
def _create_group(self, group_name):
|
|
group, created = Group.objects.get_or_create(name=group_name)
|
|
if created:
|
|
self.stdout.write(self.style.SUCCESS('Created group \'{}\''.format(group)))
|
|
else:
|
|
self.stdout.write(self.style.WARNING('Recycled group \'{}\''.format(group)))
|
|
return group
|
|
|
|
def _create_groups(self):
|
|
roles = (
|
|
'manager_super',
|
|
'manager_w',
|
|
'manager_s',
|
|
'manager_m',
|
|
'manager_k',
|
|
'manager_b',
|
|
'publisher_print',
|
|
'publisher_web',
|
|
'office',
|
|
)
|
|
for role_name in roles:
|
|
group_names = self._get_groups_for_role(role_name)
|
|
for group_name in group_names:
|
|
self._create_group(group_name)
|
|
|
|
def _create_user(self, username, email, first_name, last_name, password=None, is_superuser=False):
|
|
if password is None:
|
|
password = email
|
|
|
|
user_model = get_user_model()
|
|
try:
|
|
user = user_model.objects.get(username=username)
|
|
self.stdout.write(self.style.WARNING('Recycled user \'{}\''.format(user)))
|
|
user.groups.clear()
|
|
except user_model.DoesNotExist:
|
|
user = user_model.objects.create_user(username=username,
|
|
email=email,
|
|
first_name=first_name,
|
|
last_name=last_name,
|
|
password=password)
|
|
self.stdout.write(self.style.SUCCESS('Created user \'{}\''.format(user)))
|
|
|
|
if is_superuser:
|
|
user.is_staff = True
|
|
user.is_superuser = True
|
|
user.save()
|
|
self.stdout.write(self.style.SUCCESS('Made user \'{}\' a super user'.format(user)))
|
|
|
|
return user
|
|
|
|
def _create_users(self, user_list, data_vars=None):
|
|
if data_vars is None:
|
|
data_vars = {}
|
|
|
|
for data_set in user_list:
|
|
if data_vars:
|
|
data_set = self._substitute_data_vars(data_set, data_vars)
|
|
|
|
if 'username' not in data_set:
|
|
data_set['username'] = data_set['email']
|
|
if 'roles' not in data_set:
|
|
data_set['roles'] = ()
|
|
if 'is_superuser' not in data_set:
|
|
data_set['is_superuser'] = False
|
|
if 'password' not in data_set:
|
|
data_set['password'] = None
|
|
|
|
user = self._create_user(username=data_set['username'],
|
|
email=data_set['email'],
|
|
first_name=data_set['first_name'],
|
|
last_name=data_set['last_name'],
|
|
password=data_set['password'],
|
|
is_superuser=data_set['is_superuser'])
|
|
|
|
for role_name in data_set['roles']:
|
|
self._grant_role_to_user(user, role_name)
|
|
|
|
def _create_event(self, event_data):
|
|
event = Event(**event_data)
|
|
event.save()
|
|
self.stdout.write(self.style.SUCCESS('Created event \'{}\''.format(event)))
|
|
return event
|
|
|
|
def _create_events(self, event_list, data_vars=None):
|
|
if data_vars is None:
|
|
data_vars = {}
|
|
|
|
for data_set in event_list:
|
|
if data_vars:
|
|
data_set = self._substitute_data_vars(data_set, data_vars)
|
|
|
|
for date_key in ('first_day', 'last_day', 'alt_first_day', 'alt_last_day', 'deadline'):
|
|
k = '{}_from_today'.format(date_key)
|
|
if k in data_set:
|
|
d = datetime.date.today() + datetime.timedelta(data_set[k])
|
|
data_set[date_key] = d
|
|
del data_set[k]
|
|
if 'pre_meeting_1_from_today' in data_set:
|
|
day = datetime.date.today() + datetime.timedelta(data_set['pre_meeting_1_from_today'])
|
|
data_set['pre_meeting_1'] = datetime.datetime.combine(day, datetime.time(19, 30))
|
|
del data_set['pre_meeting_1_from_today']
|
|
if 'pre_meeting_2_from_today' in data_set:
|
|
day = datetime.date.today() + datetime.timedelta(data_set['pre_meeting_2_from_today'])
|
|
data_set['pre_meeting_2'] = datetime.datetime.combine(day, datetime.time(19, 30))
|
|
del data_set['pre_meeting_2_from_today']
|
|
|
|
status_updates = []
|
|
if 'status_updates' in data_set:
|
|
status_updates = data_set['status_updates']
|
|
del data_set['status_updates']
|
|
|
|
event = self._create_event(data_set)
|
|
|
|
for status in status_updates:
|
|
event.workflow.update_status(status, event.owner)
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument('--purge', action='store_true', help='remove all data from the database beforehand')
|
|
parser.add_argument('--var', '-e', action='append', dest='data_vars', metavar='KEY=VALUE',
|
|
help='Give a key=value pair for variable substitution in the test data')
|
|
parser.add_argument('data_file', metavar='FILE.json', help='File containing the test data as JSON')
|
|
|
|
def handle(self, *args, **options):
|
|
app_config = apps.get_containing_app_config(__package__)
|
|
app_settings = app_config.settings
|
|
self._app_settings = app_settings
|
|
|
|
with open(options['data_file'], 'r') as f:
|
|
test_data = json.load(f)
|
|
|
|
data_vars = test_data['vars']
|
|
if options['data_vars'] is not None:
|
|
for pair in options['data_vars']:
|
|
key = pair.split('=', 1)[0]
|
|
value = pair.split('=', 1)[1]
|
|
data_vars[key] = value
|
|
|
|
if options['purge']:
|
|
self.stdout.write(self.style.NOTICE('Purge database...'))
|
|
OneClickAction.objects.all().delete()
|
|
Event.objects.all().delete()
|
|
EventStatus.objects.all().delete()
|
|
user_model = get_user_model()
|
|
user_model.objects.all().delete()
|
|
Group.objects.all().delete()
|
|
|
|
self.stdout.write(colorize('Creating groups...', fg='cyan', opts=('bold',)))
|
|
self._create_groups()
|
|
self.stdout.write(colorize('Creating users...', fg='cyan', opts=('bold',)))
|
|
self._create_users(test_data['users'], data_vars)
|
|
self.stdout.write(colorize('Creating events...', fg='cyan', opts=('bold',)))
|
|
self._create_events(test_data['events'], data_vars)
|
|
|
|
# raise CommandError('This is an error')
|
|
|