alternative-backend-service/meetings/models.py

821 lines
31 KiB
Python
Raw Normal View History

import uuid
from django.db import models
from django.conf import settings
from django.utils import timezone
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
import secrets
import time
from datetime import datetime, timedelta
import jwt
import json
class EncryptionManager:
def __init__(self):
self.fernet = self._get_fernet()
def _get_fernet_key(self):
key = getattr(settings, 'ENCRYPTION_KEY', None) or os.environ.get('ENCRYPTION_KEY')
if not key:
key = Fernet.generate_key().decode()
key = key.encode()
return key
def _get_fernet(self):
key = self._get_fernet_key()
return Fernet(key)
def encrypt_value(self, value):
if value is None or value == "":
return value
encrypted_value = self.fernet.encrypt(value.encode())
return base64.urlsafe_b64encode(encrypted_value).decode()
def decrypt_value(self, encrypted_value):
if encrypted_value is None or encrypted_value == "":
return encrypted_value
try:
encrypted_bytes = base64.urlsafe_b64decode(encrypted_value.encode())
decrypted_value = self.fernet.decrypt(encrypted_bytes)
return decrypted_value.decode()
except Exception as e:
print(f"Decryption error: {e}")
return encrypted_value
encryption_manager = EncryptionManager()
class EncryptedCharField(models.CharField):
def from_db_value(self, value, expression, connection):
if value is None:
return value
return encryption_manager.decrypt_value(value)
def get_prep_value(self, value):
if value is None:
return value
return encryption_manager.encrypt_value(value)
class EncryptedEmailField(EncryptedCharField):
def __init__(self, *args, **kwargs):
kwargs['max_length'] = 254
super().__init__(*args, **kwargs)
def from_db_value(self, value, expression, connection):
if value is None:
return value
return encryption_manager.decrypt_value(value)
def get_prep_value(self, value):
if value is None:
return value
return encryption_manager.encrypt_value(value)
class EncryptedTextField(models.TextField):
def from_db_value(self, value, expression, connection):
if value is None:
return value
return encryption_manager.decrypt_value(value)
def get_prep_value(self, value):
if value is None:
return value
return encryption_manager.encrypt_value(value)
class EncryptedURLField(EncryptedCharField):
def __init__(self, *args, **kwargs):
kwargs['max_length'] = kwargs.get('max_length', 200)
super().__init__(*args, **kwargs)
class AdminWeeklyAvailability(models.Model):
DAYS_OF_WEEK = [
(0, 'Monday'),
(1, 'Tuesday'),
(2, 'Wednesday'),
(3, 'Thursday'),
(4, 'Friday'),
(5, 'Saturday'),
(6, 'Sunday'),
]
TIME_SLOT_CHOICES = [
('morning', 'Morning (9AM - 12PM)'),
('afternoon', 'Afternoon (1PM - 5PM)'),
('evening', 'Evening (6PM - 9PM)'),
]
availability_schedule = models.JSONField(
default=dict,
help_text="Dictionary with days as keys and lists of time slots as values"
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
verbose_name = 'Admin Weekly Availability'
verbose_name_plural = 'Admin Weekly Availability'
def set_availability(self, day, time_slots):
if not self.availability_schedule:
self.availability_schedule = {}
if day not in [str(d[0]) for d in self.DAYS_OF_WEEK]:
raise ValueError(f"Invalid day: {day}")
valid_slots = [slot[0] for slot in self.TIME_SLOT_CHOICES]
for slot in time_slots:
if slot not in valid_slots:
raise ValueError(f"Invalid time slot: {slot}")
self.availability_schedule[str(day)] = time_slots
def get_availability_for_day(self, day):
return self.availability_schedule.get(str(day), [])
def is_available(self, day, time_slot):
return time_slot in self.get_availability_for_day(day)
def get_all_available_slots(self):
available_slots = []
for day_num, time_slots in self.availability_schedule.items():
day_name = dict(self.DAYS_OF_WEEK).get(int(day_num))
for time_slot in time_slots:
time_display = dict(self.TIME_SLOT_CHOICES).get(time_slot)
available_slots.append({
'day_num': int(day_num),
'day_name': day_name,
'time_slot': time_slot,
'time_display': time_display
})
return available_slots
def clear_availability(self, day=None):
if day is None:
self.availability_schedule = {}
else:
day_str = str(day)
if day_str in self.availability_schedule:
del self.availability_schedule[day_str]
def __str__(self):
if not self.availability_schedule:
return "No availability set"
display_strings = []
for day_num, time_slots in sorted(self.availability_schedule.items()):
day_name = dict(self.DAYS_OF_WEEK).get(int(day_num))
slot_displays = [dict(self.TIME_SLOT_CHOICES).get(slot) for slot in time_slots]
display_strings.append(f"{day_name}: {', '.join(slot_displays)}")
return " | ".join(display_strings)
class AppointmentRequest(models.Model):
STATUS_CHOICES = [
('pending_review', 'Pending Review'),
('scheduled', 'Scheduled'),
('rejected', 'Rejected'),
('completed', 'Completed'),
('cancelled', 'Cancelled'),
]
TIME_SLOT_CHOICES = [
('morning', 'Morning (9AM - 12PM)'),
('afternoon', 'Afternoon (1PM - 5PM)'),
('evening', 'Evening (6PM - 9PM)'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
first_name = EncryptedCharField(max_length=255)
last_name = EncryptedCharField(max_length=255)
email = EncryptedEmailField()
phone = EncryptedCharField(max_length=255, blank=True)
reason = EncryptedTextField(blank=True)
preferred_dates = models.JSONField(
help_text="List of preferred dates (YYYY-MM-DD format)"
)
preferred_time_slots = models.JSONField(
help_text="List of preferred time slots (morning/afternoon/evening)"
)
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='pending_review'
)
scheduled_datetime = models.DateTimeField(null=True, blank=True)
scheduled_duration = models.PositiveIntegerField(
default=60,
help_text="Duration in minutes"
)
rejection_reason = EncryptedTextField(blank=True)
jitsi_meet_url = models.URLField(
blank=True,
null=True,
help_text="Jitsi Meet URL for the video session"
)
jitsi_room_id = models.CharField(
max_length=100,
unique=True,
null=True,
blank=True,
help_text="Jitsi room ID"
)
jitsi_meeting_created = models.BooleanField(default=False)
jitsi_moderator_token = models.TextField(blank=True, null=True)
jitsi_participant_token = models.TextField(blank=True, null=True)
jitsi_meeting_password = models.CharField(max_length=100, blank=True, null=True)
jitsi_meeting_config = models.JSONField(
default=dict,
help_text="Jitsi meeting configuration and settings"
)
jitsi_recording_url = models.URLField(blank=True, null=True, help_text="URL to meeting recording")
jitsi_meeting_data = models.JSONField(
default=dict,
help_text="Additional meeting data (participants, duration, etc)"
)
selected_slots = models.JSONField(
default=list,
help_text="Original selected slots with day and time slot pairs"
)
meeting_started_at = models.DateTimeField(null=True, blank=True)
meeting_ended_at = models.DateTimeField(null=True, blank=True)
meeting_duration_actual = models.PositiveIntegerField(
default=0,
help_text="Actual meeting duration in minutes"
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['-created_at']
verbose_name = 'Appointment Request'
verbose_name_plural = 'Appointment Requests'
indexes = [
models.Index(fields=['status', 'scheduled_datetime']),
models.Index(fields=['email', 'created_at']),
models.Index(fields=['jitsi_meeting_created', 'scheduled_datetime']),
models.Index(fields=['meeting_started_at']),
]
def __str__(self):
return f"{self.full_name} - {self.get_status_display()} - {self.created_at.strftime('%Y-%m-%d')}"
@property
def full_name(self):
return f"{self.first_name} {self.last_name}"
@property
def formatted_created_at(self):
return self.created_at.strftime("%B %d, %Y at %I:%M %p")
@property
def formatted_scheduled_datetime(self):
if self.scheduled_datetime:
return self.scheduled_datetime.strftime("%B %d, %Y at %I:%M %p")
return None
@property
def has_jitsi_meeting(self):
return bool(self.jitsi_meet_url and self.jitsi_room_id and self.jitsi_meeting_created)
@property
def meeting_in_future(self):
if not self.scheduled_datetime:
return False
return self.scheduled_datetime > timezone.now()
@property
def meeting_duration_display(self):
hours = self.scheduled_duration // 60
minutes = self.scheduled_duration % 60
if hours > 0:
return f"{hours}h {minutes}m"
return f"{minutes}m"
@property
def meeting_join_ready(self):
if not self.has_jitsi_meeting:
return False
if not self.scheduled_datetime:
return False
if self.status != 'scheduled':
return False
now = timezone.now()
meeting_start = self.scheduled_datetime
meeting_end = meeting_start + timedelta(minutes=self.scheduled_duration + 15)
return meeting_start - timedelta(minutes=10) <= now <= meeting_end
def get_preferred_dates_display(self):
try:
dates = [datetime.strptime(date, '%Y-%m-%d').strftime('%b %d, %Y')
for date in self.preferred_dates]
return ', '.join(dates)
except:
return ', '.join(self.preferred_dates)
def get_preferred_time_slots_display(self):
slot_display = {
'morning': 'Morning',
'afternoon': 'Afternoon',
'evening': 'Evening'
}
return ', '.join([slot_display.get(slot, slot) for slot in self.preferred_time_slots])
def are_preferences_available(self):
availability = get_admin_availability()
if not availability:
return False
for date_str in self.preferred_dates:
try:
date_obj = datetime.strptime(date_str, '%Y-%m-%d').date()
day_of_week = date_obj.weekday()
available_slots = availability.get_availability_for_day(day_of_week)
if any(slot in available_slots for slot in self.preferred_time_slots):
return True
except Exception as e:
print(f"Error checking availability for {date_str}: {e}")
continue
return False
def get_matching_availability(self):
availability = get_admin_availability()
if not availability:
return []
matching_slots = []
for date_str in self.preferred_dates:
try:
date_obj = datetime.strptime(date_str, '%Y-%m-%d').date()
day_of_week = date_obj.weekday()
day_name = dict(AdminWeeklyAvailability.DAYS_OF_WEEK).get(day_of_week)
available_slots = availability.get_availability_for_day(day_of_week)
matching_time_slots = [slot for slot in self.preferred_time_slots if slot in available_slots]
if matching_time_slots:
matching_slots.append({
'date': date_str,
'day_name': day_name,
'available_slots': matching_time_slots,
'date_obj': date_obj
})
except Exception as e:
print(f"Error processing {date_str}: {e}")
continue
return matching_slots
def generate_jitsi_room_id(self):
if not self.jitsi_room_id:
timestamp = int(time.time())
unique_id = secrets.token_hex(4)
self.jitsi_room_id = f"appointment_{str(self.id).replace('-', '')}_{timestamp}"
return self.jitsi_room_id
def generate_jwt_token(self, user, user_type='participant'):
if not self.jitsi_room_id:
self.generate_jitsi_room_id()
jitsi_config = getattr(settings, 'JITSI_CONFIG', {})
domain = jitsi_config['DOMAIN']
app_id = jitsi_config['APP_ID']
secret_key = jitsi_config['SECRET_KEY']
is_moderator = user_type == 'moderator'
features = {
"recording": "true" if is_moderator and jitsi_config.get('ENABLE_RECORDING') else "false",
"screen-sharing": "true",
"moderation": "true" if is_moderator else "false"
}
if hasattr(user, 'full_name') and user.full_name:
user_name = user.full_name
elif hasattr(user, 'get_full_name'):
user_name = user.get_full_name()
else:
user_name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip()
if not user_name:
user_name = getattr(user, 'username', 'User')
payload = {
'aud': app_id,
'iss': app_id,
'sub': domain,
'room': self.jitsi_room_id,
'exp': int(time.time()) + 7200,
'nbf': int(time.time()) - 60,
'moderator': is_moderator,
'context': {
'user': {
'id': str(user.id),
'name': user_name,
'email': getattr(user, 'email', ''),
'affiliation': 'moderator' if is_moderator else 'participant'
},
'features': features
}
}
debug_mode = getattr(settings, 'DEBUG', False)
if debug_mode:
print(f"\n=== Generating {user_type} JWT ===")
print(f"User: {user_name}")
print(f"Payload: {json.dumps(payload, indent=2)}")
token = jwt.encode(payload, secret_key, algorithm='HS256')
if isinstance(token, bytes):
token = token.decode('utf-8')
if debug_mode:
try:
decoded = jwt.decode(token, options={"verify_signature": False})
print(f"Token verified successfully")
features_str = json.dumps(decoded.get('context', {}).get('features', {}))
if '":"' in features_str and '","' not in features_str:
print("WARNING: Malformed JSON detected in features!")
except Exception as e:
print(f"Error verifying token: {e}")
self._store_token(token, user_type)
return token
def _store_token(self, token, user_type):
if user_type == 'moderator':
self.jitsi_moderator_token = token
else:
self.jitsi_participant_token = token
update_fields = ['updated_at']
if user_type == 'moderator':
update_fields.append('jitsi_moderator_token')
else:
update_fields.append('jitsi_participant_token')
self.save(update_fields=update_fields)
def create_jitsi_meeting(self, moderator_user, participant_user=None, with_moderation=True, custom_config=None):
if self.has_jitsi_meeting and not custom_config:
return self.jitsi_meet_url
self.generate_jitsi_room_id()
moderator_token = self.generate_jwt_token(moderator_user, 'moderator')
if participant_user:
participant_token = self.generate_jwt_token(participant_user, 'participant')
else:
class GenericUser:
id = 'participant'
first_name = 'Participant'
last_name = ''
email = ''
participant_token = self.generate_jwt_token(GenericUser(), 'participant')
jitsi_config = getattr(settings, 'JITSI_CONFIG', {})
domain = jitsi_config.get('DOMAIN', 'meet.jit.si')
custom_logo = jitsi_config.get('CUSTOM_LOGO', '')
brand_name = jitsi_config.get('BRAND_NAME', 'Therapy Session')
self.jitsi_meeting_password = secrets.token_urlsafe(8)
meeting_config = {
'domain': domain,
'room_id': self.jitsi_room_id,
'brand_name': brand_name,
'with_moderation': with_moderation,
'enable_recording': jitsi_config.get('ENABLE_RECORDING', False),
'enable_transcription': jitsi_config.get('ENABLE_TRANSCRIPTION', False),
'custom_logo': custom_logo,
'created_at': timezone.now().isoformat(),
}
if custom_config and isinstance(custom_config, dict):
meeting_config.update(custom_config)
self.jitsi_meet_url = self._build_meeting_url(
domain=domain,
room_id=self.jitsi_room_id,
token=participant_token,
config=meeting_config,
is_moderator=False
)
self.jitsi_meeting_config = meeting_config
self.jitsi_meeting_created = True
self.save()
return self.jitsi_meet_url
def _build_meeting_url(self, domain, room_id, token, config, is_moderator=False):
base_url = f"https://{domain}/{room_id}"
params = {
'jwt': token,
'config.startWithAudioMuted': 'true',
'config.startWithVideoMuted': 'true',
'config.prejoinPageEnabled': 'false',
'config.requireDisplayName': 'true',
'interfaceConfig.APP_NAME': config.get('brand_name', 'Therapy Session'),
'interfaceConfig.SHOW_JITSI_WATERMARK': 'false',
'interfaceConfig.SHOW_WATERMARK_FOR_GUESTS': 'false',
}
custom_logo = config.get('custom_logo')
if custom_logo:
params['interfaceConfig.DEFAULT_LOGO_URL'] = custom_logo
if config.get('with_moderation', True):
params.update({
'config.enableLobby': 'true',
'config.moderatorsCanMute': 'true',
'config.moderatorsCanLockRoom': 'true',
'config.moderatorsCanKick': 'true',
'config.enableClosePage': 'true',
})
if config.get('enable_recording', False) and is_moderator:
params['config.enableRecording'] = 'true'
params_list = [f"{key}={value}" for key, value in params.items() if value is not None]
query_string = '&'.join(params_list)
return f"{base_url}?{query_string}"
def get_moderator_join_url(self, moderator_user):
self.generate_jwt_token(moderator_user, 'moderator')
jitsi_config = getattr(settings, 'JITSI_CONFIG', {})
domain = jitsi_config.get('DOMAIN', 'meet.jit.si')
brand_name = jitsi_config.get('BRAND_NAME', 'Therapist Console')
custom_logo = jitsi_config.get('CUSTOM_LOGO', '')
moderator_params = {
'jwt': self.jitsi_moderator_token,
'config.startWithAudioMuted': 'true',
'config.startWithVideoMuted': 'true',
'config.enableLobby': str(jitsi_config.get('ENABLE_LOBBY', True)).lower(),
'config.requireDisplayName': str(jitsi_config.get('REQUIRE_DISPLAY_NAME', True)).lower(),
'config.moderatorsCanMute': str(jitsi_config.get('MODERATOR_CAN_MUTE', True)).lower(),
'config.moderatorsCanLockRoom': str(jitsi_config.get('MODERATOR_CAN_LOCK_ROOM', True)).lower(),
'config.moderatorsCanKick': str(jitsi_config.get('MODERATOR_CAN_KICK', True)).lower(),
'config.enableClosePage': str(jitsi_config.get('ENABLE_CLOSE_PAGE', True)).lower(),
'interfaceConfig.APP_NAME': brand_name,
'interfaceConfig.SHOW_JITSI_WATERMARK': 'false',
'interfaceConfig.SHOW_WATERMARK_FOR_GUESTS': 'false',
}
if custom_logo:
moderator_params['interfaceConfig.DEFAULT_LOGO_URL'] = custom_logo
if jitsi_config.get('ENABLE_RECORDING', False):
moderator_params['config.enableRecording'] = 'true'
if self.jitsi_meeting_password and jitsi_config.get('ENABLE_PASSWORD', True):
moderator_params['config.password'] = self.jitsi_meeting_password
params_list = [f"{key}={value}" for key, value in moderator_params.items() if value is not None]
query_string = '&'.join(params_list)
return f"https://{domain}/{self.jitsi_room_id}?{query_string}"
def get_participant_join_url(self, participant_user, include_password=True):
self.generate_jwt_token(participant_user, 'participant')
jitsi_config = getattr(settings, 'JITSI_CONFIG', {})
domain = jitsi_config.get('DOMAIN', 'meet.jit.si')
brand_name = jitsi_config.get('BRAND_NAME', 'Therapy Session')
custom_logo = jitsi_config.get('CUSTOM_LOGO', '')
participant_params = {
'jwt': self.jitsi_participant_token,
'config.startWithAudioMuted': 'true',
'config.startWithVideoMuted': 'true',
'config.enableLobby': str(jitsi_config.get('ENABLE_LOBBY', True)).lower(),
'config.requireDisplayName': str(jitsi_config.get('REQUIRE_DISPLAY_NAME', True)).lower(),
'interfaceConfig.APP_NAME': brand_name,
'interfaceConfig.SHOW_JITSI_WATERMARK': 'false',
'interfaceConfig.SHOW_WATERMARK_FOR_GUESTS': 'false',
}
if custom_logo:
participant_params['interfaceConfig.DEFAULT_LOGO_URL'] = custom_logo
if include_password and self.jitsi_meeting_password and jitsi_config.get('ENABLE_PASSWORD', True):
participant_params['config.password'] = self.jitsi_meeting_password
params_list = [f"{key}={value}" for key, value in participant_params.items() if value is not None]
query_string = '&'.join(params_list)
return f"https://{domain}/{self.jitsi_room_id}?{query_string}"
def can_join_meeting(self, user_type='participant'):
if not self.scheduled_datetime or not self.has_jitsi_meeting:
return False
if self.status not in ['scheduled', 'in_progress']:
return False
now = timezone.now()
meeting_start = self.scheduled_datetime
meeting_end = meeting_start + timedelta(minutes=self.scheduled_duration + 30)
if user_type == 'moderator':
return meeting_start - timedelta(minutes=15) <= now <= meeting_end + timedelta(minutes=15)
else:
return meeting_start - timedelta(minutes=5) <= now <= meeting_end
def schedule_appointment(self, datetime_obj, moderator_user, participant_user=None, duration=60, create_meeting=True, commit=True):
self.status = 'scheduled'
self.scheduled_datetime = datetime_obj
self.scheduled_duration = duration
self.rejection_reason = ''
if create_meeting:
self.create_jitsi_meeting(
moderator_user=moderator_user,
participant_user=participant_user,
with_moderation=True
)
if commit:
self.save()
def reject_appointment(self, reason='', commit=True):
self.status = 'rejected'
self.rejection_reason = reason
self.scheduled_datetime = None
self.jitsi_meet_url = None
self.jitsi_room_id = None
if commit:
self.save()
def cancel_appointment(self, reason='', commit=True):
self.status = 'cancelled'
self.rejection_reason = reason
if commit:
self.save()
def complete_appointment(self, commit=True):
self.status = 'completed'
if commit:
self.save()
def start_meeting(self, commit=True):
if self.status == 'scheduled':
self.meeting_started_at = timezone.now()
if commit:
self.save(update_fields=['meeting_started_at'])
def end_meeting(self, commit=True):
if self.meeting_started_at and not self.meeting_ended_at:
self.meeting_ended_at = timezone.now()
if self.meeting_started_at:
duration = self.meeting_ended_at - self.meeting_started_at
self.meeting_duration_actual = int(duration.total_seconds() / 60)
if commit:
self.save(update_fields=[
'meeting_ended_at',
'meeting_duration_actual'
])
def can_join_meeting(self, *args, **kwargs):
if args:
user_type = args[0]
elif 'user_type' in kwargs:
user_type = kwargs['user_type']
else:
user_type = 'participant'
if not self.scheduled_datetime or not self.has_jitsi_meeting:
return False
if self.status not in ['scheduled', 'in_progress']:
return False
now = timezone.now()
meeting_start = self.scheduled_datetime
meeting_end = meeting_start + timedelta(minutes=self.scheduled_duration + 30)
if user_type == 'moderator':
return meeting_start - timedelta(minutes=15) <= now <= meeting_end + timedelta(minutes=15)
else:
return meeting_start - timedelta(minutes=5) <= now <= meeting_end
def get_meeting_join_info(self, user_type='participant'):
if not self.has_jitsi_meeting:
return None
join_url = self.get_moderator_join_url() if user_type == 'moderator' else self.get_participant_join_url()
return {
'meeting_url': join_url,
'room_id': self.jitsi_room_id,
'scheduled_time': self.formatted_scheduled_datetime,
'duration': self.meeting_duration_display,
'password': self.jitsi_meeting_password if user_type == 'participant' else None,
'can_join_now': self.can_join_meeting(user_type),
'join_window_start': (self.scheduled_datetime - timedelta(minutes=15)).strftime("%I:%M %p") if user_type == 'moderator' else (self.scheduled_datetime - timedelta(minutes=5)).strftime("%I:%M %p"),
'join_window_end': (self.scheduled_datetime + timedelta(minutes=self.scheduled_duration + 30)).strftime("%I:%M %p"),
'status': self.get_status_display(),
}
def update_meeting_data(self, data):
if not isinstance(data, dict):
return
current_data = self.jitsi_meeting_data or {}
current_data.update(data)
self.jitsi_meeting_data = current_data
self.save(update_fields=['jitsi_meeting_data'])
def get_meeting_analytics(self):
return {
'scheduled_duration': self.scheduled_duration,
'actual_duration': self.meeting_duration_actual,
'started_at': self.meeting_started_at.isoformat() if self.meeting_started_at else None,
'ended_at': self.meeting_ended_at.isoformat() if self.meeting_ended_at else None,
'status': self.status,
'punctuality': self._calculate_punctuality(),
'efficiency': self._calculate_efficiency(),
}
def _calculate_punctuality(self):
if not self.meeting_started_at or not self.scheduled_datetime:
return None
delay = (self.meeting_started_at - self.scheduled_datetime).total_seconds() / 60
if abs(delay) <= 5:
return 'On time'
elif delay > 5:
return f'Late by {int(delay)} minutes'
else:
return f'Early by {int(abs(delay))} minutes'
def _calculate_efficiency(self):
if not self.meeting_duration_actual or not self.scheduled_duration:
return None
efficiency = (self.meeting_duration_actual / self.scheduled_duration) * 100
if efficiency <= 100:
return f'{int(efficiency)}% (Within schedule)'
else:
return f'{int(efficiency)}% (Overtime)'
def get_meeting_status(self):
if not self.scheduled_datetime:
return "Not scheduled"
now = timezone.now()
meeting_start = self.scheduled_datetime
if now < meeting_start - timedelta(minutes=10):
return "Scheduled"
elif self.can_join_meeting():
return "Ready to join"
elif now > meeting_start + timedelta(minutes=self.scheduled_duration):
return "Completed"
else:
return "Ended"
def get_admin_availability():
availability, created = AdminWeeklyAvailability.objects.get_or_create(
id=1
)
return availability
def set_admin_availability(availability_dict):
availability = get_admin_availability()
for day, time_slots in availability_dict.items():
availability.set_availability(day, time_slots)
availability.save()
return availability
def get_available_slots_for_week():
availability = get_admin_availability()
return availability.get_all_available_slots()
def check_date_availability(date_str):
try:
from datetime import datetime
date_obj = datetime.strptime(date_str, '%Y-%m-%d').date()
day_of_week = date_obj.weekday()
availability = get_admin_availability()
return availability.get_availability_for_day(day_of_week)
except Exception as e:
print(f"Error checking date availability: {e}")
return []