import datetime
import json

from django.apps import apps
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.management.base import BaseCommand, CommandError
from django.utils.termcolors import colorize

from dav_events.models import Event, EventStatus, OneClickAction


class Command(BaseCommand):
    """Management command that populates the database from a JSON test data file.

    The data file is a JSON object. ``handle`` reads these top-level keys:

    * ``vars`` (optional): mapping of variable names to values used for
      ``str.format`` style substitution in the user and event records.
    * ``users``: list of user records (see ``_create_users``).
    * ``events``: list of event records (see ``_create_events``).
    """

    help = 'Populate database with test data.'

    # Role names whose groups are created by ``_create_groups``.
    _ROLES = (
        'manager_super',
        'manager_w',
        'manager_s',
        'manager_m',
        'manager_k',
        'manager_b',
        'publisher_print',
        'publisher_web',
        'office',
    )

    # Event fields that may be given as a day offset relative to today via
    # a '<field>_from_today' key in the event record.
    _RELATIVE_DATE_FIELDS = ('first_day', 'last_day', 'deadline')

    def _substitute_data_vars(self, data_dict, vars_dict):
        """Apply ``str.format(**vars_dict)`` to every string value of a dict.

        Values without a ``format`` method (numbers, lists, ``None``, ...)
        are left untouched. The dict is modified in place and also returned.

        Errors raised by ``str.format`` itself (for example ``KeyError`` for
        a placeholder that is not in ``vars_dict``) propagate to the caller.
        """
        for key, value in list(data_dict.items()):
            # Look the method up explicitly instead of catching
            # AttributeError around the call, so that an AttributeError
            # raised *during* formatting is not silently swallowed.
            formatter = getattr(value, 'format', None)
            if formatter is None:
                continue
            data_dict[key] = formatter(**vars_dict)
        return data_dict

    def _get_groups_for_role(self, role_name):
        """Return the ``groups_<role_name>`` attribute of the app settings.

        Requires ``self._app_settings`` to be set, which ``handle`` does.
        """
        var_name = 'groups_{}'.format(role_name)
        return getattr(self._app_settings, var_name)

    def _get_group_for_role(self, role_name):
        """Return the first group name configured for the given role."""
        return self._get_groups_for_role(role_name)[0]

    def _grant_role_to_user(self, user, role_name):
        """Add the user to the first group configured for the given role."""
        group_name = self._get_group_for_role(role_name)
        group = Group.objects.get(name=group_name)
        user.groups.add(group)
        self.stdout.write(self.style.SUCCESS(
            'Added user \'{}\' to group \'{}\''.format(user, group)
        ))

    def _create_group(self, group_name):
        """Get or create the group with the given name and return it."""
        group, created = Group.objects.get_or_create(name=group_name)
        if created:
            self.stdout.write(self.style.SUCCESS('Created group \'{}\''.format(group)))
        else:
            self.stdout.write(self.style.WARNING('Recycled group \'{}\''.format(group)))
        return group

    def _create_groups(self):
        """Create all groups configured for the roles listed in ``_ROLES``."""
        for role_name in self._ROLES:
            for group_name in self._get_groups_for_role(role_name):
                self._create_group(group_name)

    def _create_user(self, username, email, first_name, last_name, password=None, is_superuser=False):
        """Create a user, or reuse the existing user with that username.

        An existing user is kept as is, except that all of its group
        memberships are removed. If ``password`` is ``None`` the email
        address is used as the password for a newly created user. With
        ``is_superuser`` set, the user gets the staff and superuser flags.

        Returns the user object.
        """
        if password is None:
            password = email

        user_model = get_user_model()
        try:
            user = user_model.objects.get(username=username)
        except user_model.DoesNotExist:
            user = user_model.objects.create_user(username=username,
                                                  email=email,
                                                  first_name=first_name,
                                                  last_name=last_name,
                                                  password=password)
            self.stdout.write(self.style.SUCCESS('Created user \'{}\''.format(user)))
        else:
            self.stdout.write(self.style.WARNING('Recycled user \'{}\''.format(user)))
            user.groups.clear()

        if is_superuser:
            user.is_staff = True
            user.is_superuser = True
            user.save()
            self.stdout.write(self.style.SUCCESS('Made user \'{}\' a super user'.format(user)))

        return user

    def _create_users(self, user_list, data_vars=None):
        """Create a user for every record in ``user_list``.

        Each record is a dict with the required keys ``email``,
        ``first_name`` and ``last_name`` and the optional keys ``username``
        (defaults to the email), ``password``, ``is_superuser`` and
        ``roles`` (role names granted to the user).

        If ``data_vars`` is non-empty, string values of each record are
        formatted with it first. The records in ``user_list`` are not
        modified.
        """
        for record in user_list:
            # Work on a shallow copy so the caller's data stays untouched.
            data_set = dict(record)
            if data_vars:
                self._substitute_data_vars(data_set, data_vars)

            email = data_set['email']
            user = self._create_user(username=data_set.get('username', email),
                                     email=email,
                                     first_name=data_set['first_name'],
                                     last_name=data_set['last_name'],
                                     password=data_set.get('password'),
                                     is_superuser=data_set.get('is_superuser', False))

            for role_name in data_set.get('roles', ()):
                self._grant_role_to_user(user, role_name)

    def _create_event(self, event_data):
        """Create and save an ``Event`` from keyword data and return it."""
        event = Event(**event_data)
        event.save()
        self.stdout.write(self.style.SUCCESS('Created event \'{}\''.format(event)))
        return event

    def _create_events(self, event_list, data_vars=None):
        """Create an event for every record in ``event_list``.

        Besides the keyword arguments passed on to ``Event``, a record may
        contain:

        * ``first_day_from_today``, ``last_day_from_today``,
          ``deadline_from_today``: number of days added to today's date to
          compute ``first_day``, ``last_day`` and ``deadline``.
        * ``status_updates``: statuses applied to the saved event, in order,
          through ``event.workflow.update_status``.

        If ``data_vars`` is non-empty, string values of each record are
        formatted with it first. The records in ``event_list`` are not
        modified.
        """
        # Read the date once so that all events of one run share the same
        # reference day.
        today = datetime.date.today()

        for record in event_list:
            # Work on a shallow copy so the caller's data stays untouched.
            data_set = dict(record)
            if data_vars:
                self._substitute_data_vars(data_set, data_vars)

            for field in self._RELATIVE_DATE_FIELDS:
                offset_key = field + '_from_today'
                if offset_key in data_set:
                    data_set[field] = today + datetime.timedelta(data_set.pop(offset_key))

            status_updates = data_set.pop('status_updates', [])

            event = self._create_event(data_set)
            for status in status_updates:
                event.workflow.update_status(status, event.owner)

    def add_arguments(self, parser):
        """Register the command line arguments of this command."""
        parser.add_argument('--purge',
                            action='store_true',
                            help='remove all data from the database beforehand')
        parser.add_argument('--var', '-e',
                            action='append',
                            dest='data_vars',
                            metavar='KEY=VALUE',
                            help='Give a key=value pair for variable substitution in the test data')
        parser.add_argument('data_file',
                            metavar='FILE.json',
                            help='File containing the test data as JSON')

    def _load_test_data(self, path):
        """Read and parse the JSON data file.

        Raises ``CommandError`` if the file cannot be read or does not
        contain valid JSON.
        """
        try:
            with open(path, 'r') as f:
                return json.load(f)
        except (IOError, OSError) as e:
            raise CommandError('Cannot read data file \'{}\': {}'.format(path, e))
        except ValueError as e:
            # json reports malformed input as (a subclass of) ValueError.
            raise CommandError('Invalid JSON in data file \'{}\': {}'.format(path, e))

    def _parse_var_option(self, pair):
        """Split a ``KEY=VALUE`` string into a ``(key, value)`` tuple.

        Only the first ``=`` separates key and value. Raises
        ``CommandError`` if there is no ``=`` or the key is empty.
        """
        key, separator, value = pair.partition('=')
        if not separator or not key:
            raise CommandError('Invalid variable \'{}\': expected KEY=VALUE'.format(pair))
        return key, value

    def handle(self, *args, **options):
        """Load the data file and create groups, users and events from it.

        Variables given with ``--var`` override those from the ``vars``
        section of the data file. With ``--purge``, all one-click actions,
        events, event statuses, users and groups are deleted first.
        """
        app_config = apps.get_containing_app_config(__package__)
        self._app_settings = app_config.settings

        test_data = self._load_test_data(options['data_file'])

        # The 'vars' section is optional; variables may come from the
        # command line alone.
        data_vars = test_data.get('vars') or {}
        for pair in options['data_vars'] or ():
            key, value = self._parse_var_option(pair)
            data_vars[key] = value

        if options['purge']:
            self.stdout.write(self.style.NOTICE('Purge database...'))
            OneClickAction.objects.all().delete()
            Event.objects.all().delete()
            EventStatus.objects.all().delete()
            user_model = get_user_model()
            user_model.objects.all().delete()
            Group.objects.all().delete()

        self.stdout.write(colorize('Creating groups...', fg='cyan', opts=('bold',)))
        self._create_groups()

        self.stdout.write(colorize('Creating users...', fg='cyan', opts=('bold',)))
        self._create_users(test_data['users'], data_vars)

        self.stdout.write(colorize('Creating events...', fg='cyan', opts=('bold',)))
        self._create_events(test_data['events'], data_vars)