backend-service/internal/jobs/scheduler.go

298 lines
7.8 KiB
Go
Raw Permalink Normal View History

package jobs
import (
"context"
"log"
"sync"
"time"
"attune-heart-therapy/internal/models"
"attune-heart-therapy/internal/repositories"
)
// JobType represents the type of background job
type JobType string
const (
JobTypeReminderEmail JobType = "reminder_email"
JobTypeProcessPending JobType = "process_pending"
)
// Job represents a background job to be executed
type Job struct {
ID string
Type JobType
BookingID *uint
UserID *uint
ScheduledAt time.Time
Payload map[string]interface{}
RetryCount int
MaxRetries int
Status string
CreatedAt time.Time
UpdatedAt time.Time
}
// NotificationService interface for sending notifications (to avoid import cycle)
type NotificationService interface {
SendReminder(user *models.User, booking *models.Booking) error
ProcessPendingNotifications() error
}
// JobScheduler manages background jobs and workers
type JobScheduler struct {
jobs chan *Job
workers int
quit chan bool
wg sync.WaitGroup
notificationService NotificationService
bookingRepo repositories.BookingRepository
userRepo repositories.UserRepository
ctx context.Context
cancel context.CancelFunc
ticker *time.Ticker
}
// NewJobScheduler creates a new job scheduler instance
func NewJobScheduler(
workers int,
notificationService NotificationService,
bookingRepo repositories.BookingRepository,
userRepo repositories.UserRepository,
) *JobScheduler {
ctx, cancel := context.WithCancel(context.Background())
return &JobScheduler{
jobs: make(chan *Job, 100), // Buffer for 100 jobs
workers: workers,
quit: make(chan bool),
notificationService: notificationService,
bookingRepo: bookingRepo,
userRepo: userRepo,
ctx: ctx,
cancel: cancel,
ticker: time.NewTicker(1 * time.Minute), // Check every minute
}
}
// Start starts the job scheduler and workers
func (js *JobScheduler) Start() {
log.Printf("Starting job scheduler with %d workers", js.workers)
// Start workers
for i := 0; i < js.workers; i++ {
js.wg.Add(1)
go js.worker(i)
}
// Start the scheduler ticker
js.wg.Add(1)
go js.scheduler()
log.Printf("Job scheduler started successfully")
}
// Stop stops the job scheduler and all workers
func (js *JobScheduler) Stop() {
log.Printf("Stopping job scheduler...")
js.cancel()
js.ticker.Stop()
close(js.quit)
close(js.jobs)
js.wg.Wait()
log.Printf("Job scheduler stopped")
}
// ScheduleJob adds a job to the queue
func (js *JobScheduler) ScheduleJob(job *Job) {
if job.ScheduledAt.Before(time.Now()) || job.ScheduledAt.Equal(time.Now()) {
// Execute immediately
select {
case js.jobs <- job:
log.Printf("Job %s scheduled for immediate execution", job.ID)
default:
log.Printf("Job queue full, dropping job %s", job.ID)
}
} else {
// Store for later execution (in a real implementation, this would be persisted)
log.Printf("Job %s scheduled for %s", job.ID, job.ScheduledAt.Format(time.RFC3339))
}
}
// ScheduleReminderJob schedules a reminder email job
func (js *JobScheduler) ScheduleReminderJob(bookingID uint, userID uint, reminderTime time.Time) {
job := &Job{
ID: generateJobID(),
Type: JobTypeReminderEmail,
BookingID: &bookingID,
UserID: &userID,
ScheduledAt: reminderTime,
MaxRetries: 3,
Status: "scheduled",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
js.ScheduleJob(job)
}
// worker processes jobs from the queue
func (js *JobScheduler) worker(id int) {
defer js.wg.Done()
log.Printf("Worker %d started", id)
for {
select {
case job, ok := <-js.jobs:
if !ok {
log.Printf("Worker %d: job channel closed, stopping", id)
return
}
log.Printf("Worker %d processing job %s of type %s", id, job.ID, job.Type)
js.processJob(job)
case <-js.quit:
log.Printf("Worker %d: received quit signal, stopping", id)
return
}
}
}
// scheduler runs periodically to check for scheduled jobs
func (js *JobScheduler) scheduler() {
defer js.wg.Done()
log.Printf("Job scheduler ticker started")
for {
select {
case <-js.ticker.C:
// Process pending notifications
js.processPendingNotifications()
case <-js.ctx.Done():
log.Printf("Scheduler: context cancelled, stopping")
return
}
}
}
// processJob executes a specific job based on its type
func (js *JobScheduler) processJob(job *Job) {
defer func() {
if r := recover(); r != nil {
log.Printf("Job %s panicked: %v", job.ID, r)
js.handleJobFailure(job, "job panicked")
}
}()
switch job.Type {
case JobTypeReminderEmail:
js.processReminderJob(job)
case JobTypeProcessPending:
js.processAllPendingNotifications()
default:
log.Printf("Unknown job type: %s", job.Type)
}
}
// processReminderJob processes a reminder email job
func (js *JobScheduler) processReminderJob(job *Job) {
if job.BookingID == nil || job.UserID == nil {
log.Printf("Invalid reminder job %s: missing booking or user ID", job.ID)
return
}
// Get booking details
booking, err := js.bookingRepo.GetByID(*job.BookingID)
if err != nil {
log.Printf("Failed to get booking %d for reminder job %s: %v", *job.BookingID, job.ID, err)
js.handleJobFailure(job, err.Error())
return
}
// Get user details
user, err := js.userRepo.GetByID(*job.UserID)
if err != nil {
log.Printf("Failed to get user %d for reminder job %s: %v", *job.UserID, job.ID, err)
js.handleJobFailure(job, err.Error())
return
}
// Check if booking is still valid for reminder
if booking.Status != models.BookingStatusScheduled {
log.Printf("Skipping reminder for booking %d - status is %s", booking.ID, booking.Status)
return
}
// Check if the meeting is still in the future
if booking.ScheduledAt.Before(time.Now()) {
log.Printf("Skipping reminder for booking %d - meeting time has passed", booking.ID)
return
}
// Send the reminder
if err := js.notificationService.SendReminder(user, booking); err != nil {
log.Printf("Failed to send reminder for booking %d: %v", booking.ID, err)
js.handleJobFailure(job, err.Error())
return
}
log.Printf("Successfully sent reminder for booking %d to user %d", booking.ID, user.ID)
}
// processPendingNotifications processes all pending notifications
func (js *JobScheduler) processPendingNotifications() {
if err := js.notificationService.ProcessPendingNotifications(); err != nil {
log.Printf("Failed to process pending notifications: %v", err)
}
}
// processAllPendingNotifications is a job wrapper for processing pending notifications
func (js *JobScheduler) processAllPendingNotifications() {
js.processPendingNotifications()
}
// handleJobFailure handles job failures and retries
func (js *JobScheduler) handleJobFailure(job *Job, errorMsg string) {
job.RetryCount++
job.UpdatedAt = time.Now()
if job.RetryCount < job.MaxRetries {
// Retry with exponential backoff
retryDelay := time.Duration(job.RetryCount*job.RetryCount) * time.Minute
job.ScheduledAt = time.Now().Add(retryDelay)
log.Printf("Job %s failed (attempt %d/%d), retrying in %v: %s",
job.ID, job.RetryCount, job.MaxRetries, retryDelay, errorMsg)
// Reschedule the job
go func() {
time.Sleep(retryDelay)
js.ScheduleJob(job)
}()
} else {
job.Status = "failed"
log.Printf("Job %s failed permanently after %d attempts: %s", job.ID, job.RetryCount, errorMsg)
}
}
// generateJobID generates a unique job ID
func generateJobID() string {
return time.Now().Format("20060102150405") + "-" + randomString(8)
}
// randomString generates a random string of specified length
func randomString(length int) string {
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
b := make([]byte, length)
for i := range b {
b[i] = charset[time.Now().UnixNano()%int64(len(charset))]
}
return string(b)
}