From 04d1894aeab84afb61a0a229f13355bcb71a305c Mon Sep 17 00:00:00 2001 From: Mo Tarbin Date: Sun, 9 Feb 2025 20:15:28 -0500 Subject: [PATCH] Add Event Producer Update User to carry webhook from circle if assigned Refactor notification handling and update models for webhook support --- config/config.go | 6 + internal/authorization/middleware.go | 35 ++-- internal/chore/handler.go | 42 +++-- internal/circle/handler.go | 2 +- internal/circle/model/model.go | 9 +- internal/events/producer.go | 164 ++++++++++++++++++ internal/notifier/model/model.go | 45 +++-- internal/notifier/notifier.go | 33 ++-- internal/notifier/repo/repository.go | 12 +- internal/notifier/service/planner.go | 51 +++++- .../notifier/service/pushover/pushover.go | 6 +- .../notifier/service/telegram/telegram.go | 6 +- internal/user/handler.go | 8 +- internal/user/model/model.go | 12 +- internal/user/repo/repository.go | 14 +- main.go | 5 +- ..._migrate_chat_id_to_notification_target.go | 2 +- 17 files changed, 351 insertions(+), 101 deletions(-) create mode 100644 internal/events/producer.go diff --git a/config/config.go b/config/config.go index 785554a..8c6cc11 100644 --- a/config/config.go +++ b/config/config.go @@ -20,6 +20,7 @@ type Config struct { EmailConfig EmailConfig `mapstructure:"email" yaml:"email"` StripeConfig StripeConfig `mapstructure:"stripe" yaml:"stripe"` OAuth2Config OAuth2Config `mapstructure:"oauth2" yaml:"oauth2"` + WebhookConfig WebhookConfig `mapstructure:"webhook" yaml:"webhook"` IsDoneTickDotCom bool `mapstructure:"is_done_tick_dot_com" yaml:"is_done_tick_dot_com"` IsUserCreationDisabled bool `mapstructure:"is_user_creation_disabled" yaml:"is_user_creation_disabled"` } @@ -97,6 +98,11 @@ type OAuth2Config struct { Name string `mapstructure:"name" yaml:"name"` } +type WebhookConfig struct { + Timeout time.Duration `mapstructure:"timeout" yaml:"timeout" default:"5s"` + QueueSize int `mapstructure:"queue_size" yaml:"queue_size" default:"100"` +} + func NewConfig() *Config { return &Config{ Telegram: TelegramConfig{ diff --git a/internal/authorization/middleware.go b/internal/authorization/middleware.go index 18a7026..87fd1fe 100644 --- a/internal/authorization/middleware.go +++ b/internal/authorization/middleware.go @@ -20,16 +20,16 @@ type signIn struct { Password string `form:"password" json:"password" binding:"required"` } -func CurrentUser(c *gin.Context) (*uModel.User, bool) { +func CurrentUser(c *gin.Context) (*uModel.UserDetails, bool) { data, ok := c.Get(identityKey) if !ok { return nil, false } - acc, ok := data.(*uModel.User) + acc, ok := data.(*uModel.UserDetails) return acc, ok } -func MustCurrentUser(c *gin.Context) *uModel.User { +func MustCurrentUser(c *gin.Context) *uModel.UserDetails { acc, ok := CurrentUser(c) if ok { return acc @@ -45,7 +45,7 @@ func NewAuthMiddleware(cfg *config.Config, userRepo *uRepo.UserRepository) (*jwt MaxRefresh: cfg.Jwt.MaxRefresh, // 7 days as long as their token is valid they can refresh it IdentityKey: identityKey, PayloadFunc: func(data interface{}) jwt.MapClaims { - if u, ok := data.(*uModel.User); ok { + if u, ok := data.(*uModel.UserDetails); ok { return jwt.MapClaims{ identityKey: u.Username, } @@ -85,22 +85,25 @@ func NewAuthMiddleware(cfg *config.Config, userRepo *uRepo.UserRepository) (*jwt } return nil, jwt.ErrFailedAuthentication } - return &uModel.User{ - ID: user.ID, - Username: user.Username, - Password: "", - Image: user.Image, - CreatedAt: user.CreatedAt, - UpdatedAt: user.UpdatedAt, - Disabled: user.Disabled, - CircleID: user.CircleID, + return &uModel.UserDetails{ + User: uModel.User{ + ID: user.ID, + Username: user.Username, + Password: "", + Image: user.Image, + CreatedAt: user.CreatedAt, + UpdatedAt: user.UpdatedAt, + Disabled: user.Disabled, + CircleID: user.CircleID, + }, + WebhookURL: user.WebhookURL, }, nil case "3rdPartyAuth": // we should only reach this stage if a handler mannually call authenticator with it's context: - var authObject *uModel.User + var authObject *uModel.UserDetails v := c.Value("user_account") - authObject = v.(*uModel.User) + authObject = v.(*uModel.UserDetails) return authObject, nil @@ -111,7 +114,7 @@ func NewAuthMiddleware(cfg *config.Config, userRepo *uRepo.UserRepository) (*jwt Authorizator: func(data interface{}, c *gin.Context) bool { - if _, ok := data.(*uModel.User); ok { + if _, ok := data.(*uModel.UserDetails); ok { return true } return false diff --git a/internal/chore/handler.go b/internal/chore/handler.go index 5c39aa8..80f8b61 100644 --- a/internal/chore/handler.go +++ b/internal/chore/handler.go @@ -15,6 +15,7 @@ import ( chModel "donetick.com/core/internal/chore/model" chRepo "donetick.com/core/internal/chore/repo" cRepo "donetick.com/core/internal/circle/repo" + "donetick.com/core/internal/events" lRepo "donetick.com/core/internal/label/repo" "donetick.com/core/internal/notifier" nRepo "donetick.com/core/internal/notifier/repo" @@ -27,25 +28,28 @@ import ( ) type Handler struct { - choreRepo *chRepo.ChoreRepository - circleRepo *cRepo.CircleRepository - notifier *notifier.Notifier - nPlanner *nps.NotificationPlanner - nRepo *nRepo.NotificationRepository - tRepo *tRepo.ThingRepository - lRepo *lRepo.LabelRepository + choreRepo *chRepo.ChoreRepository + circleRepo *cRepo.CircleRepository + notifier *notifier.Notifier + nPlanner *nps.NotificationPlanner + nRepo *nRepo.NotificationRepository + tRepo *tRepo.ThingRepository + lRepo *lRepo.LabelRepository + eventProducer *events.EventsProducer } func NewHandler(cr *chRepo.ChoreRepository, circleRepo *cRepo.CircleRepository, nt *notifier.Notifier, - np *nps.NotificationPlanner, nRepo *nRepo.NotificationRepository, tRepo *tRepo.ThingRepository, lRepo *lRepo.LabelRepository) *Handler { + np *nps.NotificationPlanner, nRepo *nRepo.NotificationRepository, tRepo *tRepo.ThingRepository, lRepo *lRepo.LabelRepository, + ep *events.EventsProducer) *Handler { return &Handler{ - choreRepo: cr, - circleRepo: circleRepo, - notifier: nt, - nPlanner: np, - nRepo: nRepo, - tRepo: tRepo, - lRepo: lRepo, + choreRepo: cr, + circleRepo: circleRepo, + notifier: nt, + nPlanner: np, + nRepo: nRepo, + tRepo: tRepo, + lRepo: lRepo, + eventProducer: ep, } } @@ -294,7 +298,7 @@ func (h *Handler) createChore(c *gin.Context) { go func() { h.nPlanner.GenerateNotifications(c, createdChore) }() - shouldReturn := HandleThingAssociation(choreReq, h, c, currentUser) + shouldReturn := HandleThingAssociation(choreReq, h, c, ¤tUser.User) if shouldReturn { return } @@ -551,7 +555,7 @@ func (h *Handler) editChore(c *gin.Context) { h.tRepo.DissociateThingWithChore(c, oldChore.ThingChore.ThingID, oldChore.ID) } - shouldReturn := HandleThingAssociation(choreReq, h, c, currentUser) + shouldReturn := HandleThingAssociation(choreReq, h, c, ¤tUser.User) if shouldReturn { return } @@ -818,7 +822,7 @@ func (h *Handler) skipChore(c *gin.Context) { }) return } - + h.eventProducer.ChoreSkipped(c, currentUser.WebhookURL, updatedChore, ¤tUser.User) c.JSON(200, gin.H{ "res": updatedChore, }) @@ -1069,7 +1073,7 @@ func (h *Handler) completeChore(c *gin.Context) { // h.notifier.SendChoreCompletion(c, chore, currentUser) // }() h.nPlanner.GenerateNotifications(c, updatedChore) - + h.eventProducer.ChoreCompleted(c, currentUser.WebhookURL, chore, ¤tUser.User) c.JSON(200, gin.H{ "res": updatedChore, }) diff --git a/internal/circle/handler.go b/internal/circle/handler.go index cd8d346..5ba4efc 100644 --- a/internal/circle/handler.go +++ b/internal/circle/handler.go @@ -143,7 +143,7 @@ func (h *Handler) LeaveCircle(c *gin.Context) { // START : HANDLE USER LEAVING CIRCLE // bulk update chores: - if err := handleUserLeavingCircle(h, c, currentUser, orginalCircleID); err != nil { + if err := handleUserLeavingCircle(h, c, ¤tUser.User, orginalCircleID); err != nil { log.Error("Error handling user leaving circle:", err) c.JSON(500, gin.H{ "error": "Error handling user leaving circle", diff --git a/internal/circle/model/model.go b/internal/circle/model/model.go index 284c412..e8b9e38 100644 --- a/internal/circle/model/model.go +++ b/internal/circle/model/model.go @@ -14,6 +14,7 @@ type Circle struct { UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"` // Updated at InviteCode string `json:"invite_code" gorm:"column:invite_code"` // Invite code Disabled bool `json:"disabled" gorm:"column:disabled"` // Disabled + WebhookURL *string `json:"-" gorm:"column:webhook_url"` // Webhook URL } type CircleDetail struct { @@ -35,8 +36,8 @@ type UserCircle struct { type UserCircleDetail struct { UserCircle - Username string `json:"-" gorm:"column:username"` - DisplayName string `json:"displayName" gorm:"column:display_name"` - NotificationType nModel.NotificationType `json:"-" gorm:"column:notification_type"` - TargetID string `json:"-" gorm:"column:target_id"` // Target ID + Username string `json:"-" gorm:"column:username"` + DisplayName string `json:"displayName" gorm:"column:display_name"` + NotificationType nModel.NotificationPlatform `json:"-" gorm:"column:notification_type"` + TargetID string `json:"-" gorm:"column:target_id"` // Target ID } diff --git a/internal/events/producer.go b/internal/events/producer.go new file mode 100644 index 0000000..d1158d4 --- /dev/null +++ b/internal/events/producer.go @@ -0,0 +1,164 @@ +package events + +import ( + "bytes" + "context" + "encoding/json" + "log" + "net/http" + "time" + + "donetick.com/core/config" + chModel "donetick.com/core/internal/chore/model" + uModel "donetick.com/core/internal/user/model" + "donetick.com/core/logging" + "go.uber.org/zap" +) + +const ( + METHOD_POST = "POST" + HEAD_CONTENT_TYPE = "Content-Type" + CONTENT_TYPE_JSON = "application/json" +) + +type EventType string + +const ( + EventTypeUnknown EventType = "" + EventTypeChoreCreated EventType = "CREATED" + EventTypeChoreReminder EventType = "REMINDER" + EventTypeChoreUpdated EventType = "UPDATED" + EventTypeChoreCompleted EventType = "COMPLETED" + EventTypeChoreReassigned EventType = "REASSIGNED" + EventTypeChoreSkipped EventType = "SKIPPED" +) + +type Event struct { + Type EventType `json:"type"` + URL string `json:"-"` + Timestamp time.Time `json:"timestamp"` + Data interface{} `json:"data"` +} + +type ChoreData struct { + Chore *chModel.Chore `json:"chore"` + Username string `json:"username"` + DisplayName string `json:"display_name"` + Note string `json:"note"` +} + +type EventsProducer struct { + client *http.Client + queue chan Event + logger *zap.SugaredLogger +} + +func (p *EventsProducer) Start(ctx context.Context) { + + p.logger = logging.FromContext(ctx) + + go func() { + for event := range p.queue { + p.processEvent(event) + } + }() +} + +func NewEventsProducer(cfg *config.Config) *EventsProducer { + return &EventsProducer{ + client: &http.Client{ + Timeout: cfg.WebhookConfig.Timeout, + }, + queue: make(chan Event, cfg.WebhookConfig.QueueSize), + } +} + +func (p *EventsProducer) publishEvent(event Event) { + select { + case p.queue <- event: + // Successfully added to queue + default: + log.Println("Webhook queue is full, dropping event") + } +} + +func (p *EventsProducer) processEvent(event Event) { + p.logger.Debugw("Sending webhook event", "type", event.Type, "url", event.URL) + + eventJSON, err := json.Marshal(event) + if err != nil { + p.logger.Errorw("Failed to marshal webhook event", "error", err) + return + } + + // Pring the event and the url: + p.logger.Debug("Sending event to webhook", "url", event.URL, "event", event) + p.logger.Debug("Event: ", event) + + req, err := http.NewRequest(METHOD_POST, event.URL, bytes.NewBuffer(eventJSON)) + if err != nil { + p.logger.Errorw("Failed to create webhook request", "error", err) + return + } + req.Header.Set(HEAD_CONTENT_TYPE, CONTENT_TYPE_JSON) + + resp, err := p.client.Do(req) + if err != nil { + p.logger.Errorw("Failed to send webhook event", "error", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + p.logger.Errorw("Webhook request failed", "status", resp.StatusCode) + return + } +} + +func (p *EventsProducer) ChoreCompleted(ctx context.Context, webhookURL *string, chore *chModel.Chore, performer *uModel.User) { + if webhookURL == nil { + p.logger.Debug("No subscribers for circle, skipping webhook") + return + } + + event := Event{ + Type: EventTypeChoreCompleted, + URL: *webhookURL, + Timestamp: time.Now(), + Data: ChoreData{Chore: chore, + Username: performer.Username, + DisplayName: performer.DisplayName, + }, + } + p.publishEvent(event) +} + +func (p *EventsProducer) ChoreSkipped(ctx context.Context, webhookURL *string, chore *chModel.Chore, performer *uModel.User) { + if webhookURL == nil { + p.logger.Debug("No Webhook URL for circle, skipping webhook") + return + } + + event := Event{ + Type: EventTypeChoreSkipped, + URL: *webhookURL, + Timestamp: time.Now(), + Data: ChoreData{Chore: chore, + Username: performer.Username, + DisplayName: performer.DisplayName, + }, + } + p.publishEvent(event) +} + +func (p *EventsProducer) NotificaitonEvent(ctx context.Context, url string, event interface{}) { + // print the event and the url : + p.logger.Debug("Sending notification event") + + p.publishEvent(Event{ + URL: url, + Type: EventTypeChoreReminder, + Timestamp: time.Now(), + Data: event, + }) +} diff --git a/internal/notifier/model/model.go b/internal/notifier/model/model.go index 823807d..cde2c1c 100644 --- a/internal/notifier/model/model.go +++ b/internal/notifier/model/model.go @@ -3,35 +3,32 @@ package model import "time" type Notification struct { - ID int `json:"id" gorm:"primaryKey"` - ChoreID int `json:"chore_id" gorm:"column:chore_id"` - UserID int `json:"user_id" gorm:"column:user_id"` - TargetID string `json:"target_id" gorm:"column:target_id"` - Text string `json:"text" gorm:"column:text"` - IsSent bool `json:"is_sent" gorm:"column:is_sent;index;default:false"` - TypeID NotificationType `json:"type" gorm:"column:type"` - ScheduledFor time.Time `json:"scheduled_for" gorm:"column:scheduled_for;index"` - CreatedAt time.Time `json:"created_at" gorm:"column:created_at"` + ID int `json:"id" gorm:"primaryKey"` + ChoreID int `json:"chore_id" gorm:"column:chore_id"` + CircleID int `json:"circle_id" gorm:"column:circle_id"` + UserID int `json:"user_id" gorm:"column:user_id"` + TargetID string `json:"target_id" gorm:"column:target_id"` + Text string `json:"text" gorm:"column:text"` + IsSent bool `json:"is_sent" gorm:"column:is_sent;index;default:false"` + TypeID NotificationPlatform `json:"type" gorm:"column:type"` + ScheduledFor time.Time `json:"scheduled_for" gorm:"column:scheduled_for;index"` + CreatedAt time.Time `json:"created_at" gorm:"column:created_at"` + RawEvent interface{} `json:"raw_event" gorm:"column:raw_event;type:jsonb"` +} +type NotificationDetails struct { + Notification + WebhookURL *string `json:"webhook_url" gorm:"column:webhook_url;<-:null"` // read-only, will only be used if webhook enabled + } func (n *Notification) IsValid() bool { - switch n.TypeID { - case NotificationTypeTelegram, NotificationTypePushover: - if n.TargetID == "" { - return false - } else if n.Text == "0" { - return false - } - return true - default: - return false - } + return true } -type NotificationType int8 +type NotificationPlatform int8 const ( - NotificationTypeNone NotificationType = iota - NotificationTypeTelegram - NotificationTypePushover + NotificationPlatformNone NotificationPlatform = iota + NotificationPlatformTelegram + NotificationPlatformPushover ) diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index 0e12a0e..28fb523 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -3,39 +3,52 @@ package notifier import ( "context" + "donetick.com/core/internal/events" nModel "donetick.com/core/internal/notifier/model" pushover "donetick.com/core/internal/notifier/service/pushover" telegram "donetick.com/core/internal/notifier/service/telegram" + "donetick.com/core/logging" ) type Notifier struct { - Telegram *telegram.TelegramNotifier - Pushover *pushover.Pushover + Telegram *telegram.TelegramNotifier + Pushover *pushover.Pushover + eventsProducer *events.EventsProducer } -func NewNotifier(t *telegram.TelegramNotifier, p *pushover.Pushover) *Notifier { +func NewNotifier(t *telegram.TelegramNotifier, p *pushover.Pushover, ep *events.EventsProducer) *Notifier { return &Notifier{ - Telegram: t, - Pushover: p, + Telegram: t, + Pushover: p, + eventsProducer: ep, } } -func (n *Notifier) SendNotification(c context.Context, notification *nModel.Notification) error { +func (n *Notifier) SendNotification(c context.Context, notification *nModel.NotificationDetails) error { log := logging.FromContext(c) + var err error switch notification.TypeID { - case nModel.NotificationTypeTelegram: + case nModel.NotificationPlatformTelegram: if n.Telegram == nil { log.Error("Telegram bot is not initialized, Skipping sending message") return nil } - return n.Telegram.SendNotification(c, notification) - case nModel.NotificationTypePushover: + err = n.Telegram.SendNotification(c, notification) + case nModel.NotificationPlatformPushover: if n.Pushover == nil { log.Error("Pushover is not initialized, Skipping sending message") return nil } - return n.Pushover.SendNotification(c, notification) + err = n.Pushover.SendNotification(c, notification) } + if err != nil { + log.Error("Failed to send notification", "err", err) + } + if notification.RawEvent != nil && notification.WebhookURL != nil { + // if we have a webhook url, we should send the event to the webhook + n.eventsProducer.NotificaitonEvent(c, *notification.WebhookURL, notification.RawEvent) + } + return nil } diff --git a/internal/notifier/repo/repository.go b/internal/notifier/repo/repository.go index 0e69a3f..6491390 100644 --- a/internal/notifier/repo/repository.go +++ b/internal/notifier/repo/repository.go @@ -23,7 +23,7 @@ func (r *NotificationRepository) DeleteAllChoreNotifications(choreID int) error func (r *NotificationRepository) BatchInsertNotifications(notifications []*nModel.Notification) error { return r.db.Create(¬ifications).Error } -func (r *NotificationRepository) MarkNotificationsAsSent(notifications []*nModel.Notification) error { +func (r *NotificationRepository) MarkNotificationsAsSent(notifications []*nModel.NotificationDetails) error { // Extract IDs from notifications var ids []int for _, notification := range notifications { @@ -32,11 +32,15 @@ func (r *NotificationRepository) MarkNotificationsAsSent(notifications []*nModel // Use the extracted IDs in the Where clause return r.db.Model(&nModel.Notification{}).Where("id IN (?)", ids).Update("is_sent", true).Error } -func (r *NotificationRepository) GetPendingNotificaiton(c context.Context, lookback time.Duration) ([]*nModel.Notification, error) { - var notifications []*nModel.Notification +func (r *NotificationRepository) GetPendingNotificaiton(c context.Context, lookback time.Duration) ([]*nModel.NotificationDetails, error) { + var notifications []*nModel.NotificationDetails start := time.Now().UTC().Add(-lookback) end := time.Now().UTC() - if err := r.db.Where("is_sent = ? AND scheduled_for < ? AND scheduled_for > ?", false, end, start).Find(¬ifications).Error; err != nil { + if err := r.db.Table("notifications"). + Select("notifications.*, circles.webhook_url as webhook_url"). + Joins("left join circles on circles.id = notifications.circle_id"). + Where("notifications.is_sent = ? AND notifications.scheduled_for < ? AND notifications.scheduled_for > ?", false, end, start). + Find(¬ifications).Error; err != nil { return nil, err } return notifications, nil diff --git a/internal/notifier/service/planner.go b/internal/notifier/service/planner.go index a8f8ef6..2544b10 100644 --- a/internal/notifier/service/planner.go +++ b/internal/notifier/service/planner.go @@ -65,7 +65,7 @@ func (n *NotificationPlanner) GenerateNotifications(c context.Context, chore *ch if mt.CircleGroup { notifications = append(notifications, generateCircleGroupNotifications(chore, mt)...) } - + log.Debug("Generated notifications", "count", len(notifications)) n.nRepo.BatchInsertNotifications(notifications) return true } @@ -89,6 +89,13 @@ func generateDueNotifications(chore *chModel.Chore, users []*cModel.UserCircleDe UserID: user.ID, TargetID: user.TargetID, Text: fmt.Sprintf("📅 Reminder: *%s* is due today and assigned to %s.", chore.Name, assignee.DisplayName), + RawEvent: map[string]interface{}{ + "id": chore.ID, + "name": chore.Name, + "due_date": chore.NextDueDate.Format("January 2nd"), + "assignee": assignee.DisplayName, + "assignee_username": assignee.Username, + }, } if notification.IsValid() { notifications = append(notifications, notification) @@ -117,6 +124,13 @@ func generatePreDueNotifications(chore *chModel.Chore, users []*cModel.UserCircl UserID: user.ID, TargetID: user.TargetID, Text: fmt.Sprintf("📢 Heads up! *%s* is due soon (on %s) and assigned to %s.", chore.Name, chore.NextDueDate.Format("January 2nd"), assignee.DisplayName), + RawEvent: map[string]interface{}{ + "id": chore.ID, + "name": chore.Name, + "due_date": chore.NextDueDate.Format("January 2nd"), + "assignee": assignee.DisplayName, + "assignee_username": assignee.Username, + }, } if notification.IsValid() { notifications = append(notifications, notification) @@ -148,6 +162,14 @@ func generateOverdueNotifications(chore *chModel.Chore, users []*cModel.UserCirc UserID: user.ID, TargetID: fmt.Sprint(user.TargetID), Text: fmt.Sprintf("🚨 *%s* is now %d hours overdue. Please complete it as soon as possible. (Assigned to %s)", chore.Name, hours, assignee.DisplayName), + RawEvent: map[string]interface{}{ + "id": chore.ID, + "type": EventTypeOverdue, + "name": chore.Name, + "due_date": chore.NextDueDate.Format("January 2nd"), + "assignee": assignee.DisplayName, + "assignee_username": assignee.Username, + }, } if notification.IsValid() { notifications = append(notifications, notification) @@ -173,6 +195,12 @@ func generateCircleGroupNotifications(chore *chModel.Chore, mt *chModel.Notifica TypeID: 1, TargetID: fmt.Sprint(*mt.CircleGroupID), Text: fmt.Sprintf("📅 Reminder: *%s* is due today.", chore.Name), + RawEvent: map[string]interface{}{ + "id": chore.ID, + "type": EventTypeDue, + "name": chore.Name, + "due_date": chore.NextDueDate.Format("January 2nd"), + }, } if notification.IsValid() { notifications = append(notifications, notification) @@ -188,6 +216,12 @@ func generateCircleGroupNotifications(chore *chModel.Chore, mt *chModel.Notifica TypeID: 3, TargetID: fmt.Sprint(*mt.CircleGroupID), Text: fmt.Sprintf("📢 Heads up! *%s* is due soon (on %s).", chore.Name, chore.NextDueDate.Format("January 2nd")), + RawEvent: map[string]interface{}{ + "id": chore.ID, + "type": EventTypePreDue, + "name": chore.Name, + "due_date": chore.NextDueDate.Format("January 2nd"), + }, } if notification.IsValid() { notifications = append(notifications, notification) @@ -205,6 +239,12 @@ func generateCircleGroupNotifications(chore *chModel.Chore, mt *chModel.Notifica TypeID: 2, TargetID: fmt.Sprint(*mt.CircleGroupID), Text: fmt.Sprintf("🚨 *%s* is now %d hours overdue. Please complete it as soon as possible.", chore.Name, hours), + RawEvent: map[string]interface{}{ + "id": chore.ID, + "type": EventTypeOverdue, + "name": chore.Name, + "due_date": chore.NextDueDate.Format("January 2nd"), + }, } if notification.IsValid() { notifications = append(notifications, notification) @@ -214,3 +254,12 @@ func generateCircleGroupNotifications(chore *chModel.Chore, mt *chModel.Notifica return notifications } + +type EventType string + +const ( + EventTypeUnknown EventType = "unknown" + EventTypeDue EventType = "due" + EventTypePreDue EventType = "pre_due" + EventTypeOverdue EventType = "overdue" +) diff --git a/internal/notifier/service/pushover/pushover.go b/internal/notifier/service/pushover/pushover.go index 08caacd..c1c9916 100644 --- a/internal/notifier/service/pushover/pushover.go +++ b/internal/notifier/service/pushover/pushover.go @@ -2,6 +2,7 @@ package pushover import ( "context" + "errors" "donetick.com/core/config" nModel "donetick.com/core/internal/notifier/model" @@ -22,7 +23,10 @@ func NewPushover(cfg *config.Config) *Pushover { } } -func (p *Pushover) SendNotification(c context.Context, notification *nModel.Notification) error { +func (p *Pushover) SendNotification(c context.Context, notification *nModel.NotificationDetails) error { + if notification.TargetID == "" { + return errors.New("unable to send notification, targetID is empty") + } log := logging.FromContext(c) recipient := pushover.NewRecipient(notification.TargetID) message := pushover.NewMessageWithTitle(notification.Text, "Donetick") diff --git a/internal/notifier/service/telegram/telegram.go b/internal/notifier/service/telegram/telegram.go index 82ed539..5481cf0 100644 --- a/internal/notifier/service/telegram/telegram.go +++ b/internal/notifier/service/telegram/telegram.go @@ -70,12 +70,10 @@ func (tn *TelegramNotifier) SendChoreCompletion(c context.Context, chore *chMode } -func (tn *TelegramNotifier) SendNotification(c context.Context, notification *nModel.Notification) error { - +func (tn *TelegramNotifier) SendNotification(c context.Context, notification *nModel.NotificationDetails) error { log := logging.FromContext(c) if notification.TargetID == "" { - log.Error("Notification target ID is empty") - return errors.New("Notification target ID is empty") + return errors.New("unable to send notification, targetID is empty") } chatID, err := strconv.ParseInt(notification.TargetID, 10, 64) if err != nil { diff --git a/internal/user/handler.go b/internal/user/handler.go index ed87890..1d18c8e 100644 --- a/internal/user/handler.go +++ b/internal/user/handler.go @@ -518,7 +518,7 @@ func (h *Handler) UpdateUserDetails(c *gin.Context) { user.Image = *req.Image } - if err := h.userRepo.UpdateUser(c, user); err != nil { + if err := h.userRepo.UpdateUser(c, &user.User); err != nil { c.JSON(500, gin.H{ "error": "Error updating user", }) @@ -608,8 +608,8 @@ func (h *Handler) UpdateNotificationTarget(c *gin.Context) { } type Request struct { - Type nModel.NotificationType `json:"type"` - Target string `json:"target"` + Type nModel.NotificationPlatform `json:"type"` + Target string `json:"target"` } var req Request @@ -617,7 +617,7 @@ func (h *Handler) UpdateNotificationTarget(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request"}) return } - if req.Type == nModel.NotificationTypeNone { + if req.Type == nModel.NotificationPlatformNone { err := h.userRepo.DeleteNotificationTarget(c, currentUser.ID) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to delete notification target"}) diff --git a/internal/user/model/model.go b/internal/user/model/model.go index 5d70e98..b780eac 100644 --- a/internal/user/model/model.go +++ b/internal/user/model/model.go @@ -25,6 +25,10 @@ type User struct { Expiration *string `json:"expiration" gorm:"column:expiration;<-:false"` // read only column UserNotificationTargets UserNotificationTarget `json:"notification_target" gorm:"foreignKey:UserID;references:ID"` } +type UserDetails struct { + User + WebhookURL *string `json:"webhookURL" gorm:"column:webhook_url;<-:false"` // read only column +} type UserPasswordReset struct { ID int `gorm:"column:id"` @@ -43,10 +47,10 @@ type APIToken struct { } type UserNotificationTarget struct { - UserID int `json:"userId" gorm:"column:user_id;index;primaryKey"` // Index on userID - Type nModel.NotificationType `json:"type" gorm:"column:type"` // Type - TargetID string `json:"target_id" gorm:"column:target_id"` // Target ID - CreatedAt time.Time `json:"-" gorm:"column:created_at"` + UserID int `json:"userId" gorm:"column:user_id;index;primaryKey"` // Index on userID + Type nModel.NotificationPlatform `json:"type" gorm:"column:type"` // Type + TargetID string `json:"target_id" gorm:"column:target_id"` // Target ID + CreatedAt time.Time `json:"-" gorm:"column:created_at"` } type AuthProviderType int diff --git a/internal/user/repo/repository.go b/internal/user/repo/repository.go index 38fdb32..9873766 100644 --- a/internal/user/repo/repository.go +++ b/internal/user/repo/repository.go @@ -13,7 +13,7 @@ import ( ) type IUserRepository interface { - GetUserByUsername(username string) (*uModel.User, error) + GetUserByUsername(username string) (*uModel.UserDetails, error) GetUser(id int) (*uModel.User, error) GetAllUsers() ([]*uModel.User, error) CreateUser(user *uModel.User) error @@ -52,14 +52,14 @@ func (r *UserRepository) CreateUser(c context.Context, user *uModel.User) (*uMod } return user, nil } -func (r *UserRepository) GetUserByUsername(c context.Context, username string) (*uModel.User, error) { - var user *uModel.User +func (r *UserRepository) GetUserByUsername(c context.Context, username string) (*uModel.UserDetails, error) { + var user *uModel.UserDetails if r.isDonetickDotCom { - if err := r.db.WithContext(c).Preload("UserNotificationTargets").Table("users u").Select("u.*, ss.status as subscription, ss.expired_at as expiration").Joins("left join stripe_customers sc on sc.user_id = u.id ").Joins("left join stripe_subscriptions ss on sc.customer_id = ss.customer_id").Where("username = ?", username).First(&user).Error; err != nil { + if err := r.db.WithContext(c).Preload("UserNotificationTargets").Table("users u").Select("u.*, ss.status as subscription, ss.expired_at as expiration, c.webhook_url as webhook_url").Joins("left join stripe_customers sc on sc.user_id = u.id ").Joins("left join stripe_subscriptions ss on sc.customer_id = ss.customer_id").Joins("left join circles c on c.id = u.circle_id").Where("username = ?", username).First(&user).Error; err != nil { return nil, err } } else { - if err := r.db.WithContext(c).Preload("UserNotificationTargets").Table("users u").Select("u.*, 'active' as subscription, '2999-12-31' as expiration").Where("username = ?", username).First(&user).Error; err != nil { + if err := r.db.WithContext(c).Preload("UserNotificationTargets").Table("users u").Select("u.*, 'active' as subscription, '2999-12-31' as expiration, c.webhook_url as webhook_url").Joins("left join circles c on c.id = u.circle_id").Where("username = ?", username).First(&user).Error; err != nil { return nil, err } } @@ -160,7 +160,7 @@ func (r *UserRepository) DeleteAPIToken(c context.Context, userID int, tokenID s return r.db.WithContext(c).Where("id = ? AND user_id = ?", tokenID, userID).Delete(&uModel.APIToken{}).Error } -func (r *UserRepository) UpdateNotificationTarget(c context.Context, userID int, targetID string, targetType nModel.NotificationType) error { +func (r *UserRepository) UpdateNotificationTarget(c context.Context, userID int, targetID string, targetType nModel.NotificationPlatform) error { return r.db.WithContext(c).Save(&uModel.UserNotificationTarget{ UserID: userID, TargetID: targetID, @@ -173,7 +173,7 @@ func (r *UserRepository) DeleteNotificationTarget(c context.Context, userID int) return r.db.WithContext(c).Where("user_id = ?", userID).Delete(&uModel.UserNotificationTarget{}).Error } -func (r *UserRepository) UpdateNotificationTargetForAllNotifications(c context.Context, userID int, targetID string, targetType nModel.NotificationType) error { +func (r *UserRepository) UpdateNotificationTargetForAllNotifications(c context.Context, userID int, targetID string, targetType nModel.NotificationPlatform) error { return r.db.WithContext(c).Model(&nModel.Notification{}).Where("user_id = ?", userID).Update("target_id", targetID).Update("type", targetType).Error } func (r *UserRepository) UpdatePasswordByUserId(c context.Context, userID int, password string) error { diff --git a/main.go b/main.go index a168f39..da51770 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( cRepo "donetick.com/core/internal/circle/repo" "donetick.com/core/internal/database" "donetick.com/core/internal/email" + "donetick.com/core/internal/events" label "donetick.com/core/internal/label" lRepo "donetick.com/core/internal/label/repo" "donetick.com/core/internal/resource" @@ -72,6 +73,7 @@ func main() { fx.Provide(pushover.NewPushover), fx.Provide(telegram.NewTelegramNotifier), fx.Provide(notifier.NewNotifier), + fx.Provide(events.NewEventsProducer), // Rate limiter fx.Provide(utils.NewRateLimiter), @@ -123,7 +125,7 @@ func main() { } -func newServer(lc fx.Lifecycle, cfg *config.Config, db *gorm.DB, notifier *notifier.Scheduler) *gin.Engine { +func newServer(lc fx.Lifecycle, cfg *config.Config, db *gorm.DB, notifier *notifier.Scheduler, eventProducer *events.EventsProducer) *gin.Engine { gin.SetMode(gin.DebugMode) // log when http request is made: @@ -157,6 +159,7 @@ func newServer(lc fx.Lifecycle, cfg *config.Config, db *gorm.DB, notifier *notif } } notifier.Start(context.Background()) + eventProducer.Start(context.Background()) go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("listen: %s\n", err) diff --git a/migrations/20241212_migrate_chat_id_to_notification_target.go b/migrations/20241212_migrate_chat_id_to_notification_target.go index f95b74d..4ccda8b 100644 --- a/migrations/20241212_migrate_chat_id_to_notification_target.go +++ b/migrations/20241212_migrate_chat_id_to_notification_target.go @@ -52,7 +52,7 @@ func (m MigrateChatIdToNotificationTarget20241212) Up(ctx context.Context, db *g notificationTargets = append(notificationTargets, uModel.UserNotificationTarget{ UserID: user.ID, TargetID: fmt.Sprint(user.ChatID), - Type: nModel.NotificationTypeTelegram, + Type: nModel.NotificationPlatformTelegram, }) }