backend-service/internal/jobs/manager.go

135 lines
2.9 KiB
Go
Raw Normal View History

package jobs
import (
"log"
"sync"
"attune-heart-therapy/internal/repositories"
)
// Manager manages all background job operations
type Manager struct {
scheduler *JobScheduler
reminderScheduler *ReminderScheduler
config *ReminderConfig
mu sync.RWMutex
running bool
}
// NewManager creates a new job manager instance
func NewManager(
notificationService NotificationService,
bookingRepo repositories.BookingRepository,
userRepo repositories.UserRepository,
config *ReminderConfig,
) *Manager {
if config == nil {
config = DefaultReminderConfig()
}
// Create job scheduler with 3 workers by default
jobScheduler := NewJobScheduler(3, notificationService, bookingRepo, userRepo)
// Create reminder scheduler
reminderScheduler := NewReminderScheduler(config, jobScheduler)
return &Manager{
scheduler: jobScheduler,
reminderScheduler: reminderScheduler,
config: config,
running: false,
}
}
// Start starts the job manager and all its components
func (m *Manager) Start() error {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
return nil
}
log.Printf("Starting job manager...")
// Start the job scheduler
m.scheduler.Start()
m.running = true
log.Printf("Job manager started successfully")
return nil
}
// Stop stops the job manager and all its components
func (m *Manager) Stop() error {
m.mu.Lock()
defer m.mu.Unlock()
if !m.running {
return nil
}
log.Printf("Stopping job manager...")
// Stop the job scheduler
m.scheduler.Stop()
m.running = false
log.Printf("Job manager stopped successfully")
return nil
}
// IsRunning returns whether the job manager is currently running
func (m *Manager) IsRunning() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.running
}
// GetReminderScheduler returns the reminder scheduler instance
func (m *Manager) GetReminderScheduler() *ReminderScheduler {
return m.reminderScheduler
}
// GetJobScheduler returns the job scheduler instance
func (m *Manager) GetJobScheduler() *JobScheduler {
return m.scheduler
}
// UpdateReminderConfig updates the reminder configuration
func (m *Manager) UpdateReminderConfig(config *ReminderConfig) {
m.mu.Lock()
defer m.mu.Unlock()
m.config = config
m.reminderScheduler.UpdateReminderConfig(config)
log.Printf("Reminder configuration updated")
}
// GetReminderConfig returns the current reminder configuration
func (m *Manager) GetReminderConfig() *ReminderConfig {
m.mu.RLock()
defer m.mu.RUnlock()
return m.config
}
// ScheduleProcessPendingJob schedules a job to process all pending notifications
func (m *Manager) ScheduleProcessPendingJob() {
if !m.running {
log.Printf("Job manager not running, cannot schedule process pending job")
return
}
job := &Job{
ID: generateJobID(),
Type: JobTypeProcessPending,
MaxRetries: 1,
Status: "scheduled",
}
m.scheduler.ScheduleJob(job)
}