alternative-backend-service/users/views.py
saani 4fdc7c35ee feat: add user management endpoints and update appointment model
Add comprehensive API documentation for user management endpoints including profile updates, user listing, and admin user management features. Update appointment model to include additional status options (completed, cancelled) and add max_length constraint to email field. Change appointment creation endpoint to require user authentication instead of being public.

Changes:
- Add API docs for update_profile, get_profile, all-users endpoints
- Add API docs for activate-deactivate-user and delete-user admin endpoints
- Update appointment creation to require authentication
- Add 'completed' and 'cancelled' status options to Appointment model
- Add max_length constraint to EncryptedEmailField
- Regenerate initial migration with updated model definitions
2025-11-23 13:55:04 +00:00

408 lines
14 KiB
Python

from rest_framework import status, generics
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework_simplejwt.tokens import RefreshToken
from django.contrib.auth import authenticate
from .models import CustomUser, UserProfile
from .serializers import UserRegistrationSerializer, UserSerializer, ResetPasswordSerializer, ForgotPasswordSerializer, VerifyPasswordResetOTPSerializer
from .utils import send_otp_via_email, is_otp_expired, generate_otp
from django.utils import timezone
from datetime import timedelta
from rest_framework.reverse import reverse
from meetings.models import AppointmentRequest
@api_view(['POST'])
@permission_classes([AllowAny])
def register_user(request):
    """Create an account, attach an empty profile and email a verification OTP.

    The request body is validated by ``UserRegistrationSerializer``.

    Responses:
        201 -- user created and OTP email sent; body carries the serialized
               user, ``otp_sent`` and ``otp_expires_in`` (minutes).
        400 -- serializer validation errors.
        500 -- ``send_otp_via_email`` returned a falsy value. The user and
               profile rows have already been saved at that point.
    """
    form = UserRegistrationSerializer(data=request.data)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)

    account = form.save()
    UserProfile.objects.create(user=account)

    # Store the code and a 10-minute expiry on the user before emailing it.
    code = generate_otp()
    account.verify_otp = code
    account.verify_otp_expiry = timezone.now() + timedelta(minutes=10)
    account.save()

    # Fall back to the email address when no name was provided.
    display_name = f"{account.first_name} {account.last_name}".strip() or account.email
    delivered = send_otp_via_email(account.email, code, display_name, 'registration')
    if not delivered:
        return Response(
            {'error': 'Failed to send OTP. Please try again later.'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    payload = {
        'user': UserSerializer(account).data,
        'otp_sent': delivered,
        'otp_expires_in': 10,
    }
    return Response(payload, status=status.HTTP_201_CREATED)
@api_view(['POST'])
@permission_classes([AllowAny])
def verify_otp(request):
    """Confirm a registration OTP and mark the user's email as verified.

    Expects ``email`` and ``otp`` in the request body.

    Responses:
        200 -- OTP matched and had not expired; the user is flagged verified
               and the stored OTP/expiry are cleared.
        400 -- missing fields, user already verified, or invalid/expired OTP.
        404 -- no user with the given email.
    """
    email = request.data.get('email')
    otp = request.data.get('otp')
    if not email or not otp:
        return Response({
            'error': 'Email and OTP are required'
        }, status=status.HTTP_400_BAD_REQUEST)

    # Only the lookup can raise DoesNotExist, so keep the try minimal.
    try:
        user = CustomUser.objects.get(email=email)
    except CustomUser.DoesNotExist:
        return Response({
            'error': 'User not found'
        }, status=status.HTTP_404_NOT_FOUND)

    if user.isVerified:
        return Response({
            'error': 'User is already verified'
        }, status=status.HTTP_400_BAD_REQUEST)

    # Compare as text so an OTP sent as a JSON number (or with stray
    # whitespace) still matches the stored value. A cleared (None) OTP
    # never matches.
    submitted_otp = str(otp).strip()
    stored_otp = user.verify_otp
    otp_matches = stored_otp is not None and str(stored_otp) == submitted_otp

    if not otp_matches or is_otp_expired(user.verify_otp_expiry):
        return Response({
            'error': 'Invalid or expired OTP'
        }, status=status.HTTP_400_BAD_REQUEST)

    user.isVerified = True
    user.verify_otp = None
    user.verify_otp_expiry = None
    user.save()

    # No JWT is minted here: the response never returned one, and the
    # client obtains tokens through login_user.
    return Response({
        'message': 'Email verified successfully',
        'verified': True,
    }, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([AllowAny])
def resend_otp(request):
    """Generate a fresh OTP for a user and email it.

    Expects ``email`` and an optional ``context`` (defaults to
    ``'registration'``). With ``context == 'password_reset'`` the code is
    stored in the forgot-password fields; any other value stores it in the
    email-verification fields.

    Responses:
        200 -- OTP stored and emailed.
        400 -- ``email`` missing, or the user is already verified and the
               context is ``'registration'``.
        404 -- no user with the given email.
        500 -- ``send_otp_via_email`` returned a falsy value.
    """
    email = request.data.get('email')
    # NOTE(review): ``context`` is client-supplied and forwarded unvalidated
    # to send_otp_via_email -- confirm the helper tolerates unknown values.
    context = request.data.get('context', 'registration')
    if not email:
        return Response({
            'error': 'Email is required'
        }, status=status.HTTP_400_BAD_REQUEST)

    # Only the lookup can raise DoesNotExist, so keep the try minimal.
    try:
        user = CustomUser.objects.get(email=email)
    except CustomUser.DoesNotExist:
        return Response({
            'error': 'User not found',
            'message': 'No account found with this email address.'
        }, status=status.HTTP_404_NOT_FOUND)

    if user.isVerified and context == 'registration':
        return Response({
            'error': 'Already verified',
            'message': 'Your email is already verified. You can login now.'
        }, status=status.HTTP_400_BAD_REQUEST)

    otp = generate_otp()
    expiry = timezone.now() + timedelta(minutes=10)
    if context == 'password_reset':
        # NOTE(review): unlike forgot_password, this path does not check
        # isVerified / is_active before issuing a reset OTP -- confirm intent.
        user.forgot_password_otp = otp
        user.forgot_password_otp_expiry = expiry
    else:
        user.verify_otp = otp
        user.verify_otp_expiry = expiry
    user.save()

    # Fall back to the email address when no name was provided.
    user_name = f"{user.first_name} {user.last_name}".strip() or user.email
    email_sent = send_otp_via_email(user.email, otp, user_name, context)
    if not email_sent:
        return Response({
            'error': 'Failed to send OTP',
            'message': 'Please try again later.'
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    return Response({
        'message': 'OTP resent to your email successfully',
        'otp_sent': email_sent,
        'otp_expires_in': 10,
        'context': context
    }, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([AllowAny])
def login_user(request):
    """Authenticate by email/password and issue a JWT refresh/access pair.

    Responses:
        200 -- serialized user plus ``refresh`` and ``access`` tokens.
        401 -- ``authenticate`` returned no user.
        403 -- the user's email is not verified, or the account is inactive.
    """
    email = request.data.get('email')
    password = request.data.get('password')
    account = authenticate(request, email=email, password=password)

    if account is None:
        return Response(
            {'error': 'Invalid credentials'},
            status=status.HTTP_401_UNAUTHORIZED
        )

    # Verification is checked before the active flag, so an unverified and
    # inactive account is told to verify first.
    if not account.isVerified:
        unverified = {
            'error': 'Email not verified',
            'message': 'Please verify your email address before logging in.',
            'email': account.email,
            'can_resend_otp': True,
        }
        return Response(unverified, status=status.HTTP_403_FORBIDDEN)

    if not account.is_active:
        deactivated = {
            'error': 'Account deactivated',
            'message': 'Your account has been deactivated. Please contact support.',
        }
        return Response(deactivated, status=status.HTTP_403_FORBIDDEN)

    token = RefreshToken.for_user(account)
    return Response({
        'user': UserSerializer(account).data,
        'refresh': str(token),
        'access': str(token.access_token),
        'message': 'Login successful',
    })
@api_view(['POST'])
@permission_classes([AllowAny])
def forgot_password(request):
    """Start a password reset by emailing a 10-minute OTP.

    The body is validated by ``ForgotPasswordSerializer``.

    Responses:
        200 -- OTP stored and emailed; also returned, with a generic message,
               when no user has the given email.
        400 -- serializer errors, unverified email, or deactivated account.
        500 -- ``send_otp_via_email`` returned a falsy value.
    """
    form = ForgotPasswordSerializer(data=request.data)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)

    address = form.validated_data['email']
    try:
        account = CustomUser.objects.get(email=address)

        if not account.isVerified:
            return Response({
                'error': 'Email not verified',
                'message': 'Please verify your email address first.',
            }, status=status.HTTP_400_BAD_REQUEST)

        if not account.is_active:
            return Response({
                'error': 'Account deactivated',
                'message': 'Your account has been deactivated.',
            }, status=status.HTTP_400_BAD_REQUEST)

        code = generate_otp()
        account.forgot_password_otp = code
        account.forgot_password_otp_expiry = timezone.now() + timedelta(minutes=10)
        account.save()

        # Fall back to the email address when no name was provided.
        display_name = f"{account.first_name} {account.last_name}".strip() or account.email
        delivered = send_otp_via_email(account.email, code, display_name, 'password_reset')
        if not delivered:
            return Response({
                'error': 'Failed to send OTP',
                'message': 'Please try again later.',
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        success = {
            'message': 'Password reset OTP sent to your email',
            'otp_sent': True,
            'otp_expires_in': 10,
            'email': account.email,
        }
        return Response(success, status=status.HTTP_200_OK)
    except CustomUser.DoesNotExist:
        # An unknown email gets a generic 200 rather than a 404.
        return Response({
            'message': 'If the email exists, a password reset OTP has been sent.'
        }, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([AllowAny])
def verify_password_reset_otp(request):
    """Check a password-reset OTP without consuming it.

    The body is validated by ``VerifyPasswordResetOTPSerializer``. The stored
    OTP is left untouched on success.

    Responses:
        200 -- OTP matches and has not expired.
        400 -- serializer errors, or invalid/expired OTP.
        404 -- no user with the given email.
    """
    form = VerifyPasswordResetOTPSerializer(data=request.data)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)

    address = form.validated_data['email']
    code = form.validated_data['otp']
    try:
        account = CustomUser.objects.get(email=address)

        # Reject on mismatch first; expiry is only consulted for a matching code.
        if (account.forgot_password_otp != code or
                is_otp_expired(account.forgot_password_otp_expiry)):
            return Response({
                'error': 'Invalid or expired OTP'
            }, status=status.HTTP_400_BAD_REQUEST)

        confirmation = {
            'message': 'OTP verified successfully',
            'verified': True,
            'email': account.email,
        }
        return Response(confirmation, status=status.HTTP_200_OK)
    except CustomUser.DoesNotExist:
        return Response({
            'error': 'User not found'
        }, status=status.HTTP_404_NOT_FOUND)
@api_view(['POST'])
@permission_classes([AllowAny])
def reset_password(request):
    """Set a new password after re-checking the password-reset OTP.

    The body is validated by ``ResetPasswordSerializer`` and must provide
    ``email``, ``otp`` and ``new_password``.

    Responses:
        200 -- password changed; the stored OTP and its expiry are cleared.
        400 -- serializer errors, or invalid/expired OTP.
        404 -- no user with the given email.
    """
    form = ResetPasswordSerializer(data=request.data)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)

    fields = form.validated_data
    address = fields['email']
    code = fields['otp']
    replacement = fields['new_password']
    try:
        account = CustomUser.objects.get(email=address)

        # Reject on mismatch first; expiry is only consulted for a matching code.
        if (account.forgot_password_otp != code or
                is_otp_expired(account.forgot_password_otp_expiry)):
            return Response({
                'error': 'Invalid or expired OTP'
            }, status=status.HTTP_400_BAD_REQUEST)

        account.set_password(replacement)
        # Clear the OTP so the same code cannot be used again.
        account.forgot_password_otp = None
        account.forgot_password_otp_expiry = None
        account.save()

        return Response({
            'message': 'Password reset successfully',
            'success': True,
        }, status=status.HTTP_200_OK)
    except CustomUser.DoesNotExist:
        return Response({
            'error': 'User not found'
        }, status=status.HTTP_404_NOT_FOUND)
@api_view(['POST'])
@permission_classes([AllowAny])
def resend_password_reset_otp(request):
    """Issue and email a replacement password-reset OTP.

    The body is validated by ``ForgotPasswordSerializer``.

    Responses:
        200 -- new OTP stored and emailed; also returned, with a generic
               message, when no user has the given email.
        400 -- serializer errors.
        500 -- ``send_otp_via_email`` returned a falsy value.
    """
    form = ForgotPasswordSerializer(data=request.data)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)

    address = form.validated_data['email']
    try:
        account = CustomUser.objects.get(email=address)

        # NOTE(review): forgot_password rejects unverified or deactivated
        # accounts before issuing an OTP; this endpoint does not -- confirm
        # whether that difference is intended.
        code = generate_otp()
        account.forgot_password_otp = code
        account.forgot_password_otp_expiry = timezone.now() + timedelta(minutes=10)
        account.save()

        # Fall back to the email address when no name was provided.
        display_name = f"{account.first_name} {account.last_name}".strip() or account.email
        delivered = send_otp_via_email(account.email, code, display_name, 'password_reset')
        if not delivered:
            return Response({
                'error': 'Failed to send OTP',
                'message': 'Please try again later.',
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        success = {
            'message': 'Password reset OTP resent to your email',
            'otp_sent': True,
            'otp_expires_in': 10,
        }
        return Response(success, status=status.HTTP_200_OK)
    except CustomUser.DoesNotExist:
        # An unknown email gets a generic 200 rather than a 404.
        return Response({
            'message': 'If the email exists, a password reset OTP has been sent.'
        }, status=status.HTTP_200_OK)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_user_profile(request):
    """Return the authenticated caller serialized with ``UserSerializer``."""
    return Response(UserSerializer(request.user).data)
@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def update_user_profile(request):
    """Apply a partial update to the authenticated caller's own record.

    Although the route accepts PUT, the serializer runs with ``partial=True``,
    so omitted fields are left unchanged.

    Responses:
        200 -- the updated, serialized user.
        400 -- serializer validation errors.
    """
    updater = UserSerializer(request.user, data=request.data, partial=True)
    if not updater.is_valid():
        return Response(updater.errors, status=status.HTTP_400_BAD_REQUEST)

    updater.save()
    return Response(updater.data)
class UserDetailView(generics.RetrieveAPIView):
    """Retrieve endpoint that always resolves to the requesting user."""

    permission_classes = [IsAuthenticated]
    serializer_class = UserSerializer

    def get_object(self):
        """Return the authenticated user instead of looking one up by URL."""
        current_user = self.request.user
        return current_user
class _StaffOnlyPermission:
    """Duck-typed DRF permission that admits only staff users."""

    def has_permission(self, request, view):
        """Return True when ``request.user`` exists and has ``is_staff`` set."""
        user = getattr(request, 'user', None)
        return bool(user and user.is_staff)

    def has_object_permission(self, request, view, obj):
        """Allow object access; the staff check already ran in has_permission."""
        return True


def IsAdminUser(user=None):
    """Staff check usable both as a predicate and in ``permission_classes``.

    The views in this module list ``IsAdminUser`` in ``permission_classes``.
    DRF calls every entry there with no arguments and then invokes
    ``has_permission`` on the result, so the original one-argument function
    raised ``TypeError`` on every admin request.

    Args:
        user: Optional user object. When given, its ``is_staff`` value is
            returned, as before.

    Returns:
        ``user.is_staff`` when ``user`` is supplied; otherwise a permission
        object whose ``has_permission`` performs the staff check against
        ``request.user``.
    """
    if user is None:
        return _StaffOnlyPermission()
    return user.is_staff
class GetAllUsersView(generics.ListAPIView):
    """List every non-staff user, serialized with ``UserSerializer``.

    ``permission_classes`` lists this module's ``IsAdminUser`` followed by
    DRF's ``IsAuthenticated``.
    """

    permission_classes = [IsAdminUser, IsAuthenticated]
    serializer_class = UserSerializer

    def get_queryset(self):
        """Return users whose ``is_staff`` flag is false."""
        non_staff_users = CustomUser.objects.filter(is_staff=False)
        return non_staff_users

    def list(self, request, *args, **kwargs):
        """Return the full filtered queryset in one response.

        This override never calls the paginator, so the result is a plain
        list regardless of any pagination settings.
        """
        users = self.filter_queryset(self.get_queryset())
        return Response(self.get_serializer(users, many=True).data)
class DeleteUserView(generics.DestroyAPIView):
    """Delete a user together with their profile and appointment requests.

    ``permission_classes`` lists this module's ``IsAdminUser`` followed by
    DRF's ``IsAuthenticated``.
    """

    queryset = CustomUser.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAdminUser, IsAuthenticated]

    def destroy(self, request, *args, **kwargs):
        """Look up the target user, delete it and answer 204 No Content."""
        instance = self.get_object()
        self.perform_destroy(instance)
        return Response(status=status.HTTP_204_NO_CONTENT)

    def perform_destroy(self, instance):
        """Remove the user's dependent rows, then the user, atomically.

        Dependants go first: once ``instance.delete()`` has run the instance
        has no primary key, so filtering ``UserProfile`` by it can no longer
        match anything (and recent Django versions raise ``ValueError`` for
        unsaved instances in related filters).
        """
        # Imported locally so this block does not depend on the module's
        # import list.
        from django.db import transaction

        # All-or-nothing: a failure part-way must not leave a user without
        # a profile or orphaned appointment requests.
        with transaction.atomic():
            # Appointment requests are matched by email address.
            AppointmentRequest.objects.filter(email=instance.email).delete()
            UserProfile.objects.filter(user=instance).delete()
            instance.delete()
class ActivateOrDeactivateUserView(generics.UpdateAPIView):
    """Toggle a user's ``is_active`` flag.

    ``permission_classes`` lists this module's ``IsAdminUser`` followed by
    DRF's ``IsAuthenticated``.
    """

    permission_classes = [IsAdminUser, IsAuthenticated]
    serializer_class = UserSerializer
    queryset = CustomUser.objects.all()

    def update(self, request, *args, **kwargs):
        """Flip ``is_active`` on the target user and return the serialized user.

        The request body is ignored; each call inverts the current value.
        """
        target = self.get_object()
        target.is_active = not target.is_active
        target.save()
        return Response(self.get_serializer(target).data)