Add Event Producer

Update User to carry webhook from circle if assigned
Refactor notification handling and update models for webhook support
This commit is contained in:
Mo Tarbin 2025-02-09 20:15:28 -05:00
parent 44cb5501dd
commit 04d1894aea
17 changed files with 351 additions and 101 deletions

View file

@ -20,6 +20,7 @@ type Config struct {
EmailConfig EmailConfig `mapstructure:"email" yaml:"email"` EmailConfig EmailConfig `mapstructure:"email" yaml:"email"`
StripeConfig StripeConfig `mapstructure:"stripe" yaml:"stripe"` StripeConfig StripeConfig `mapstructure:"stripe" yaml:"stripe"`
OAuth2Config OAuth2Config `mapstructure:"oauth2" yaml:"oauth2"` OAuth2Config OAuth2Config `mapstructure:"oauth2" yaml:"oauth2"`
WebhookConfig WebhookConfig `mapstructure:"webhook" yaml:"webhook"`
IsDoneTickDotCom bool `mapstructure:"is_done_tick_dot_com" yaml:"is_done_tick_dot_com"` IsDoneTickDotCom bool `mapstructure:"is_done_tick_dot_com" yaml:"is_done_tick_dot_com"`
IsUserCreationDisabled bool `mapstructure:"is_user_creation_disabled" yaml:"is_user_creation_disabled"` IsUserCreationDisabled bool `mapstructure:"is_user_creation_disabled" yaml:"is_user_creation_disabled"`
} }
@ -97,6 +98,11 @@ type OAuth2Config struct {
Name string `mapstructure:"name" yaml:"name"` Name string `mapstructure:"name" yaml:"name"`
} }
type WebhookConfig struct {
Timeout time.Duration `mapstructure:"timeout" yaml:"timeout" default:"5s"`
QueueSize int `mapstructure:"queue_size" yaml:"queue_size" default:"100"`
}
func NewConfig() *Config { func NewConfig() *Config {
return &Config{ return &Config{
Telegram: TelegramConfig{ Telegram: TelegramConfig{

View file

@ -20,16 +20,16 @@ type signIn struct {
Password string `form:"password" json:"password" binding:"required"` Password string `form:"password" json:"password" binding:"required"`
} }
func CurrentUser(c *gin.Context) (*uModel.User, bool) { func CurrentUser(c *gin.Context) (*uModel.UserDetails, bool) {
data, ok := c.Get(identityKey) data, ok := c.Get(identityKey)
if !ok { if !ok {
return nil, false return nil, false
} }
acc, ok := data.(*uModel.User) acc, ok := data.(*uModel.UserDetails)
return acc, ok return acc, ok
} }
func MustCurrentUser(c *gin.Context) *uModel.User { func MustCurrentUser(c *gin.Context) *uModel.UserDetails {
acc, ok := CurrentUser(c) acc, ok := CurrentUser(c)
if ok { if ok {
return acc return acc
@ -45,7 +45,7 @@ func NewAuthMiddleware(cfg *config.Config, userRepo *uRepo.UserRepository) (*jwt
MaxRefresh: cfg.Jwt.MaxRefresh, // 7 days as long as their token is valid they can refresh it MaxRefresh: cfg.Jwt.MaxRefresh, // 7 days as long as their token is valid they can refresh it
IdentityKey: identityKey, IdentityKey: identityKey,
PayloadFunc: func(data interface{}) jwt.MapClaims { PayloadFunc: func(data interface{}) jwt.MapClaims {
if u, ok := data.(*uModel.User); ok { if u, ok := data.(*uModel.UserDetails); ok {
return jwt.MapClaims{ return jwt.MapClaims{
identityKey: u.Username, identityKey: u.Username,
} }
@ -85,7 +85,8 @@ func NewAuthMiddleware(cfg *config.Config, userRepo *uRepo.UserRepository) (*jwt
} }
return nil, jwt.ErrFailedAuthentication return nil, jwt.ErrFailedAuthentication
} }
return &uModel.User{ return &uModel.UserDetails{
User: uModel.User{
ID: user.ID, ID: user.ID,
Username: user.Username, Username: user.Username,
Password: "", Password: "",
@ -94,13 +95,15 @@ func NewAuthMiddleware(cfg *config.Config, userRepo *uRepo.UserRepository) (*jwt
UpdatedAt: user.UpdatedAt, UpdatedAt: user.UpdatedAt,
Disabled: user.Disabled, Disabled: user.Disabled,
CircleID: user.CircleID, CircleID: user.CircleID,
},
WebhookURL: user.WebhookURL,
}, nil }, nil
case "3rdPartyAuth": case "3rdPartyAuth":
// we should only reach this stage if a handler mannually call authenticator with it's context: // we should only reach this stage if a handler mannually call authenticator with it's context:
var authObject *uModel.User var authObject *uModel.UserDetails
v := c.Value("user_account") v := c.Value("user_account")
authObject = v.(*uModel.User) authObject = v.(*uModel.UserDetails)
return authObject, nil return authObject, nil
@ -111,7 +114,7 @@ func NewAuthMiddleware(cfg *config.Config, userRepo *uRepo.UserRepository) (*jwt
Authorizator: func(data interface{}, c *gin.Context) bool { Authorizator: func(data interface{}, c *gin.Context) bool {
if _, ok := data.(*uModel.User); ok { if _, ok := data.(*uModel.UserDetails); ok {
return true return true
} }
return false return false

View file

@ -15,6 +15,7 @@ import (
chModel "donetick.com/core/internal/chore/model" chModel "donetick.com/core/internal/chore/model"
chRepo "donetick.com/core/internal/chore/repo" chRepo "donetick.com/core/internal/chore/repo"
cRepo "donetick.com/core/internal/circle/repo" cRepo "donetick.com/core/internal/circle/repo"
"donetick.com/core/internal/events"
lRepo "donetick.com/core/internal/label/repo" lRepo "donetick.com/core/internal/label/repo"
"donetick.com/core/internal/notifier" "donetick.com/core/internal/notifier"
nRepo "donetick.com/core/internal/notifier/repo" nRepo "donetick.com/core/internal/notifier/repo"
@ -34,10 +35,12 @@ type Handler struct {
nRepo *nRepo.NotificationRepository nRepo *nRepo.NotificationRepository
tRepo *tRepo.ThingRepository tRepo *tRepo.ThingRepository
lRepo *lRepo.LabelRepository lRepo *lRepo.LabelRepository
eventProducer *events.EventsProducer
} }
func NewHandler(cr *chRepo.ChoreRepository, circleRepo *cRepo.CircleRepository, nt *notifier.Notifier, func NewHandler(cr *chRepo.ChoreRepository, circleRepo *cRepo.CircleRepository, nt *notifier.Notifier,
np *nps.NotificationPlanner, nRepo *nRepo.NotificationRepository, tRepo *tRepo.ThingRepository, lRepo *lRepo.LabelRepository) *Handler { np *nps.NotificationPlanner, nRepo *nRepo.NotificationRepository, tRepo *tRepo.ThingRepository, lRepo *lRepo.LabelRepository,
ep *events.EventsProducer) *Handler {
return &Handler{ return &Handler{
choreRepo: cr, choreRepo: cr,
circleRepo: circleRepo, circleRepo: circleRepo,
@ -46,6 +49,7 @@ func NewHandler(cr *chRepo.ChoreRepository, circleRepo *cRepo.CircleRepository,
nRepo: nRepo, nRepo: nRepo,
tRepo: tRepo, tRepo: tRepo,
lRepo: lRepo, lRepo: lRepo,
eventProducer: ep,
} }
} }
@ -294,7 +298,7 @@ func (h *Handler) createChore(c *gin.Context) {
go func() { go func() {
h.nPlanner.GenerateNotifications(c, createdChore) h.nPlanner.GenerateNotifications(c, createdChore)
}() }()
shouldReturn := HandleThingAssociation(choreReq, h, c, currentUser) shouldReturn := HandleThingAssociation(choreReq, h, c, &currentUser.User)
if shouldReturn { if shouldReturn {
return return
} }
@ -551,7 +555,7 @@ func (h *Handler) editChore(c *gin.Context) {
h.tRepo.DissociateThingWithChore(c, oldChore.ThingChore.ThingID, oldChore.ID) h.tRepo.DissociateThingWithChore(c, oldChore.ThingChore.ThingID, oldChore.ID)
} }
shouldReturn := HandleThingAssociation(choreReq, h, c, currentUser) shouldReturn := HandleThingAssociation(choreReq, h, c, &currentUser.User)
if shouldReturn { if shouldReturn {
return return
} }
@ -818,7 +822,7 @@ func (h *Handler) skipChore(c *gin.Context) {
}) })
return return
} }
h.eventProducer.ChoreSkipped(c, currentUser.WebhookURL, updatedChore, &currentUser.User)
c.JSON(200, gin.H{ c.JSON(200, gin.H{
"res": updatedChore, "res": updatedChore,
}) })
@ -1069,7 +1073,7 @@ func (h *Handler) completeChore(c *gin.Context) {
// h.notifier.SendChoreCompletion(c, chore, currentUser) // h.notifier.SendChoreCompletion(c, chore, currentUser)
// }() // }()
h.nPlanner.GenerateNotifications(c, updatedChore) h.nPlanner.GenerateNotifications(c, updatedChore)
h.eventProducer.ChoreCompleted(c, currentUser.WebhookURL, chore, &currentUser.User)
c.JSON(200, gin.H{ c.JSON(200, gin.H{
"res": updatedChore, "res": updatedChore,
}) })

View file

@ -143,7 +143,7 @@ func (h *Handler) LeaveCircle(c *gin.Context) {
// START : HANDLE USER LEAVING CIRCLE // START : HANDLE USER LEAVING CIRCLE
// bulk update chores: // bulk update chores:
if err := handleUserLeavingCircle(h, c, currentUser, orginalCircleID); err != nil { if err := handleUserLeavingCircle(h, c, &currentUser.User, orginalCircleID); err != nil {
log.Error("Error handling user leaving circle:", err) log.Error("Error handling user leaving circle:", err)
c.JSON(500, gin.H{ c.JSON(500, gin.H{
"error": "Error handling user leaving circle", "error": "Error handling user leaving circle",

View file

@ -14,6 +14,7 @@ type Circle struct {
UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"` // Updated at UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"` // Updated at
InviteCode string `json:"invite_code" gorm:"column:invite_code"` // Invite code InviteCode string `json:"invite_code" gorm:"column:invite_code"` // Invite code
Disabled bool `json:"disabled" gorm:"column:disabled"` // Disabled Disabled bool `json:"disabled" gorm:"column:disabled"` // Disabled
WebhookURL *string `json:"-" gorm:"column:webhook_url"` // Webhook URL
} }
type CircleDetail struct { type CircleDetail struct {
@ -37,6 +38,6 @@ type UserCircleDetail struct {
UserCircle UserCircle
Username string `json:"-" gorm:"column:username"` Username string `json:"-" gorm:"column:username"`
DisplayName string `json:"displayName" gorm:"column:display_name"` DisplayName string `json:"displayName" gorm:"column:display_name"`
NotificationType nModel.NotificationType `json:"-" gorm:"column:notification_type"` NotificationType nModel.NotificationPlatform `json:"-" gorm:"column:notification_type"`
TargetID string `json:"-" gorm:"column:target_id"` // Target ID TargetID string `json:"-" gorm:"column:target_id"` // Target ID
} }

164
internal/events/producer.go Normal file
View file

@ -0,0 +1,164 @@
package events
import (
"bytes"
"context"
"encoding/json"
"log"
"net/http"
"time"
"donetick.com/core/config"
chModel "donetick.com/core/internal/chore/model"
uModel "donetick.com/core/internal/user/model"
"donetick.com/core/logging"
"go.uber.org/zap"
)
const (
METHOD_POST = "POST"
HEAD_CONTENT_TYPE = "Content-Type"
CONTENT_TYPE_JSON = "application/json"
)
type EventType string
const (
EventTypeUnknown EventType = ""
EventTypeChoreCreated EventType = "CREATED"
EventTypeChoreReminder EventType = "REMINDER"
EventTypeChoreUpdated EventType = "UPDATED"
EventTypeChoreCompleted EventType = "COMPLETED"
EventTypeChoreReassigned EventType = "REASSIGNED"
EventTypeChoreSkipped EventType = "SKIPPED"
)
type Event struct {
Type EventType `json:"type"`
URL string `json:"-"`
Timestamp time.Time `json:"timestamp"`
Data interface{} `json:"data"`
}
type ChoreData struct {
Chore *chModel.Chore `json:"chore"`
Username string `json:"username"`
DisplayName string `json:"display_name"`
Note string `json:"note"`
}
type EventsProducer struct {
client *http.Client
queue chan Event
logger *zap.SugaredLogger
}
func (p *EventsProducer) Start(ctx context.Context) {
p.logger = logging.FromContext(ctx)
go func() {
for event := range p.queue {
p.processEvent(event)
}
}()
}
func NewEventsProducer(cfg *config.Config) *EventsProducer {
return &EventsProducer{
client: &http.Client{
Timeout: cfg.WebhookConfig.Timeout,
},
queue: make(chan Event, cfg.WebhookConfig.QueueSize),
}
}
func (p *EventsProducer) publishEvent(event Event) {
select {
case p.queue <- event:
// Successfully added to queue
default:
log.Println("Webhook queue is full, dropping event")
}
}
func (p *EventsProducer) processEvent(event Event) {
p.logger.Debugw("Sending webhook event", "type", event.Type, "url", event.URL)
eventJSON, err := json.Marshal(event)
if err != nil {
p.logger.Errorw("Failed to marshal webhook event", "error", err)
return
}
// Pring the event and the url:
p.logger.Debug("Sending event to webhook", "url", event.URL, "event", event)
p.logger.Debug("Event: ", event)
req, err := http.NewRequest(METHOD_POST, event.URL, bytes.NewBuffer(eventJSON))
if err != nil {
p.logger.Errorw("Failed to create webhook request", "error", err)
return
}
req.Header.Set(HEAD_CONTENT_TYPE, CONTENT_TYPE_JSON)
resp, err := p.client.Do(req)
if err != nil {
p.logger.Errorw("Failed to send webhook event", "error", err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
p.logger.Errorw("Webhook request failed", "status", resp.StatusCode)
return
}
}
func (p *EventsProducer) ChoreCompleted(ctx context.Context, webhookURL *string, chore *chModel.Chore, performer *uModel.User) {
if webhookURL == nil {
p.logger.Debug("No subscribers for circle, skipping webhook")
return
}
event := Event{
Type: EventTypeChoreCompleted,
URL: *webhookURL,
Timestamp: time.Now(),
Data: ChoreData{Chore: chore,
Username: performer.Username,
DisplayName: performer.DisplayName,
},
}
p.publishEvent(event)
}
func (p *EventsProducer) ChoreSkipped(ctx context.Context, webhookURL *string, chore *chModel.Chore, performer *uModel.User) {
if webhookURL == nil {
p.logger.Debug("No Webhook URL for circle, skipping webhook")
return
}
event := Event{
Type: EventTypeChoreSkipped,
URL: *webhookURL,
Timestamp: time.Now(),
Data: ChoreData{Chore: chore,
Username: performer.Username,
DisplayName: performer.DisplayName,
},
}
p.publishEvent(event)
}
func (p *EventsProducer) NotificaitonEvent(ctx context.Context, url string, event interface{}) {
// print the event and the url :
p.logger.Debug("Sending notification event")
p.publishEvent(Event{
URL: url,
Type: EventTypeChoreReminder,
Timestamp: time.Now(),
Data: event,
})
}

View file

@ -5,33 +5,30 @@ import "time"
type Notification struct { type Notification struct {
ID int `json:"id" gorm:"primaryKey"` ID int `json:"id" gorm:"primaryKey"`
ChoreID int `json:"chore_id" gorm:"column:chore_id"` ChoreID int `json:"chore_id" gorm:"column:chore_id"`
CircleID int `json:"circle_id" gorm:"column:circle_id"`
UserID int `json:"user_id" gorm:"column:user_id"` UserID int `json:"user_id" gorm:"column:user_id"`
TargetID string `json:"target_id" gorm:"column:target_id"` TargetID string `json:"target_id" gorm:"column:target_id"`
Text string `json:"text" gorm:"column:text"` Text string `json:"text" gorm:"column:text"`
IsSent bool `json:"is_sent" gorm:"column:is_sent;index;default:false"` IsSent bool `json:"is_sent" gorm:"column:is_sent;index;default:false"`
TypeID NotificationType `json:"type" gorm:"column:type"` TypeID NotificationPlatform `json:"type" gorm:"column:type"`
ScheduledFor time.Time `json:"scheduled_for" gorm:"column:scheduled_for;index"` ScheduledFor time.Time `json:"scheduled_for" gorm:"column:scheduled_for;index"`
CreatedAt time.Time `json:"created_at" gorm:"column:created_at"` CreatedAt time.Time `json:"created_at" gorm:"column:created_at"`
RawEvent interface{} `json:"raw_event" gorm:"column:raw_event;type:jsonb"`
}
type NotificationDetails struct {
Notification
WebhookURL *string `json:"webhook_url" gorm:"column:webhook_url;<-:null"` // read-only, will only be used if webhook enabled
} }
func (n *Notification) IsValid() bool { func (n *Notification) IsValid() bool {
switch n.TypeID {
case NotificationTypeTelegram, NotificationTypePushover:
if n.TargetID == "" {
return false
} else if n.Text == "0" {
return false
}
return true return true
default:
return false
}
} }
type NotificationType int8 type NotificationPlatform int8
const ( const (
NotificationTypeNone NotificationType = iota NotificationPlatformNone NotificationPlatform = iota
NotificationTypeTelegram NotificationPlatformTelegram
NotificationTypePushover NotificationPlatformPushover
) )

View file

@ -3,39 +3,52 @@ package notifier
import ( import (
"context" "context"
"donetick.com/core/internal/events"
nModel "donetick.com/core/internal/notifier/model" nModel "donetick.com/core/internal/notifier/model"
pushover "donetick.com/core/internal/notifier/service/pushover" pushover "donetick.com/core/internal/notifier/service/pushover"
telegram "donetick.com/core/internal/notifier/service/telegram" telegram "donetick.com/core/internal/notifier/service/telegram"
"donetick.com/core/logging" "donetick.com/core/logging"
) )
type Notifier struct { type Notifier struct {
Telegram *telegram.TelegramNotifier Telegram *telegram.TelegramNotifier
Pushover *pushover.Pushover Pushover *pushover.Pushover
eventsProducer *events.EventsProducer
} }
func NewNotifier(t *telegram.TelegramNotifier, p *pushover.Pushover) *Notifier { func NewNotifier(t *telegram.TelegramNotifier, p *pushover.Pushover, ep *events.EventsProducer) *Notifier {
return &Notifier{ return &Notifier{
Telegram: t, Telegram: t,
Pushover: p, Pushover: p,
eventsProducer: ep,
} }
} }
func (n *Notifier) SendNotification(c context.Context, notification *nModel.Notification) error { func (n *Notifier) SendNotification(c context.Context, notification *nModel.NotificationDetails) error {
log := logging.FromContext(c) log := logging.FromContext(c)
var err error
switch notification.TypeID { switch notification.TypeID {
case nModel.NotificationTypeTelegram: case nModel.NotificationPlatformTelegram:
if n.Telegram == nil { if n.Telegram == nil {
log.Error("Telegram bot is not initialized, Skipping sending message") log.Error("Telegram bot is not initialized, Skipping sending message")
return nil return nil
} }
return n.Telegram.SendNotification(c, notification) err = n.Telegram.SendNotification(c, notification)
case nModel.NotificationTypePushover: case nModel.NotificationPlatformPushover:
if n.Pushover == nil { if n.Pushover == nil {
log.Error("Pushover is not initialized, Skipping sending message") log.Error("Pushover is not initialized, Skipping sending message")
return nil return nil
} }
return n.Pushover.SendNotification(c, notification) err = n.Pushover.SendNotification(c, notification)
} }
if err != nil {
log.Error("Failed to send notification", "err", err)
}
if notification.RawEvent != nil && notification.WebhookURL != nil {
// if we have a webhook url, we should send the event to the webhook
n.eventsProducer.NotificaitonEvent(c, *notification.WebhookURL, notification.RawEvent)
}
return nil return nil
} }

View file

@ -23,7 +23,7 @@ func (r *NotificationRepository) DeleteAllChoreNotifications(choreID int) error
func (r *NotificationRepository) BatchInsertNotifications(notifications []*nModel.Notification) error { func (r *NotificationRepository) BatchInsertNotifications(notifications []*nModel.Notification) error {
return r.db.Create(&notifications).Error return r.db.Create(&notifications).Error
} }
func (r *NotificationRepository) MarkNotificationsAsSent(notifications []*nModel.Notification) error { func (r *NotificationRepository) MarkNotificationsAsSent(notifications []*nModel.NotificationDetails) error {
// Extract IDs from notifications // Extract IDs from notifications
var ids []int var ids []int
for _, notification := range notifications { for _, notification := range notifications {
@ -32,11 +32,15 @@ func (r *NotificationRepository) MarkNotificationsAsSent(notifications []*nModel
// Use the extracted IDs in the Where clause // Use the extracted IDs in the Where clause
return r.db.Model(&nModel.Notification{}).Where("id IN (?)", ids).Update("is_sent", true).Error return r.db.Model(&nModel.Notification{}).Where("id IN (?)", ids).Update("is_sent", true).Error
} }
func (r *NotificationRepository) GetPendingNotificaiton(c context.Context, lookback time.Duration) ([]*nModel.Notification, error) { func (r *NotificationRepository) GetPendingNotificaiton(c context.Context, lookback time.Duration) ([]*nModel.NotificationDetails, error) {
var notifications []*nModel.Notification var notifications []*nModel.NotificationDetails
start := time.Now().UTC().Add(-lookback) start := time.Now().UTC().Add(-lookback)
end := time.Now().UTC() end := time.Now().UTC()
if err := r.db.Where("is_sent = ? AND scheduled_for < ? AND scheduled_for > ?", false, end, start).Find(&notifications).Error; err != nil { if err := r.db.Table("notifications").
Select("notifications.*, circles.webhook_url as webhook_url").
Joins("left join circles on circles.id = notifications.circle_id").
Where("notifications.is_sent = ? AND notifications.scheduled_for < ? AND notifications.scheduled_for > ?", false, end, start).
Find(&notifications).Error; err != nil {
return nil, err return nil, err
} }
return notifications, nil return notifications, nil

View file

@ -65,7 +65,7 @@ func (n *NotificationPlanner) GenerateNotifications(c context.Context, chore *ch
if mt.CircleGroup { if mt.CircleGroup {
notifications = append(notifications, generateCircleGroupNotifications(chore, mt)...) notifications = append(notifications, generateCircleGroupNotifications(chore, mt)...)
} }
log.Debug("Generated notifications", "count", len(notifications))
n.nRepo.BatchInsertNotifications(notifications) n.nRepo.BatchInsertNotifications(notifications)
return true return true
} }
@ -89,6 +89,13 @@ func generateDueNotifications(chore *chModel.Chore, users []*cModel.UserCircleDe
UserID: user.ID, UserID: user.ID,
TargetID: user.TargetID, TargetID: user.TargetID,
Text: fmt.Sprintf("📅 Reminder: *%s* is due today and assigned to %s.", chore.Name, assignee.DisplayName), Text: fmt.Sprintf("📅 Reminder: *%s* is due today and assigned to %s.", chore.Name, assignee.DisplayName),
RawEvent: map[string]interface{}{
"id": chore.ID,
"name": chore.Name,
"due_date": chore.NextDueDate.Format("January 2nd"),
"assignee": assignee.DisplayName,
"assignee_username": assignee.Username,
},
} }
if notification.IsValid() { if notification.IsValid() {
notifications = append(notifications, notification) notifications = append(notifications, notification)
@ -117,6 +124,13 @@ func generatePreDueNotifications(chore *chModel.Chore, users []*cModel.UserCircl
UserID: user.ID, UserID: user.ID,
TargetID: user.TargetID, TargetID: user.TargetID,
Text: fmt.Sprintf("📢 Heads up! *%s* is due soon (on %s) and assigned to %s.", chore.Name, chore.NextDueDate.Format("January 2nd"), assignee.DisplayName), Text: fmt.Sprintf("📢 Heads up! *%s* is due soon (on %s) and assigned to %s.", chore.Name, chore.NextDueDate.Format("January 2nd"), assignee.DisplayName),
RawEvent: map[string]interface{}{
"id": chore.ID,
"name": chore.Name,
"due_date": chore.NextDueDate.Format("January 2nd"),
"assignee": assignee.DisplayName,
"assignee_username": assignee.Username,
},
} }
if notification.IsValid() { if notification.IsValid() {
notifications = append(notifications, notification) notifications = append(notifications, notification)
@ -148,6 +162,14 @@ func generateOverdueNotifications(chore *chModel.Chore, users []*cModel.UserCirc
UserID: user.ID, UserID: user.ID,
TargetID: fmt.Sprint(user.TargetID), TargetID: fmt.Sprint(user.TargetID),
Text: fmt.Sprintf("🚨 *%s* is now %d hours overdue. Please complete it as soon as possible. (Assigned to %s)", chore.Name, hours, assignee.DisplayName), Text: fmt.Sprintf("🚨 *%s* is now %d hours overdue. Please complete it as soon as possible. (Assigned to %s)", chore.Name, hours, assignee.DisplayName),
RawEvent: map[string]interface{}{
"id": chore.ID,
"type": EventTypeOverdue,
"name": chore.Name,
"due_date": chore.NextDueDate.Format("January 2nd"),
"assignee": assignee.DisplayName,
"assignee_username": assignee.Username,
},
} }
if notification.IsValid() { if notification.IsValid() {
notifications = append(notifications, notification) notifications = append(notifications, notification)
@ -173,6 +195,12 @@ func generateCircleGroupNotifications(chore *chModel.Chore, mt *chModel.Notifica
TypeID: 1, TypeID: 1,
TargetID: fmt.Sprint(*mt.CircleGroupID), TargetID: fmt.Sprint(*mt.CircleGroupID),
Text: fmt.Sprintf("📅 Reminder: *%s* is due today.", chore.Name), Text: fmt.Sprintf("📅 Reminder: *%s* is due today.", chore.Name),
RawEvent: map[string]interface{}{
"id": chore.ID,
"type": EventTypeDue,
"name": chore.Name,
"due_date": chore.NextDueDate.Format("January 2nd"),
},
} }
if notification.IsValid() { if notification.IsValid() {
notifications = append(notifications, notification) notifications = append(notifications, notification)
@ -188,6 +216,12 @@ func generateCircleGroupNotifications(chore *chModel.Chore, mt *chModel.Notifica
TypeID: 3, TypeID: 3,
TargetID: fmt.Sprint(*mt.CircleGroupID), TargetID: fmt.Sprint(*mt.CircleGroupID),
Text: fmt.Sprintf("📢 Heads up! *%s* is due soon (on %s).", chore.Name, chore.NextDueDate.Format("January 2nd")), Text: fmt.Sprintf("📢 Heads up! *%s* is due soon (on %s).", chore.Name, chore.NextDueDate.Format("January 2nd")),
RawEvent: map[string]interface{}{
"id": chore.ID,
"type": EventTypePreDue,
"name": chore.Name,
"due_date": chore.NextDueDate.Format("January 2nd"),
},
} }
if notification.IsValid() { if notification.IsValid() {
notifications = append(notifications, notification) notifications = append(notifications, notification)
@ -205,6 +239,12 @@ func generateCircleGroupNotifications(chore *chModel.Chore, mt *chModel.Notifica
TypeID: 2, TypeID: 2,
TargetID: fmt.Sprint(*mt.CircleGroupID), TargetID: fmt.Sprint(*mt.CircleGroupID),
Text: fmt.Sprintf("🚨 *%s* is now %d hours overdue. Please complete it as soon as possible.", chore.Name, hours), Text: fmt.Sprintf("🚨 *%s* is now %d hours overdue. Please complete it as soon as possible.", chore.Name, hours),
RawEvent: map[string]interface{}{
"id": chore.ID,
"type": EventTypeOverdue,
"name": chore.Name,
"due_date": chore.NextDueDate.Format("January 2nd"),
},
} }
if notification.IsValid() { if notification.IsValid() {
notifications = append(notifications, notification) notifications = append(notifications, notification)
@ -214,3 +254,12 @@ func generateCircleGroupNotifications(chore *chModel.Chore, mt *chModel.Notifica
return notifications return notifications
} }
type EventType string
const (
EventTypeUnknown EventType = "unknown"
EventTypeDue EventType = "due"
EventTypePreDue EventType = "pre_due"
EventTypeOverdue EventType = "overdue"
)

View file

@ -2,6 +2,7 @@ package pushover
import ( import (
"context" "context"
"errors"
"donetick.com/core/config" "donetick.com/core/config"
nModel "donetick.com/core/internal/notifier/model" nModel "donetick.com/core/internal/notifier/model"
@ -22,7 +23,10 @@ func NewPushover(cfg *config.Config) *Pushover {
} }
} }
func (p *Pushover) SendNotification(c context.Context, notification *nModel.Notification) error { func (p *Pushover) SendNotification(c context.Context, notification *nModel.NotificationDetails) error {
if notification.TargetID == "" {
return errors.New("unable to send notification, targetID is empty")
}
log := logging.FromContext(c) log := logging.FromContext(c)
recipient := pushover.NewRecipient(notification.TargetID) recipient := pushover.NewRecipient(notification.TargetID)
message := pushover.NewMessageWithTitle(notification.Text, "Donetick") message := pushover.NewMessageWithTitle(notification.Text, "Donetick")

View file

@ -70,12 +70,10 @@ func (tn *TelegramNotifier) SendChoreCompletion(c context.Context, chore *chMode
} }
func (tn *TelegramNotifier) SendNotification(c context.Context, notification *nModel.Notification) error { func (tn *TelegramNotifier) SendNotification(c context.Context, notification *nModel.NotificationDetails) error {
log := logging.FromContext(c) log := logging.FromContext(c)
if notification.TargetID == "" { if notification.TargetID == "" {
log.Error("Notification target ID is empty") return errors.New("unable to send notification, targetID is empty")
return errors.New("Notification target ID is empty")
} }
chatID, err := strconv.ParseInt(notification.TargetID, 10, 64) chatID, err := strconv.ParseInt(notification.TargetID, 10, 64)
if err != nil { if err != nil {

View file

@ -518,7 +518,7 @@ func (h *Handler) UpdateUserDetails(c *gin.Context) {
user.Image = *req.Image user.Image = *req.Image
} }
if err := h.userRepo.UpdateUser(c, user); err != nil { if err := h.userRepo.UpdateUser(c, &user.User); err != nil {
c.JSON(500, gin.H{ c.JSON(500, gin.H{
"error": "Error updating user", "error": "Error updating user",
}) })
@ -608,7 +608,7 @@ func (h *Handler) UpdateNotificationTarget(c *gin.Context) {
} }
type Request struct { type Request struct {
Type nModel.NotificationType `json:"type"` Type nModel.NotificationPlatform `json:"type"`
Target string `json:"target"` Target string `json:"target"`
} }
@ -617,7 +617,7 @@ func (h *Handler) UpdateNotificationTarget(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request"}) c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request"})
return return
} }
if req.Type == nModel.NotificationTypeNone { if req.Type == nModel.NotificationPlatformNone {
err := h.userRepo.DeleteNotificationTarget(c, currentUser.ID) err := h.userRepo.DeleteNotificationTarget(c, currentUser.ID)
if err != nil { if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to delete notification target"}) c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to delete notification target"})

View file

@ -25,6 +25,10 @@ type User struct {
Expiration *string `json:"expiration" gorm:"column:expiration;<-:false"` // read only column Expiration *string `json:"expiration" gorm:"column:expiration;<-:false"` // read only column
UserNotificationTargets UserNotificationTarget `json:"notification_target" gorm:"foreignKey:UserID;references:ID"` UserNotificationTargets UserNotificationTarget `json:"notification_target" gorm:"foreignKey:UserID;references:ID"`
} }
type UserDetails struct {
User
WebhookURL *string `json:"webhookURL" gorm:"column:webhook_url;<-:false"` // read only column
}
type UserPasswordReset struct { type UserPasswordReset struct {
ID int `gorm:"column:id"` ID int `gorm:"column:id"`
@ -44,7 +48,7 @@ type APIToken struct {
type UserNotificationTarget struct { type UserNotificationTarget struct {
UserID int `json:"userId" gorm:"column:user_id;index;primaryKey"` // Index on userID UserID int `json:"userId" gorm:"column:user_id;index;primaryKey"` // Index on userID
Type nModel.NotificationType `json:"type" gorm:"column:type"` // Type Type nModel.NotificationPlatform `json:"type" gorm:"column:type"` // Type
TargetID string `json:"target_id" gorm:"column:target_id"` // Target ID TargetID string `json:"target_id" gorm:"column:target_id"` // Target ID
CreatedAt time.Time `json:"-" gorm:"column:created_at"` CreatedAt time.Time `json:"-" gorm:"column:created_at"`
} }

View file

@ -13,7 +13,7 @@ import (
) )
type IUserRepository interface { type IUserRepository interface {
GetUserByUsername(username string) (*uModel.User, error) GetUserByUsername(username string) (*uModel.UserDetails, error)
GetUser(id int) (*uModel.User, error) GetUser(id int) (*uModel.User, error)
GetAllUsers() ([]*uModel.User, error) GetAllUsers() ([]*uModel.User, error)
CreateUser(user *uModel.User) error CreateUser(user *uModel.User) error
@ -52,14 +52,14 @@ func (r *UserRepository) CreateUser(c context.Context, user *uModel.User) (*uMod
} }
return user, nil return user, nil
} }
func (r *UserRepository) GetUserByUsername(c context.Context, username string) (*uModel.User, error) { func (r *UserRepository) GetUserByUsername(c context.Context, username string) (*uModel.UserDetails, error) {
var user *uModel.User var user *uModel.UserDetails
if r.isDonetickDotCom { if r.isDonetickDotCom {
if err := r.db.WithContext(c).Preload("UserNotificationTargets").Table("users u").Select("u.*, ss.status as subscription, ss.expired_at as expiration").Joins("left join stripe_customers sc on sc.user_id = u.id ").Joins("left join stripe_subscriptions ss on sc.customer_id = ss.customer_id").Where("username = ?", username).First(&user).Error; err != nil { if err := r.db.WithContext(c).Preload("UserNotificationTargets").Table("users u").Select("u.*, ss.status as subscription, ss.expired_at as expiration, c.webhook_url as webhook_url").Joins("left join stripe_customers sc on sc.user_id = u.id ").Joins("left join stripe_subscriptions ss on sc.customer_id = ss.customer_id").Joins("left join circles c on c.id = u.circle_id").Where("username = ?", username).First(&user).Error; err != nil {
return nil, err return nil, err
} }
} else { } else {
if err := r.db.WithContext(c).Preload("UserNotificationTargets").Table("users u").Select("u.*, 'active' as subscription, '2999-12-31' as expiration").Where("username = ?", username).First(&user).Error; err != nil { if err := r.db.WithContext(c).Preload("UserNotificationTargets").Table("users u").Select("u.*, 'active' as subscription, '2999-12-31' as expiration, c.webhook_url as webhook_url").Joins("left join circles c on c.id = u.circle_id").Where("username = ?", username).First(&user).Error; err != nil {
return nil, err return nil, err
} }
} }
@ -160,7 +160,7 @@ func (r *UserRepository) DeleteAPIToken(c context.Context, userID int, tokenID s
return r.db.WithContext(c).Where("id = ? AND user_id = ?", tokenID, userID).Delete(&uModel.APIToken{}).Error return r.db.WithContext(c).Where("id = ? AND user_id = ?", tokenID, userID).Delete(&uModel.APIToken{}).Error
} }
func (r *UserRepository) UpdateNotificationTarget(c context.Context, userID int, targetID string, targetType nModel.NotificationType) error { func (r *UserRepository) UpdateNotificationTarget(c context.Context, userID int, targetID string, targetType nModel.NotificationPlatform) error {
return r.db.WithContext(c).Save(&uModel.UserNotificationTarget{ return r.db.WithContext(c).Save(&uModel.UserNotificationTarget{
UserID: userID, UserID: userID,
TargetID: targetID, TargetID: targetID,
@ -173,7 +173,7 @@ func (r *UserRepository) DeleteNotificationTarget(c context.Context, userID int)
return r.db.WithContext(c).Where("user_id = ?", userID).Delete(&uModel.UserNotificationTarget{}).Error return r.db.WithContext(c).Where("user_id = ?", userID).Delete(&uModel.UserNotificationTarget{}).Error
} }
func (r *UserRepository) UpdateNotificationTargetForAllNotifications(c context.Context, userID int, targetID string, targetType nModel.NotificationType) error { func (r *UserRepository) UpdateNotificationTargetForAllNotifications(c context.Context, userID int, targetID string, targetType nModel.NotificationPlatform) error {
return r.db.WithContext(c).Model(&nModel.Notification{}).Where("user_id = ?", userID).Update("target_id", targetID).Update("type", targetType).Error return r.db.WithContext(c).Model(&nModel.Notification{}).Where("user_id = ?", userID).Update("target_id", targetID).Update("type", targetType).Error
} }
func (r *UserRepository) UpdatePasswordByUserId(c context.Context, userID int, password string) error { func (r *UserRepository) UpdatePasswordByUserId(c context.Context, userID int, password string) error {

View file

@ -22,6 +22,7 @@ import (
cRepo "donetick.com/core/internal/circle/repo" cRepo "donetick.com/core/internal/circle/repo"
"donetick.com/core/internal/database" "donetick.com/core/internal/database"
"donetick.com/core/internal/email" "donetick.com/core/internal/email"
"donetick.com/core/internal/events"
label "donetick.com/core/internal/label" label "donetick.com/core/internal/label"
lRepo "donetick.com/core/internal/label/repo" lRepo "donetick.com/core/internal/label/repo"
"donetick.com/core/internal/resource" "donetick.com/core/internal/resource"
@ -72,6 +73,7 @@ func main() {
fx.Provide(pushover.NewPushover), fx.Provide(pushover.NewPushover),
fx.Provide(telegram.NewTelegramNotifier), fx.Provide(telegram.NewTelegramNotifier),
fx.Provide(notifier.NewNotifier), fx.Provide(notifier.NewNotifier),
fx.Provide(events.NewEventsProducer),
// Rate limiter // Rate limiter
fx.Provide(utils.NewRateLimiter), fx.Provide(utils.NewRateLimiter),
@ -123,7 +125,7 @@ func main() {
} }
func newServer(lc fx.Lifecycle, cfg *config.Config, db *gorm.DB, notifier *notifier.Scheduler) *gin.Engine { func newServer(lc fx.Lifecycle, cfg *config.Config, db *gorm.DB, notifier *notifier.Scheduler, eventProducer *events.EventsProducer) *gin.Engine {
gin.SetMode(gin.DebugMode) gin.SetMode(gin.DebugMode)
// log when http request is made: // log when http request is made:
@ -157,6 +159,7 @@ func newServer(lc fx.Lifecycle, cfg *config.Config, db *gorm.DB, notifier *notif
} }
} }
notifier.Start(context.Background()) notifier.Start(context.Background())
eventProducer.Start(context.Background())
go func() { go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("listen: %s\n", err) log.Fatalf("listen: %s\n", err)

View file

@ -52,7 +52,7 @@ func (m MigrateChatIdToNotificationTarget20241212) Up(ctx context.Context, db *g
notificationTargets = append(notificationTargets, uModel.UserNotificationTarget{ notificationTargets = append(notificationTargets, uModel.UserNotificationTarget{
UserID: user.ID, UserID: user.ID,
TargetID: fmt.Sprint(user.ChatID), TargetID: fmt.Sprint(user.ChatID),
Type: nModel.NotificationTypeTelegram, Type: nModel.NotificationPlatformTelegram,
}) })
} }