300 lines
9.7 KiB
Python
300 lines
9.7 KiB
Python
|
|
import base64
import logging
import os
import uuid
from datetime import datetime, timedelta

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from django.conf import settings
from django.db import models
from django.utils import timezone
|
||
|
|
|
||
|
|
class EncryptionManager:
    """Encrypt and decrypt text values with a single Fernet key.

    The key is read from ``settings.ENCRYPTION_KEY`` and, if that is unset or
    empty, from the ``ENCRYPTION_KEY`` environment variable. When neither is
    present a fresh random key is generated so the process can still start;
    anything encrypted with that key cannot be decrypted by another process
    or after a restart.
    """

    _log = logging.getLogger(__name__)

    def __init__(self):
        self.fernet = self._get_fernet()

    def _get_fernet_key(self):
        """Return the Fernet key as ``bytes``.

        Accepts the configured key as either ``str`` or ``bytes``.
        """
        key = getattr(settings, 'ENCRYPTION_KEY', None) or os.environ.get('ENCRYPTION_KEY')
        if not key:
            # Best-effort fallback kept from the original behaviour, but made
            # visible: a generated key is not persisted anywhere.
            self._log.warning(
                "ENCRYPTION_KEY is not configured; using a randomly generated "
                "key. Data encrypted now will be unreadable after a restart."
            )
            return Fernet.generate_key()
        if isinstance(key, str):
            key = key.encode()
        return key

    def _get_fernet(self):
        """Build the ``Fernet`` instance used for all encrypt/decrypt calls."""
        return Fernet(self._get_fernet_key())

    def encrypt_value(self, value):
        """Encrypt ``value`` (a ``str``) and return the ciphertext as ``str``.

        ``None`` and the empty string are returned unchanged.
        """
        if value is None or value == "":
            return value
        token = self.fernet.encrypt(value.encode())
        # The token is wrapped in one more base64 layer. This is kept exactly
        # as-is so that values already stored in this format stay readable by
        # decrypt_value().
        return base64.urlsafe_b64encode(token).decode()

    def decrypt_value(self, encrypted_value):
        """Decrypt a value produced by :meth:`encrypt_value`.

        ``None`` and the empty string are returned unchanged. If the input
        cannot be decoded or decrypted, the failure is logged and the input is
        returned as-is instead of raising.
        """
        if encrypted_value is None or encrypted_value == "":
            return encrypted_value
        try:
            token = base64.urlsafe_b64decode(encrypted_value.encode())
            return self.fernet.decrypt(token).decode()
        except Exception as exc:
            # Deliberately broad: any failure falls back to the raw value
            # rather than breaking the caller.
            self._log.warning("Decryption error: %s", exc)
            return encrypted_value


# Shared instance used by the encrypted model fields below; created at import
# time, so the key is resolved when this module is first loaded.
encryption_manager = EncryptionManager()
|
||
|
|
|
||
|
|
class EncryptedCharField(models.CharField):
    """``CharField`` that stores its value encrypted.

    Values are encrypted with the module-level ``encryption_manager`` on the
    way to the database and decrypted when rows are loaded.

    NOTE(review): ``get_prep_value`` is also applied to lookup values, and the
    stored ciphertext is longer than the plaintext. Confirm that equality
    filters on this field are not relied on, and that the column is wide
    enough on backends that enforce ``max_length``.
    """

    def from_db_value(self, value, expression, connection):
        """Decrypt a value loaded from the database; ``None`` passes through."""
        if value is None:
            return value
        return encryption_manager.decrypt_value(value)

    def get_prep_value(self, value):
        """Encrypt ``value`` for storage; ``None`` passes through.

        The parent implementation runs first so lazy translation strings and
        non-``str`` values are converted to ``str`` before encryption.
        """
        value = super().get_prep_value(value)
        if value is None:
            return value
        return encryption_manager.encrypt_value(value)
|
||
|
|
|
||
|
|
class EncryptedEmailField(EncryptedCharField):
    """Encrypted character field with a default ``max_length`` of 254.

    Encryption and decryption are inherited from ``EncryptedCharField``.

    NOTE(review): no e-mail format validator is attached here; confirm
    whether validation happens at the form/serializer layer.
    """

    def __init__(self, *args, **kwargs):
        # Default to 254 but honour an explicit max_length from the caller.
        kwargs.setdefault('max_length', 254)
        super().__init__(*args, **kwargs)
|
||
|
|
|
||
|
|
class EncryptedTextField(models.TextField):
    """``TextField`` that stores its value encrypted.

    Values are encrypted with the module-level ``encryption_manager`` on the
    way to the database and decrypted when rows are loaded.
    """

    def from_db_value(self, value, expression, connection):
        """Decrypt a value loaded from the database; ``None`` passes through."""
        if value is None:
            return value
        return encryption_manager.decrypt_value(value)

    def get_prep_value(self, value):
        """Encrypt ``value`` for storage; ``None`` passes through.

        The parent implementation runs first so lazy translation strings and
        non-``str`` values are converted to ``str`` before encryption.
        """
        value = super().get_prep_value(value)
        if value is None:
            return value
        return encryption_manager.encrypt_value(value)
|
||
|
|
|
||
|
|
class AdminWeeklyAvailability(models.Model):
    """Weekdays on which appointments are accepted.

    ``available_days`` holds a JSON list of weekday numbers, using the
    numbering in ``DAYS_OF_WEEK`` (0 = Monday ... 6 = Sunday).
    """

    DAYS_OF_WEEK = [
        (0, 'Monday'),
        (1, 'Tuesday'),
        (2, 'Wednesday'),
        (3, 'Thursday'),
        (4, 'Friday'),
        (5, 'Saturday'),
        (6, 'Sunday'),
    ]

    available_days = models.JSONField(
        default=list,
        help_text="List of weekdays (0-6) when appointments are accepted"
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        verbose_name = 'Admin Weekly Availability'
        verbose_name_plural = 'Admin Weekly Availability'

    def __str__(self):
        """Return ``"Available: <day names>"``.

        Entries that are not a known weekday number are shown verbatim
        instead of raising, so a malformed JSON value cannot break pages
        that render this object.
        """
        day_names = dict(self.DAYS_OF_WEEK)
        days = [
            day_names[day] if isinstance(day, int) and day in day_names else str(day)
            for day in (self.available_days or [])
        ]
        return f"Available: {', '.join(days)}"
|
||
|
|
|
||
|
|
class AppointmentRequest(models.Model):
    """A request for an appointment and, once scheduled, its video session.

    Contact details and free-text fields use the encrypted field classes
    defined in this module. Scheduling assigns a Jitsi room id and meeting
    URL to the request.
    """

    STATUS_CHOICES = [
        ('pending_review', 'Pending Review'),
        ('scheduled', 'Scheduled'),
        ('rejected', 'Rejected'),
        ('completed', 'Completed'),
        ('cancelled', 'Cancelled'),
    ]

    TIME_SLOT_CHOICES = [
        ('morning', 'Morning (9AM - 12PM)'),
        ('afternoon', 'Afternoon (1PM - 5PM)'),
        ('evening', 'Evening (6PM - 9PM)'),
    ]

    # Minutes before the start time at which joining opens, and minutes after
    # the scheduled end during which joining is still allowed.
    JOIN_EARLY_MINUTES = 10
    JOIN_LATE_BUFFER_MINUTES = 15

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

    first_name = EncryptedCharField(max_length=100)
    last_name = EncryptedCharField(max_length=100)
    email = EncryptedEmailField()
    phone = EncryptedCharField(max_length=20, blank=True)
    reason = EncryptedTextField(blank=True)

    preferred_dates = models.JSONField(
        help_text="List of preferred dates (YYYY-MM-DD format)"
    )
    preferred_time_slots = models.JSONField(
        help_text="List of preferred time slots (morning/afternoon/evening)"
    )

    status = models.CharField(
        max_length=20,
        choices=STATUS_CHOICES,
        default='pending_review'
    )

    scheduled_datetime = models.DateTimeField(null=True, blank=True)
    scheduled_duration = models.PositiveIntegerField(
        default=60,
        help_text="Duration in minutes"
    )
    # Also used to store the reason passed to cancel_appointment().
    rejection_reason = EncryptedTextField(blank=True)

    jitsi_meet_url = models.URLField(blank=True, help_text="Jitsi Meet URL for the video session")
    # "No room yet" is stored as NULL rather than '': with unique=True every
    # unscheduled row would otherwise share the value '' and the second
    # insert would violate the unique constraint. Requires a schema
    # migration (and converting any existing '' value to NULL).
    jitsi_room_id = models.CharField(
        max_length=100,
        unique=True,
        null=True,
        blank=True,
        default=None,
        help_text="Jitsi room ID",
    )

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']
        verbose_name = 'Appointment Request'
        verbose_name_plural = 'Appointment Requests'
        indexes = [
            models.Index(fields=['status', 'scheduled_datetime']),
            # NOTE(review): `email` is stored encrypted, so an index on it is
            # unlikely to speed up lookups by address - confirm it is needed.
            models.Index(fields=['email', 'created_at']),
        ]

    @staticmethod
    def _format_datetime(value):
        """Format ``value`` like "January 05, 2025 at 03:30 PM", or return ``None``.

        Aware datetimes are converted to the current time zone first so the
        text matches what users see elsewhere; naive datetimes are formatted
        as given.
        """
        if value is None:
            return None
        if timezone.is_aware(value):
            value = timezone.localtime(value)
        return value.strftime("%B %d, %Y at %I:%M %p")

    @property
    def full_name(self):
        """First and last name separated by a single space."""
        return f"{self.first_name} {self.last_name}"

    @property
    def formatted_created_at(self):
        """Human-readable creation time, or ``None`` for an unsaved instance."""
        return self._format_datetime(self.created_at)

    @property
    def formatted_scheduled_datetime(self):
        """Human-readable scheduled time, or ``None`` when not scheduled."""
        return self._format_datetime(self.scheduled_datetime)

    @property
    def has_jitsi_meeting(self):
        """``True`` when both the meeting URL and the room id are set."""
        return bool(self.jitsi_meet_url and self.jitsi_room_id)

    @property
    def meeting_in_future(self):
        """``True`` when a scheduled time exists and is later than now."""
        if not self.scheduled_datetime:
            return False
        return self.scheduled_datetime > timezone.now()

    @property
    def meeting_duration_display(self):
        """Duration as ``"<h>h <m>m"``, or ``"<m>m"`` when under one hour."""
        hours, minutes = divmod(self.scheduled_duration, 60)
        if hours > 0:
            return f"{hours}h {minutes}m"
        return f"{minutes}m"

    def get_preferred_dates_display(self):
        """Return the preferred dates as a comma-separated string.

        Each ``YYYY-MM-DD`` entry is rendered like "Jan 05, 2025". An entry
        that cannot be parsed is shown verbatim; a missing list yields an
        empty string.
        """
        formatted = []
        for raw in self.preferred_dates or []:
            try:
                formatted.append(datetime.strptime(raw, '%Y-%m-%d').strftime('%b %d, %Y'))
            except (TypeError, ValueError):
                formatted.append(str(raw))
        return ', '.join(formatted)

    def get_preferred_time_slots_display(self):
        """Return the preferred time slots as a comma-separated string.

        Known slot keys are shown by their short label; anything else is
        shown verbatim.
        """
        slot_display = {
            'morning': 'Morning',
            'afternoon': 'Afternoon',
            'evening': 'Evening'
        }
        return ', '.join(
            slot_display.get(slot, slot) if isinstance(slot, str) else str(slot)
            for slot in (self.preferred_time_slots or [])
        )

    def generate_jitsi_room_id(self):
        """Assign a room id derived from this request's UUID if none is set.

        Returns the (possibly pre-existing) room id. Does not save.
        """
        if not self.jitsi_room_id:
            self.jitsi_room_id = f"therapy_session_{self.id.hex[:16]}"
        return self.jitsi_room_id

    def create_jitsi_meeting(self):
        """Set and return the meeting URL, generating a room id if needed.

        The base URL comes from ``settings.JITSI_BASE_URL`` and defaults to
        ``https://meet.jit.si``. Does not save.
        """
        if not self.jitsi_room_id:
            self.generate_jitsi_room_id()

        jitsi_base_url = getattr(settings, 'JITSI_BASE_URL', 'https://meet.jit.si')
        # Tolerate a configured base URL that ends with a slash.
        self.jitsi_meet_url = f"{jitsi_base_url.rstrip('/')}/{self.jitsi_room_id}"
        return self.jitsi_meet_url

    def get_jitsi_join_info(self):
        """Return a dict of join details, or ``None`` if there is no meeting."""
        if not self.has_jitsi_meeting:
            return None

        return {
            'meeting_url': self.jitsi_meet_url,
            'room_id': self.jitsi_room_id,
            'scheduled_time': self.formatted_scheduled_datetime,
            'duration': self.meeting_duration_display,
            'join_instructions': 'Click the meeting URL to join the video session. No password required.'
        }

    def schedule_appointment(self, datetime_obj, duration=60, commit=True):
        """Mark the request as scheduled and create its Jitsi meeting.

        Args:
            datetime_obj: Start time of the appointment.
            duration: Length in minutes.
            commit: Save the instance when true.
        """
        self.status = 'scheduled'
        self.scheduled_datetime = datetime_obj
        self.scheduled_duration = duration
        self.rejection_reason = ''

        self.create_jitsi_meeting()

        if commit:
            self.save()

    def reject_appointment(self, reason='', commit=True):
        """Mark the request as rejected and clear scheduling and meeting data."""
        self.status = 'rejected'
        self.rejection_reason = reason
        self.scheduled_datetime = None
        self.jitsi_meet_url = ''
        # None, not '': the column is unique, so '' could be held by only one row.
        self.jitsi_room_id = None
        if commit:
            self.save()

    def cancel_appointment(self, reason='', commit=True):
        """Mark the request as cancelled, storing ``reason`` in ``rejection_reason``."""
        self.status = 'cancelled'
        self.rejection_reason = reason
        if commit:
            self.save()

    def complete_appointment(self, commit=True):
        """Mark the request as completed."""
        self.status = 'completed'
        if commit:
            self.save()

    def can_join_meeting(self):
        """Return whether the meeting can be joined right now.

        Requires status ``'scheduled'``, a scheduled time and a Jitsi
        meeting. The join window opens ``JOIN_EARLY_MINUTES`` before the
        start and closes ``JOIN_LATE_BUFFER_MINUTES`` after the scheduled end.
        """
        if not self.scheduled_datetime or not self.has_jitsi_meeting:
            return False

        if self.status != 'scheduled':
            return False

        now = timezone.now()
        window_open = self.scheduled_datetime - timedelta(minutes=self.JOIN_EARLY_MINUTES)
        window_close = self.scheduled_datetime + timedelta(
            minutes=self.scheduled_duration + self.JOIN_LATE_BUFFER_MINUTES
        )
        return window_open <= now <= window_close

    def get_meeting_status(self):
        """Return a short label describing where the meeting stands.

        ``"Not scheduled"`` when there is no scheduled time. For any status
        other than ``'scheduled'`` the status' own display label is returned,
        so a cancelled appointment is not reported as "Scheduled". Otherwise
        one of ``"Scheduled"``, ``"Ready to join"``, ``"Completed"`` or
        ``"Ended"`` is returned, based on the current time.
        """
        if not self.scheduled_datetime:
            return "Not scheduled"

        if self.status != 'scheduled':
            return self.get_status_display()

        now = timezone.now()
        meeting_start = self.scheduled_datetime

        if now < meeting_start - timedelta(minutes=self.JOIN_EARLY_MINUTES):
            return "Scheduled"
        if self.can_join_meeting():
            return "Ready to join"
        if now > meeting_start + timedelta(minutes=self.scheduled_duration):
            return "Completed"
        return "Ended"

    def __str__(self):
        # created_at is None until the first save.
        created = self.created_at.strftime('%Y-%m-%d') if self.created_at else 'unsaved'
        return f"{self.full_name} - {self.get_status_display()} - {created}"
|