package jobs import ( "context" "log" "sync" "time" "attune-heart-therapy/internal/models" "attune-heart-therapy/internal/repositories" ) // JobType represents the type of background job type JobType string const ( JobTypeReminderEmail JobType = "reminder_email" JobTypeProcessPending JobType = "process_pending" ) // Job represents a background job to be executed type Job struct { ID string Type JobType BookingID *uint UserID *uint ScheduledAt time.Time Payload map[string]interface{} RetryCount int MaxRetries int Status string CreatedAt time.Time UpdatedAt time.Time } // NotificationService interface for sending notifications (to avoid import cycle) type NotificationService interface { SendReminder(user *models.User, booking *models.Booking) error ProcessPendingNotifications() error } // JobScheduler manages background jobs and workers type JobScheduler struct { jobs chan *Job workers int quit chan bool wg sync.WaitGroup notificationService NotificationService bookingRepo repositories.BookingRepository userRepo repositories.UserRepository ctx context.Context cancel context.CancelFunc ticker *time.Ticker } // NewJobScheduler creates a new job scheduler instance func NewJobScheduler( workers int, notificationService NotificationService, bookingRepo repositories.BookingRepository, userRepo repositories.UserRepository, ) *JobScheduler { ctx, cancel := context.WithCancel(context.Background()) return &JobScheduler{ jobs: make(chan *Job, 100), // Buffer for 100 jobs workers: workers, quit: make(chan bool), notificationService: notificationService, bookingRepo: bookingRepo, userRepo: userRepo, ctx: ctx, cancel: cancel, ticker: time.NewTicker(1 * time.Minute), // Check every minute } } // Start starts the job scheduler and workers func (js *JobScheduler) Start() { log.Printf("Starting job scheduler with %d workers", js.workers) // Start workers for i := 0; i < js.workers; i++ { js.wg.Add(1) go js.worker(i) } // Start the scheduler ticker js.wg.Add(1) go js.scheduler() log.Printf("Job scheduler started successfully") } // Stop stops the job scheduler and all workers func (js *JobScheduler) Stop() { log.Printf("Stopping job scheduler...") js.cancel() js.ticker.Stop() close(js.quit) close(js.jobs) js.wg.Wait() log.Printf("Job scheduler stopped") } // ScheduleJob adds a job to the queue func (js *JobScheduler) ScheduleJob(job *Job) { if job.ScheduledAt.Before(time.Now()) || job.ScheduledAt.Equal(time.Now()) { // Execute immediately select { case js.jobs <- job: log.Printf("Job %s scheduled for immediate execution", job.ID) default: log.Printf("Job queue full, dropping job %s", job.ID) } } else { // Store for later execution (in a real implementation, this would be persisted) log.Printf("Job %s scheduled for %s", job.ID, job.ScheduledAt.Format(time.RFC3339)) } } // ScheduleReminderJob schedules a reminder email job func (js *JobScheduler) ScheduleReminderJob(bookingID uint, userID uint, reminderTime time.Time) { job := &Job{ ID: generateJobID(), Type: JobTypeReminderEmail, BookingID: &bookingID, UserID: &userID, ScheduledAt: reminderTime, MaxRetries: 3, Status: "scheduled", CreatedAt: time.Now(), UpdatedAt: time.Now(), } js.ScheduleJob(job) } // worker processes jobs from the queue func (js *JobScheduler) worker(id int) { defer js.wg.Done() log.Printf("Worker %d started", id) for { select { case job, ok := <-js.jobs: if !ok { log.Printf("Worker %d: job channel closed, stopping", id) return } log.Printf("Worker %d processing job %s of type %s", id, job.ID, job.Type) js.processJob(job) case <-js.quit: log.Printf("Worker %d: received quit signal, stopping", id) return } } } // scheduler runs periodically to check for scheduled jobs func (js *JobScheduler) scheduler() { defer js.wg.Done() log.Printf("Job scheduler ticker started") for { select { case <-js.ticker.C: // Process pending notifications js.processPendingNotifications() case <-js.ctx.Done(): log.Printf("Scheduler: context cancelled, stopping") return } } } // processJob executes a specific job based on its type func (js *JobScheduler) processJob(job *Job) { defer func() { if r := recover(); r != nil { log.Printf("Job %s panicked: %v", job.ID, r) js.handleJobFailure(job, "job panicked") } }() switch job.Type { case JobTypeReminderEmail: js.processReminderJob(job) case JobTypeProcessPending: js.processAllPendingNotifications() default: log.Printf("Unknown job type: %s", job.Type) } } // processReminderJob processes a reminder email job func (js *JobScheduler) processReminderJob(job *Job) { if job.BookingID == nil || job.UserID == nil { log.Printf("Invalid reminder job %s: missing booking or user ID", job.ID) return } // Get booking details booking, err := js.bookingRepo.GetByID(*job.BookingID) if err != nil { log.Printf("Failed to get booking %d for reminder job %s: %v", *job.BookingID, job.ID, err) js.handleJobFailure(job, err.Error()) return } // Get user details user, err := js.userRepo.GetByID(*job.UserID) if err != nil { log.Printf("Failed to get user %d for reminder job %s: %v", *job.UserID, job.ID, err) js.handleJobFailure(job, err.Error()) return } // Check if booking is still valid for reminder if booking.Status != models.BookingStatusScheduled { log.Printf("Skipping reminder for booking %d - status is %s", booking.ID, booking.Status) return } // Check if the meeting is still in the future if booking.ScheduledAt.Before(time.Now()) { log.Printf("Skipping reminder for booking %d - meeting time has passed", booking.ID) return } // Send the reminder if err := js.notificationService.SendReminder(user, booking); err != nil { log.Printf("Failed to send reminder for booking %d: %v", booking.ID, err) js.handleJobFailure(job, err.Error()) return } log.Printf("Successfully sent reminder for booking %d to user %d", booking.ID, user.ID) } // processPendingNotifications processes all pending notifications func (js *JobScheduler) processPendingNotifications() { if err := js.notificationService.ProcessPendingNotifications(); err != nil { log.Printf("Failed to process pending notifications: %v", err) } } // processAllPendingNotifications is a job wrapper for processing pending notifications func (js *JobScheduler) processAllPendingNotifications() { js.processPendingNotifications() } // handleJobFailure handles job failures and retries func (js *JobScheduler) handleJobFailure(job *Job, errorMsg string) { job.RetryCount++ job.UpdatedAt = time.Now() if job.RetryCount < job.MaxRetries { // Retry with exponential backoff retryDelay := time.Duration(job.RetryCount*job.RetryCount) * time.Minute job.ScheduledAt = time.Now().Add(retryDelay) log.Printf("Job %s failed (attempt %d/%d), retrying in %v: %s", job.ID, job.RetryCount, job.MaxRetries, retryDelay, errorMsg) // Reschedule the job go func() { time.Sleep(retryDelay) js.ScheduleJob(job) }() } else { job.Status = "failed" log.Printf("Job %s failed permanently after %d attempts: %s", job.ID, job.RetryCount, errorMsg) } } // generateJobID generates a unique job ID func generateJobID() string { return time.Now().Format("20060102150405") + "-" + randomString(8) } // randomString generates a random string of specified length func randomString(length int) string { const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" b := make([]byte, length) for i := range b { b[i] = charset[time.Now().UnixNano()%int64(len(charset))] } return string(b) }