diff --git a/internal/service/notification/external_notification.go b/internal/service/notification/external_notification.go index 425a8c2bb..5282cab0f 100644 --- a/internal/service/notification/external_notification.go +++ b/internal/service/notification/external_notification.go @@ -45,6 +45,7 @@ type ExternalNotificationService struct { notificationQueueService noticequeue.ExternalService userExternalLoginRepo user_external_login.UserExternalLoginRepo siteInfoService siteinfo_common.SiteInfoCommonService + newQuestionEmailWorker *newQuestionEmailWorker } func NewExternalNotificationService( @@ -67,6 +68,10 @@ func NewExternalNotificationService( userExternalLoginRepo: userExternalLoginRepo, siteInfoService: siteInfoService, } + n.newQuestionEmailWorker = newQuestionEmailWorkerWithDefaults( + newQuestionNotificationEmailSendInterval, + n.sendNewQuestionNotificationEmail, + ) notificationQueueService.RegisterHandler(n.Handler) return n } diff --git a/internal/service/notification/new_question_email_worker.go b/internal/service/notification/new_question_email_worker.go new file mode 100644 index 000000000..be3e6504d --- /dev/null +++ b/internal/service/notification/new_question_email_worker.go @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package notification + +import ( + "context" + "sync" + "time" + + "github.com/apache/answer/internal/schema" + "github.com/apache/answer/pkg/token" + "github.com/segmentfault/pacman/log" +) + +const newQuestionEmailWorkerQueueSize = 128 + +type newQuestionEmailTask struct { + UserIDs []string + QuestionTitle string + QuestionID string + Tags []string + TagIDs []string +} + +type newQuestionEmailIntervalProvider func() time.Duration + +type newQuestionEmailTimer interface { + C() <-chan time.Time + Stop() +} + +type newQuestionEmailTimerFactory func(time.Duration) newQuestionEmailTimer + +type newQuestionEmailWorker struct { + tasks chan newQuestionEmailTask + send newQuestionNotificationEmailSender + interval newQuestionEmailIntervalProvider + timerFactory newQuestionEmailTimerFactory + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + closed bool + wg sync.WaitGroup +} + +func newQuestionEmailWorkerWithDefaults( + interval newQuestionEmailIntervalProvider, + send newQuestionNotificationEmailSender, +) *newQuestionEmailWorker { + return newQuestionEmailWorkerWithBuffer( + interval, + send, + newRealNewQuestionEmailTimer, + newQuestionEmailWorkerQueueSize, + ) +} + +func newQuestionEmailWorkerWithBuffer( + interval newQuestionEmailIntervalProvider, + send newQuestionNotificationEmailSender, + timerFactory newQuestionEmailTimerFactory, + bufferSize int, +) *newQuestionEmailWorker { + if interval == nil { + interval = newQuestionNotificationEmailSendInterval + } + if timerFactory == nil { + timerFactory = newRealNewQuestionEmailTimer + } + ctx, cancel := context.WithCancel(context.Background()) + w := &newQuestionEmailWorker{ + tasks: make(chan newQuestionEmailTask, bufferSize), + send: send, + interval: interval, + timerFactory: timerFactory, + ctx: ctx, + cancel: cancel, + } + w.wg.Add(1) + go w.run() + return w +} + +func (w *newQuestionEmailWorker) TryEnqueue(task newQuestionEmailTask) bool { + if w == nil { + log.Warnf("[new_question_email] worker is nil, dropping new question email task") + return false + } + + task = copyNewQuestionEmailTask(task) + + w.mu.RLock() + defer w.mu.RUnlock() + + if w.closed { + log.Warnf("[new_question_email] worker is closed, dropping new question email task for question %s", task.QuestionID) + return false + } + + if w.ctx == nil { + log.Warnf("[new_question_email] worker context is nil, dropping new question email task for question %s", task.QuestionID) + return false + } + + select { + case <-w.ctx.Done(): + log.Warnf("[new_question_email] worker is canceled, dropping new question email task for question %s", task.QuestionID) + return false + default: + } + + select { + case w.tasks <- task: + log.Debugf("[new_question_email] enqueued task for question %s to %d users", task.QuestionID, len(task.UserIDs)) + return true + case <-w.ctx.Done(): + log.Warnf("[new_question_email] worker canceled while enqueueing task for question %s", task.QuestionID) + return false + default: + log.Warnf("[new_question_email] queue is full, dropping new question email task for question %s", task.QuestionID) + return false + } +} + +func (w *newQuestionEmailWorker) Close() { + if w == nil { + return + } + + w.mu.Lock() + if w.closed { + w.mu.Unlock() + return + } + w.closed = true + if w.cancel != nil { + w.cancel() + } + w.mu.Unlock() + + w.wg.Wait() + if dropped := w.dropPendingTasks(); dropped > 0 { + log.Warnf("[new_question_email] dropped %d pending tasks during shutdown", dropped) + } + log.Infof("[new_question_email] worker closed") +} + +func (w *newQuestionEmailWorker) run() { + defer w.wg.Done() + + emailAttemptSent := false + for { + if w.ctx.Err() != nil { + return + } + + select { + case <-w.ctx.Done(): + return + case task := <-w.tasks: + if w.ctx.Err() != nil { + return + } + if !w.processTask(task, &emailAttemptSent) { + return + } + } + } +} + +func (w *newQuestionEmailWorker) processTask(task newQuestionEmailTask, emailAttemptSent *bool) bool { + for _, userID := range task.UserIDs { + if w.ctx.Err() != nil { + return false + } + if *emailAttemptSent { + interval := w.interval() + if interval > 0 && !waitNewQuestionEmailInterval(w.ctx, interval, w.timerFactory) { + return false + } + } + if w.ctx.Err() != nil { + return false + } + if w.send == nil { + log.Errorf("[new_question_email] sender is nil, dropping email attempt for user %s question %s", userID, task.QuestionID) + *emailAttemptSent = true + continue + } + w.send(w.ctx, userID, task.newRawData()) + *emailAttemptSent = true + } + return true +} + +func (w *newQuestionEmailWorker) dropPendingTasks() int { + dropped := 0 + for { + select { + case <-w.tasks: + dropped++ + default: + return dropped + } + } +} + +func waitNewQuestionEmailInterval( + ctx context.Context, + interval time.Duration, + timerFactory newQuestionEmailTimerFactory, +) bool { + if interval <= 0 { + return true + } + if timerFactory == nil { + timerFactory = newRealNewQuestionEmailTimer + } + timer := timerFactory(interval) + defer timer.Stop() + + select { + case <-timer.C(): + return true + case <-ctx.Done(): + return false + } +} + +func (task newQuestionEmailTask) newRawData() *schema.NewQuestionTemplateRawData { + return &schema.NewQuestionTemplateRawData{ + QuestionTitle: task.QuestionTitle, + QuestionID: task.QuestionID, + UnsubscribeCode: token.GenerateToken(), + Tags: copyStringSlice(task.Tags), + TagIDs: copyStringSlice(task.TagIDs), + } +} + +func newQuestionEmailTaskFromRawData( + userIDs []string, + rawData *schema.NewQuestionTemplateRawData, +) newQuestionEmailTask { + if rawData == nil { + return newQuestionEmailTask{UserIDs: copyStringSlice(userIDs)} + } + return newQuestionEmailTask{ + UserIDs: copyStringSlice(userIDs), + QuestionTitle: rawData.QuestionTitle, + QuestionID: rawData.QuestionID, + Tags: copyStringSlice(rawData.Tags), + TagIDs: copyStringSlice(rawData.TagIDs), + } +} + +func copyNewQuestionEmailTask(task newQuestionEmailTask) newQuestionEmailTask { + task.UserIDs = copyStringSlice(task.UserIDs) + task.Tags = copyStringSlice(task.Tags) + task.TagIDs = copyStringSlice(task.TagIDs) + return task +} + +func copyStringSlice(values []string) []string { + if values == nil { + return nil + } + copied := make([]string, len(values)) + copy(copied, values) + return copied +} + +type realNewQuestionEmailTimer struct { + timer *time.Timer +} + +func newRealNewQuestionEmailTimer(interval time.Duration) newQuestionEmailTimer { + return &realNewQuestionEmailTimer{timer: time.NewTimer(interval)} +} + +func (t *realNewQuestionEmailTimer) C() <-chan time.Time { + return t.timer.C +} + +func (t *realNewQuestionEmailTimer) Stop() { + if !t.timer.Stop() { + select { + case <-t.timer.C: + default: + } + } +} diff --git a/internal/service/notification/new_question_email_worker_test.go b/internal/service/notification/new_question_email_worker_test.go new file mode 100644 index 000000000..302de4eb9 --- /dev/null +++ b/internal/service/notification/new_question_email_worker_test.go @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package notification + +import ( + "context" + "reflect" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/apache/answer/internal/schema" +) + +func TestNewQuestionEmailWorkerDelaysBetweenAttempts(t *testing.T) { + timerFactory := newFakeNewQuestionEmailTimerFactory() + sendCh := make(chan newQuestionEmailSendEvent, 2) + worker := newQuestionEmailWorkerWithBuffer( + func() time.Duration { return 3 * time.Second }, + newQuestionEmailSendRecorder(sendCh), + timerFactory.New, + 2, + ) + defer worker.Close() + + if !worker.TryEnqueue(newQuestionEmailWorkerTask("question-1", "user-1", "user-2")) { + t.Fatalf("TryEnqueue() = false, want true") + } + + first := receiveNewQuestionEmailSend(t, sendCh) + if first.userID != "user-1" { + t.Fatalf("first send user = %s, want user-1", first.userID) + } + timer := timerFactory.WaitForTimer(t) + assertNoNewQuestionEmailSend(t, sendCh) + timer.Fire() + + second := receiveNewQuestionEmailSend(t, sendCh) + if second.userID != "user-2" { + t.Fatalf("second send user = %s, want user-2", second.userID) + } + assertUniqueNewQuestionUnsubscribeCodes(t, []string{ + first.rawData.UnsubscribeCode, + second.rawData.UnsubscribeCode, + }) + if got := timerFactory.Durations(); !reflect.DeepEqual(got, []time.Duration{3 * time.Second}) { + t.Fatalf("timer durations = %v, want [3s]", got) + } +} + +func TestNewQuestionEmailWorkerDelayContinuesAcrossTaskBoundaries(t *testing.T) { + timerFactory := newFakeNewQuestionEmailTimerFactory() + sendCh := make(chan newQuestionEmailSendEvent, 2) + worker := newQuestionEmailWorkerWithBuffer( + func() time.Duration { return 5 * time.Second }, + newQuestionEmailSendRecorder(sendCh), + timerFactory.New, + 2, + ) + defer worker.Close() + + if !worker.TryEnqueue(newQuestionEmailWorkerTask("question-1", "user-1")) { + t.Fatalf("TryEnqueue() task 1 = false, want true") + } + first := receiveNewQuestionEmailSend(t, sendCh) + if first.userID != "user-1" { + t.Fatalf("first send user = %s, want user-1", first.userID) + } + + if !worker.TryEnqueue(newQuestionEmailWorkerTask("question-2", "user-2")) { + t.Fatalf("TryEnqueue() task 2 = false, want true") + } + timer := timerFactory.WaitForTimer(t) + assertNoNewQuestionEmailSend(t, sendCh) + timer.Fire() + + second := receiveNewQuestionEmailSend(t, sendCh) + if second.userID != "user-2" { + t.Fatalf("second send user = %s, want user-2", second.userID) + } +} + +func TestNewQuestionEmailWorkerZeroIntervalSendsWithoutTimers(t *testing.T) { + var timerCount int + sendCh := make(chan newQuestionEmailSendEvent, 3) + worker := newQuestionEmailWorkerWithBuffer( + func() time.Duration { return 0 }, + newQuestionEmailSendRecorder(sendCh), + func(time.Duration) newQuestionEmailTimer { + timerCount++ + return newFakeNewQuestionEmailTimer() + }, + 2, + ) + defer worker.Close() + + if !worker.TryEnqueue(newQuestionEmailWorkerTask("question-1", "user-1", "user-2", "user-3")) { + t.Fatalf("TryEnqueue() = false, want true") + } + + gotUsers := []string{ + receiveNewQuestionEmailSend(t, sendCh).userID, + receiveNewQuestionEmailSend(t, sendCh).userID, + receiveNewQuestionEmailSend(t, sendCh).userID, + } + if !reflect.DeepEqual(gotUsers, []string{"user-1", "user-2", "user-3"}) { + t.Fatalf("send users = %v, want [user-1 user-2 user-3]", gotUsers) + } + if timerCount != 0 { + t.Fatalf("timer count = %d, want 0", timerCount) + } +} + +func TestNewQuestionEmailWorkerCloseCancelsPendingWaitAndDropsQueuedTasks(t *testing.T) { + timerFactory := newFakeNewQuestionEmailTimerFactory() + sendCh := make(chan newQuestionEmailSendEvent, 3) + worker := newQuestionEmailWorkerWithBuffer( + func() time.Duration { return time.Hour }, + newQuestionEmailSendRecorder(sendCh), + timerFactory.New, + 2, + ) + + if !worker.TryEnqueue(newQuestionEmailWorkerTask("question-1", "user-1", "user-2")) { + t.Fatalf("TryEnqueue() task 1 = false, want true") + } + if !worker.TryEnqueue(newQuestionEmailWorkerTask("question-2", "user-3")) { + t.Fatalf("TryEnqueue() task 2 = false, want true") + } + + first := receiveNewQuestionEmailSend(t, sendCh) + if first.userID != "user-1" { + t.Fatalf("first send user = %s, want user-1", first.userID) + } + timer := timerFactory.WaitForTimer(t) + + worker.Close() + timer.AssertStopped(t) + assertNoNewQuestionEmailSend(t, sendCh) + if got := len(worker.tasks); got != 0 { + t.Fatalf("pending tasks after Close() = %d, want 0", got) + } + if worker.TryEnqueue(newQuestionEmailWorkerTask("question-3", "user-4")) { + t.Fatalf("TryEnqueue() after Close() = true, want false") + } +} + +func TestNewQuestionEmailWorkerProcessesSerially(t *testing.T) { + entered := make(chan string, 2) + releaseFirst := make(chan struct{}) + worker := newQuestionEmailWorkerWithBuffer( + func() time.Duration { return 0 }, + func(_ context.Context, userID string, _ *schema.NewQuestionTemplateRawData) { + entered <- userID + if userID == "user-1" { + <-releaseFirst + } + }, + nil, + 2, + ) + defer worker.Close() + + if !worker.TryEnqueue(newQuestionEmailWorkerTask("question-1", "user-1", "user-2")) { + t.Fatalf("TryEnqueue() = false, want true") + } + if got := receiveString(t, entered); got != "user-1" { + t.Fatalf("first send user = %s, want user-1", got) + } + assertNoString(t, entered) + + close(releaseFirst) + if got := receiveString(t, entered); got != "user-2" { + t.Fatalf("second send user = %s, want user-2", got) + } +} + +func TestNewQuestionEmailWorkerBuildsFreshRawDataPerAttempt(t *testing.T) { + sendCh := make(chan newQuestionEmailSendEvent, 2) + worker := newQuestionEmailWorkerWithBuffer( + func() time.Duration { return 0 }, + func(_ context.Context, userID string, rawData *schema.NewQuestionTemplateRawData) { + if rawData.QuestionAuthorUserID != "" { + t.Errorf("QuestionAuthorUserID = %q, want empty", rawData.QuestionAuthorUserID) + } + if userID == "user-1" { + rawData.Tags[0] = "mutated" + rawData.TagIDs[0] = "mutated" + } + sendCh <- newQuestionEmailSendEvent{userID: userID, rawData: rawData} + }, + nil, + 2, + ) + defer worker.Close() + + if !worker.TryEnqueue(newQuestionEmailTask{ + UserIDs: []string{"user-1", "user-2"}, + QuestionTitle: "Question", + QuestionID: "question-1", + Tags: []string{"go"}, + TagIDs: []string{"tag-1"}, + }) { + t.Fatalf("TryEnqueue() = false, want true") + } + + first := receiveNewQuestionEmailSend(t, sendCh) + second := receiveNewQuestionEmailSend(t, sendCh) + if first.rawData.UnsubscribeCode == "" || second.rawData.UnsubscribeCode == "" { + t.Fatalf("unsubscribe codes must be non-empty: %q %q", + first.rawData.UnsubscribeCode, second.rawData.UnsubscribeCode) + } + if first.rawData.UnsubscribeCode == second.rawData.UnsubscribeCode { + t.Fatalf("unsubscribe codes must be unique, both were %q", first.rawData.UnsubscribeCode) + } + if !reflect.DeepEqual(second.rawData.Tags, []string{"go"}) || + !reflect.DeepEqual(second.rawData.TagIDs, []string{"tag-1"}) { + t.Fatalf("second raw data tags = %v/%v, want original values", + second.rawData.Tags, second.rawData.TagIDs) + } +} + +func TestNewQuestionEmailWorkerTryEnqueueCopiesTaskAndFailsFast(t *testing.T) { + worker := newUnstartedNewQuestionEmailWorkerForTest(1) + task := newQuestionEmailTask{ + UserIDs: []string{"user-1"}, + QuestionTitle: "Question", + QuestionID: "question-1", + Tags: []string{"go"}, + TagIDs: []string{"tag-1"}, + } + if !worker.TryEnqueue(task) { + t.Fatalf("TryEnqueue() = false, want true") + } + task.UserIDs[0] = "mutated-user" + task.Tags[0] = "mutated-tag" + task.TagIDs[0] = "mutated-tag-id" + + queuedTask := <-worker.tasks + if !reflect.DeepEqual(queuedTask.UserIDs, []string{"user-1"}) || + !reflect.DeepEqual(queuedTask.Tags, []string{"go"}) || + !reflect.DeepEqual(queuedTask.TagIDs, []string{"tag-1"}) { + t.Fatalf("queued task was mutated: %+v", queuedTask) + } + + if !worker.TryEnqueue(newQuestionEmailWorkerTask("question-2", "user-2")) { + t.Fatalf("TryEnqueue() refill = false, want true") + } + if worker.TryEnqueue(newQuestionEmailWorkerTask("question-3", "user-3")) { + t.Fatalf("TryEnqueue() with full queue = true, want false") + } + + worker.Close() + if worker.TryEnqueue(newQuestionEmailWorkerTask("question-4", "user-4")) { + t.Fatalf("TryEnqueue() after Close() = true, want false") + } + + canceledWorker := newUnstartedNewQuestionEmailWorkerForTest(1) + canceledWorker.cancel() + if canceledWorker.TryEnqueue(newQuestionEmailWorkerTask("question-5", "user-5")) { + t.Fatalf("TryEnqueue() after cancel = true, want false") + } +} + +func TestNewQuestionEmailWorkerTryEnqueueConcurrentClose(t *testing.T) { + const ( + iterations = 100 + senders = 32 + ) + + for iteration := 0; iteration < iterations; iteration++ { + worker := newUnstartedNewQuestionEmailWorkerForTest(1) + if !worker.TryEnqueue(newQuestionEmailWorkerTask("already-queued", "queued-user")) { + t.Fatalf("iteration %d: pre-fill TryEnqueue() = false, want true", iteration) + } + + start := make(chan struct{}) + ready := make(chan struct{}, senders) + panicCh := make(chan any, senders) + var closeObserved atomic.Bool + var acceptedAfterCloseObserved atomic.Int64 + var wg sync.WaitGroup + + for sender := 0; sender < senders; sender++ { + wg.Add(1) + go func(sender int) { + defer wg.Done() + defer func() { + if recovered := recover(); recovered != nil { + panicCh <- recovered + } + }() + + ready <- struct{}{} + <-start + for { + accepted := worker.TryEnqueue(newQuestionEmailWorkerTask("question", "user")) + if accepted && closeObserved.Load() { + acceptedAfterCloseObserved.Add(1) + } + if closeObserved.Load() { + return + } + runtime.Gosched() + } + }(sender) + } + for sender := 0; sender < senders; sender++ { + <-ready + } + + closeDoneObserved := make(chan struct{}) + go func() { + <-worker.ctx.Done() + closeObserved.Store(true) + close(closeDoneObserved) + }() + + close(start) + runtime.Gosched() + + closeDone := make(chan struct{}) + go func() { + worker.Close() + close(closeDone) + }() + + select { + case <-closeDone: + case <-time.After(time.Second): + t.Fatalf("iteration %d: Close() did not return", iteration) + } + select { + case <-closeDoneObserved: + case <-time.After(time.Second): + t.Fatalf("iteration %d: close was not observed", iteration) + } + + wgDone := make(chan struct{}) + go func() { + wg.Wait() + close(wgDone) + }() + select { + case <-wgDone: + case <-time.After(time.Second): + t.Fatalf("iteration %d: TryEnqueue goroutines did not return", iteration) + } + + select { + case recovered := <-panicCh: + t.Fatalf("iteration %d: TryEnqueue panicked during Close(): %v", iteration, recovered) + default: + } + if got := acceptedAfterCloseObserved.Load(); got != 0 { + t.Fatalf("iteration %d: accepted %d enqueue attempts after close was observed, want 0", + iteration, got) + } + if worker.TryEnqueue(newQuestionEmailWorkerTask("after-close", "user")) { + t.Fatalf("iteration %d: TryEnqueue() after Close() = true, want false", iteration) + } + if got := len(worker.tasks); got != 0 { + t.Fatalf("iteration %d: pending tasks after Close() = %d, want 0", iteration, got) + } + } +} + +func TestWaitNewQuestionEmailIntervalCancel(t *testing.T) { + timerFactory := newFakeNewQuestionEmailTimerFactory() + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan bool, 1) + go func() { + done <- waitNewQuestionEmailInterval(ctx, time.Minute, timerFactory.New) + }() + + timer := timerFactory.WaitForTimer(t) + cancel() + + select { + case got := <-done: + if got { + t.Fatalf("waitNewQuestionEmailInterval() = true, want false") + } + case <-time.After(time.Second): + t.Fatalf("waitNewQuestionEmailInterval() did not return after cancellation") + } + timer.AssertStopped(t) +} + +type newQuestionEmailSendEvent struct { + userID string + rawData *schema.NewQuestionTemplateRawData +} + +func newQuestionEmailSendRecorder(sendCh chan<- newQuestionEmailSendEvent) newQuestionNotificationEmailSender { + return func(_ context.Context, userID string, rawData *schema.NewQuestionTemplateRawData) { + sendCh <- newQuestionEmailSendEvent{userID: userID, rawData: rawData} + } +} + +func newQuestionEmailWorkerTask(questionID string, userIDs ...string) newQuestionEmailTask { + return newQuestionEmailTask{ + UserIDs: userIDs, + QuestionTitle: "Question", + QuestionID: questionID, + Tags: []string{"go"}, + TagIDs: []string{"tag-1"}, + } +} + +func newUnstartedNewQuestionEmailWorkerForTest(bufferSize int) *newQuestionEmailWorker { + ctx, cancel := context.WithCancel(context.Background()) + return &newQuestionEmailWorker{ + tasks: make(chan newQuestionEmailTask, bufferSize), + interval: func() time.Duration { return 0 }, + timerFactory: newRealNewQuestionEmailTimer, + ctx: ctx, + cancel: cancel, + } +} + +func receiveNewQuestionEmailSend(t *testing.T, sendCh <-chan newQuestionEmailSendEvent) newQuestionEmailSendEvent { + t.Helper() + select { + case event := <-sendCh: + return event + case <-time.After(time.Second): + t.Fatalf("timed out waiting for new question email send") + return newQuestionEmailSendEvent{} + } +} + +func assertNoNewQuestionEmailSend(t *testing.T, sendCh <-chan newQuestionEmailSendEvent) { + t.Helper() + select { + case event := <-sendCh: + t.Fatalf("unexpected new question email send: %+v", event) + default: + } +} + +func receiveString(t *testing.T, ch <-chan string) string { + t.Helper() + select { + case value := <-ch: + return value + case <-time.After(time.Second): + t.Fatalf("timed out waiting for string") + return "" + } +} + +func assertNoString(t *testing.T, ch <-chan string) { + t.Helper() + select { + case value := <-ch: + t.Fatalf("unexpected string: %s", value) + default: + } +} + +type fakeNewQuestionEmailTimerFactory struct { + timers chan *fakeNewQuestionEmailTimer + mu sync.Mutex + durations []time.Duration +} + +func newFakeNewQuestionEmailTimerFactory() *fakeNewQuestionEmailTimerFactory { + return &fakeNewQuestionEmailTimerFactory{ + timers: make(chan *fakeNewQuestionEmailTimer, 16), + } +} + +func (f *fakeNewQuestionEmailTimerFactory) New(duration time.Duration) newQuestionEmailTimer { + timer := newFakeNewQuestionEmailTimer() + + f.mu.Lock() + f.durations = append(f.durations, duration) + f.mu.Unlock() + + f.timers <- timer + return timer +} + +func (f *fakeNewQuestionEmailTimerFactory) WaitForTimer(t *testing.T) *fakeNewQuestionEmailTimer { + t.Helper() + select { + case timer := <-f.timers: + return timer + case <-time.After(time.Second): + t.Fatalf("timed out waiting for timer") + return nil + } +} + +func (f *fakeNewQuestionEmailTimerFactory) Durations() []time.Duration { + f.mu.Lock() + defer f.mu.Unlock() + + durations := make([]time.Duration, len(f.durations)) + copy(durations, f.durations) + return durations +} + +type fakeNewQuestionEmailTimer struct { + ch chan time.Time + stopped chan struct{} + once sync.Once +} + +func newFakeNewQuestionEmailTimer() *fakeNewQuestionEmailTimer { + return &fakeNewQuestionEmailTimer{ + ch: make(chan time.Time, 1), + stopped: make(chan struct{}), + } +} + +func (t *fakeNewQuestionEmailTimer) C() <-chan time.Time { + return t.ch +} + +func (t *fakeNewQuestionEmailTimer) Stop() { + t.once.Do(func() { + close(t.stopped) + }) +} + +func (t *fakeNewQuestionEmailTimer) Fire() { + t.ch <- time.Now() +} + +func (t *fakeNewQuestionEmailTimer) AssertStopped(tb testing.TB) { + tb.Helper() + select { + case <-t.stopped: + case <-time.After(time.Second): + tb.Fatalf("timer was not stopped") + } +} diff --git a/internal/service/notification/new_question_notification.go b/internal/service/notification/new_question_notification.go index 0a5471873..43c5ff859 100644 --- a/internal/service/notification/new_question_notification.go +++ b/internal/service/notification/new_question_notification.go @@ -28,7 +28,6 @@ import ( "github.com/apache/answer/internal/base/translator" "github.com/apache/answer/internal/schema" "github.com/apache/answer/pkg/display" - "github.com/apache/answer/pkg/token" "github.com/apache/answer/plugin" "github.com/jinzhu/copier" "github.com/segmentfault/pacman/i18n" @@ -50,25 +49,42 @@ func (ns *ExternalNotificationService) handleNewQuestionNotification(ctx context } log.Debugf("get subscribers %d for question %s", len(subscribers), msg.NewQuestionTemplateRawData.QuestionID) + ns.syncNewQuestionNotificationToPlugin(ctx, msg) + ns.enqueueNewQuestionNotificationEmails(subscribers, msg.NewQuestionTemplateRawData) + return nil +} + +func (ns *ExternalNotificationService) enqueueNewQuestionNotificationEmails( + subscribers []*NewQuestionSubscriber, + rawData *schema.NewQuestionTemplateRawData, +) { + task := newQuestionEmailTaskFromRawData(collectNewQuestionNotificationEmailUserIDs(subscribers), rawData) + if len(task.UserIDs) == 0 { + return + } + if ns.newQuestionEmailWorker == nil { + log.Warnf("[new_question_email] worker is nil, dropping task for question %s", task.QuestionID) + return + } + if !ns.newQuestionEmailWorker.TryEnqueue(task) { + log.Warnf("[new_question_email] failed to enqueue task for question %s", task.QuestionID) + } +} + +func collectNewQuestionNotificationEmailUserIDs(subscribers []*NewQuestionSubscriber) []string { + userIDs := make([]string, 0, len(subscribers)) for _, subscriber := range subscribers { + if subscriber == nil { + continue + } for _, channel := range subscriber.Channels { - if !channel.Enable { + if channel == nil || !channel.Enable || channel.Key != constant.EmailChannel { continue } - if channel.Key == constant.EmailChannel { - ns.sendNewQuestionNotificationEmail(ctx, subscriber.UserID, &schema.NewQuestionTemplateRawData{ - QuestionTitle: msg.NewQuestionTemplateRawData.QuestionTitle, - QuestionID: msg.NewQuestionTemplateRawData.QuestionID, - UnsubscribeCode: token.GenerateToken(), - Tags: msg.NewQuestionTemplateRawData.Tags, - TagIDs: msg.NewQuestionTemplateRawData.TagIDs, - }) - } + userIDs = append(userIDs, subscriber.UserID) } } - - ns.syncNewQuestionNotificationToPlugin(ctx, msg) - return nil + return userIDs } func (ns *ExternalNotificationService) getNewQuestionSubscribers(ctx context.Context, msg *schema.ExternalNotificationMsg) ( diff --git a/internal/service/notification/new_question_notification_interval.go b/internal/service/notification/new_question_notification_interval.go new file mode 100644 index 000000000..19bb1cea8 --- /dev/null +++ b/internal/service/notification/new_question_notification_interval.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package notification + +import ( + "context" + "os" + "strconv" + "strings" + "time" + + "github.com/apache/answer/internal/schema" +) + +const newQuestionNotificationEmailSendIntervalEnv = "NEW_QUESTION_NOTIFICATION_EMAIL_SEND_INTERVAL_SECONDS" + +const maxNewQuestionNotificationEmailSendInterval = 5 * time.Minute + +const maxNewQuestionNotificationEmailSendIntervalSeconds = int64(maxNewQuestionNotificationEmailSendInterval / time.Second) + +type newQuestionNotificationEmailSender func(context.Context, string, *schema.NewQuestionTemplateRawData) + +func newQuestionNotificationEmailSendInterval() time.Duration { + return parseNewQuestionNotificationEmailSendInterval(os.Getenv(newQuestionNotificationEmailSendIntervalEnv)) +} + +func parseNewQuestionNotificationEmailSendInterval(value string) time.Duration { + value = strings.TrimSpace(value) + if len(value) == 0 { + return 0 + } + seconds, err := strconv.ParseInt(value, 10, 64) + if err != nil || seconds < 0 { + return 0 + } + if seconds > maxNewQuestionNotificationEmailSendIntervalSeconds { + return maxNewQuestionNotificationEmailSendInterval + } + return time.Duration(seconds) * time.Second +} diff --git a/internal/service/notification/new_question_notification_test.go b/internal/service/notification/new_question_notification_test.go new file mode 100644 index 000000000..e7db3814d --- /dev/null +++ b/internal/service/notification/new_question_notification_test.go @@ -0,0 +1,822 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package notification + +import ( + "context" + "encoding/json" + "os" + "reflect" + "sync" + "testing" + "time" + + "github.com/apache/answer/internal/base/constant" + basedata "github.com/apache/answer/internal/base/data" + "github.com/apache/answer/internal/entity" + "github.com/apache/answer/internal/schema" + "github.com/apache/answer/internal/service/config" + "github.com/apache/answer/internal/service/export" + "github.com/apache/answer/internal/service/mock" + "github.com/apache/answer/plugin" + "go.uber.org/mock/gomock" +) + +func TestNewQuestionNotificationEmailSendInterval(t *testing.T) { + tests := []struct { + name string + value string + set bool + want time.Duration + }{ + { + name: "unset", + want: 0, + }, + { + name: "empty", + value: "", + set: true, + want: 0, + }, + { + name: "positive integer", + value: "5", + set: true, + want: 5 * time.Second, + }, + { + name: "positive integer with whitespace", + value: " 5 ", + set: true, + want: 5 * time.Second, + }, + { + name: "invalid", + value: "not-a-number", + set: true, + want: 0, + }, + { + name: "negative", + value: "-1", + set: true, + want: 0, + }, + { + name: "whitespace", + value: " ", + set: true, + want: 0, + }, + { + name: "above max clamps to max", + value: "301", + set: true, + want: maxNewQuestionNotificationEmailSendInterval, + }, + { + name: "duration overflow clamps to max", + value: "9223372037", + set: true, + want: maxNewQuestionNotificationEmailSendInterval, + }, + { + name: "parse int overflow", + value: "9223372036854775808", + set: true, + want: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + setNewQuestionNotificationEmailSendIntervalEnv(t, tt.value, tt.set) + + got := newQuestionNotificationEmailSendInterval() + if got != tt.want { + t.Fatalf("newQuestionNotificationEmailSendInterval() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestHandleNewQuestionNotificationEnqueuesEmailTask(t *testing.T) { + setNewQuestionNotificationEmailSendIntervalEnv(t, "0", true) + + cache, cleanup, err := basedata.NewCache(&basedata.CacheConf{}) + if err != nil { + t.Fatalf("new cache: %v", err) + } + t.Cleanup(cleanup) + + ctrl := gomock.NewController(t) + siteInfoService := mock.NewMockSiteInfoCommonService(ctrl) + siteInfoService.EXPECT().GetSiteGeneral(gomock.Any()).Return(&schema.SiteGeneralResp{ + Name: "Answer", + SiteUrl: "https://answer.test", + ContactEmail: "support@answer.test", + }, nil).AnyTimes() + siteInfoService.EXPECT().GetSiteSeo(gomock.Any()).Return(&schema.SiteSeoResp{ + Permalink: constant.PermalinkQuestionIDAndTitle, + }, nil).AnyTimes() + + emailRepo := &newQuestionNotificationTestEmailRepo{ + codesByUserID: make(map[string][]string), + } + notificationConfigRepo := &newQuestionNotificationTestUserNotificationConfigRepo{ + followedTagConfigs: map[string]*entity.UserNotificationConfig{ + "tag-user": newQuestionNotificationConfig( + "tag-user", constant.AllNewQuestionForFollowingTagsSource, true), + "dup-user": newQuestionNotificationConfig( + "dup-user", constant.AllNewQuestionForFollowingTagsSource, true), + "author": newQuestionNotificationConfig( + "author", constant.AllNewQuestionForFollowingTagsSource, true), + }, + allQuestionConfigs: []*entity.UserNotificationConfig{ + newQuestionNotificationConfig("all-user", constant.AllNewQuestionSource, true), + newQuestionNotificationConfig("dup-user", constant.AllNewQuestionSource, true), + newQuestionNotificationConfig("author", constant.AllNewQuestionSource, true), + }, + } + service := &ExternalNotificationService{ + data: &basedata.Data{ + Cache: cache, + }, + userNotificationConfigRepo: notificationConfigRepo, + followRepo: &newQuestionNotificationTestFollowRepo{ + followersByObjectID: map[string][]string{ + "tag-1": {"tag-user", "dup-user", "author"}, + }, + }, + emailService: export.NewEmailService( + config.NewConfigService(newQuestionNotificationTestConfigRepo{}), + emailRepo, + siteInfoService, + ), + userRepo: &newQuestionNotificationTestUserRepo{ + users: map[string]*entity.User{ + "tag-user": newQuestionNotificationTestUser("tag-user"), + "dup-user": newQuestionNotificationTestUser("dup-user"), + "all-user": newQuestionNotificationTestUser("all-user"), + "author": newQuestionNotificationTestUser("author"), + }, + }, + siteInfoService: siteInfoService, + } + service.newQuestionEmailWorker = newUnstartedNewQuestionEmailWorkerForTest(1) + + err = service.handleNewQuestionNotification(context.Background(), &schema.ExternalNotificationMsg{ + NewQuestionTemplateRawData: &schema.NewQuestionTemplateRawData{ + QuestionTitle: "New question", + QuestionID: "1", + QuestionAuthorUserID: "author", + Tags: []string{"go"}, + TagIDs: []string{"tag-1"}, + }, + }) + if err != nil { + t.Fatalf("handleNewQuestionNotification() error = %v", err) + } + + var task newQuestionEmailTask + select { + case task = <-service.newQuestionEmailWorker.tasks: + default: + t.Fatalf("expected enqueued new question email task") + } + + wantUsers := []string{"all-user", "dup-user", "tag-user"} + assertStringSet(t, task.UserIDs, wantUsers) + if task.QuestionTitle != "New question" || task.QuestionID != "1" { + t.Fatalf("task question data = %+v", task) + } + if !reflect.DeepEqual(task.Tags, []string{"go"}) || !reflect.DeepEqual(task.TagIDs, []string{"tag-1"}) { + t.Fatalf("task tags = %v/%v", task.Tags, task.TagIDs) + } + if len(emailRepo.codesByUserID) > 0 { + t.Fatalf("handler sent emails synchronously: %v", emailRepo.codesByUserID) + } +} + +func TestHandleNewQuestionNotificationSkipsEnqueueWithoutEnabledEmailAttempts(t *testing.T) { + cache, cleanup, err := basedata.NewCache(&basedata.CacheConf{}) + if err != nil { + t.Fatalf("new cache: %v", err) + } + t.Cleanup(cleanup) + + service := &ExternalNotificationService{ + data: &basedata.Data{Cache: cache}, + userNotificationConfigRepo: &newQuestionNotificationTestUserNotificationConfigRepo{ + followedTagConfigs: map[string]*entity.UserNotificationConfig{ + "tag-user": newQuestionNotificationConfig( + "tag-user", constant.AllNewQuestionForFollowingTagsSource, false), + }, + allQuestionConfigs: []*entity.UserNotificationConfig{ + newQuestionNotificationConfig("all-user", constant.AllNewQuestionSource, false), + }, + }, + followRepo: &newQuestionNotificationTestFollowRepo{ + followersByObjectID: map[string][]string{"tag-1": {"tag-user"}}, + }, + userRepo: &newQuestionNotificationTestUserRepo{ + users: map[string]*entity.User{ + "tag-user": newQuestionNotificationTestUser("tag-user"), + "all-user": newQuestionNotificationTestUser("all-user"), + }, + }, + newQuestionEmailWorker: newUnstartedNewQuestionEmailWorkerForTest(1), + } + + err = service.handleNewQuestionNotification(context.Background(), &schema.ExternalNotificationMsg{ + NewQuestionTemplateRawData: &schema.NewQuestionTemplateRawData{ + QuestionTitle: "New question", + QuestionID: "1", + Tags: []string{"go"}, + TagIDs: []string{"tag-1"}, + }, + }) + if err != nil { + t.Fatalf("handleNewQuestionNotification() error = %v", err) + } + select { + case task := <-service.newQuestionEmailWorker.tasks: + t.Fatalf("unexpected enqueued task: %+v", task) + default: + } +} + +func TestHandleNewQuestionNotificationReturnsWhenEmailWorkerQueueFull(t *testing.T) { + cache, cleanup, err := basedata.NewCache(&basedata.CacheConf{}) + if err != nil { + t.Fatalf("new cache: %v", err) + } + t.Cleanup(cleanup) + + worker := newUnstartedNewQuestionEmailWorkerForTest(1) + if !worker.TryEnqueue(newQuestionEmailWorkerTask("already-queued", "queued-user")) { + t.Fatalf("pre-fill TryEnqueue() = false, want true") + } + service := &ExternalNotificationService{ + data: &basedata.Data{Cache: cache}, + userNotificationConfigRepo: &newQuestionNotificationTestUserNotificationConfigRepo{ + allQuestionConfigs: []*entity.UserNotificationConfig{ + newQuestionNotificationConfig("all-user", constant.AllNewQuestionSource, true), + }, + }, + followRepo: &newQuestionNotificationTestFollowRepo{ + followersByObjectID: map[string][]string{}, + }, + userRepo: &newQuestionNotificationTestUserRepo{ + users: map[string]*entity.User{ + "all-user": newQuestionNotificationTestUser("all-user"), + }, + }, + newQuestionEmailWorker: worker, + } + + err = service.handleNewQuestionNotification(context.Background(), &schema.ExternalNotificationMsg{ + NewQuestionTemplateRawData: &schema.NewQuestionTemplateRawData{ + QuestionTitle: "New question", + QuestionID: "1", + }, + }) + if err != nil { + t.Fatalf("handleNewQuestionNotification() error = %v", err) + } + if got := len(worker.tasks); got != 1 { + t.Fatalf("worker queue length = %d, want 1", got) + } +} + +func TestHandleNewQuestionNotificationSyncsPluginBeforeEmailEnqueue(t *testing.T) { + cache, cleanup, err := basedata.NewCache(&basedata.CacheConf{}) + if err != nil { + t.Fatalf("new cache: %v", err) + } + t.Cleanup(cleanup) + + ctrl := gomock.NewController(t) + siteInfoService := mock.NewMockSiteInfoCommonService(ctrl) + siteInfoService.EXPECT().GetSiteGeneral(gomock.Any()).Return(&schema.SiteGeneralResp{ + Name: "Answer", + SiteUrl: "https://answer.test", + ContactEmail: "support@answer.test", + }, nil).AnyTimes() + siteInfoService.EXPECT().GetSiteSeo(gomock.Any()).Return(&schema.SiteSeoResp{ + Permalink: constant.PermalinkQuestionIDAndTitle, + }, nil).AnyTimes() + siteInfoService.EXPECT().GetSiteInterface(gomock.Any()).Return(&schema.SiteInterfaceSettingsResp{ + Language: "en", + }, nil).AnyTimes() + + notifyStarted := make(chan plugin.NotificationMessage, 1) + releaseNotify := make(chan struct{}) + enableNewQuestionNotificationTestPlugin(t, notifyStarted, releaseNotify) + + worker := newUnstartedNewQuestionEmailWorkerForTest(1) + service := &ExternalNotificationService{ + data: &basedata.Data{Cache: cache}, + userNotificationConfigRepo: &newQuestionNotificationTestUserNotificationConfigRepo{ + followedTagConfigs: map[string]*entity.UserNotificationConfig{ + "tag-user": newQuestionNotificationConfig( + "tag-user", constant.AllNewQuestionForFollowingTagsSource, true), + }, + }, + followRepo: &newQuestionNotificationTestFollowRepo{ + followersByObjectID: map[string][]string{"tag-1": {"tag-user"}}, + }, + userRepo: &newQuestionNotificationTestUserRepo{ + users: map[string]*entity.User{ + "tag-user": newQuestionNotificationTestUser("tag-user"), + }, + }, + userExternalLoginRepo: newQuestionNotificationTestUserExternalLoginRepo{}, + siteInfoService: siteInfoService, + newQuestionEmailWorker: worker, + } + + errCh := make(chan error, 1) + go func() { + errCh <- service.handleNewQuestionNotification(context.Background(), &schema.ExternalNotificationMsg{ + NewQuestionTemplateRawData: &schema.NewQuestionTemplateRawData{ + QuestionTitle: "New question", + QuestionID: "1", + Tags: []string{"go"}, + TagIDs: []string{"tag-1"}, + }, + }) + }() + + select { + case <-notifyStarted: + case <-time.After(time.Second): + t.Fatalf("plugin notification was not sent") + } + select { + case task := <-worker.tasks: + t.Fatalf("email task enqueued before plugin sync completed: %+v", task) + default: + } + close(releaseNotify) + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("handleNewQuestionNotification() error = %v", err) + } + case <-time.After(time.Second): + t.Fatalf("handleNewQuestionNotification() did not return") + } + select { + case task := <-worker.tasks: + assertStringSet(t, task.UserIDs, []string{"tag-user"}) + default: + t.Fatalf("expected email task after plugin sync completed") + } +} + +func assertUniqueNewQuestionUnsubscribeCodes(t *testing.T, codes []string) { + t.Helper() + + seen := make(map[string]bool) + for _, code := range codes { + if seen[code] { + t.Fatalf("duplicate unsubscribe code %q", code) + } + seen[code] = true + } +} + +func setNewQuestionNotificationEmailSendIntervalEnv(t *testing.T, value string, set bool) { + t.Helper() + + oldValue, oldSet := os.LookupEnv(newQuestionNotificationEmailSendIntervalEnv) + if set { + if err := os.Setenv(newQuestionNotificationEmailSendIntervalEnv, value); err != nil { + t.Fatalf("set env: %v", err) + } + } else { + if err := os.Unsetenv(newQuestionNotificationEmailSendIntervalEnv); err != nil { + t.Fatalf("unset env: %v", err) + } + } + t.Cleanup(func() { + if oldSet { + _ = os.Setenv(newQuestionNotificationEmailSendIntervalEnv, oldValue) + } else { + _ = os.Unsetenv(newQuestionNotificationEmailSendIntervalEnv) + } + }) +} + +func newQuestionSubscriber(userID string, channels ...*schema.NotificationChannelConfig) *NewQuestionSubscriber { + return &NewQuestionSubscriber{ + UserID: userID, + Channels: channels, + } +} + +func newQuestionEmailChannel(enable bool) *schema.NotificationChannelConfig { + return &schema.NotificationChannelConfig{ + Key: constant.EmailChannel, + Enable: enable, + } +} + +func newQuestionNonEmailChannel(enable bool) *schema.NotificationChannelConfig { + return &schema.NotificationChannelConfig{ + Key: constant.NotificationChannelKey("inbox"), + Enable: enable, + } +} + +func newQuestionNotificationConfig( + userID string, source constant.NotificationSource, emailEnabled bool) *entity.UserNotificationConfig { + channels := schema.NotificationChannels{ + newQuestionEmailChannel(emailEnabled), + } + return &entity.UserNotificationConfig{ + UserID: userID, + Source: string(source), + Channels: channels.ToJsonString(), + Enabled: emailEnabled, + } +} + +func newQuestionNotificationTestUser(userID string) *entity.User { + return &entity.User{ + ID: userID, + Username: userID, + DisplayName: userID, + EMail: userID + "@example.com", + Status: entity.UserStatusAvailable, + MailStatus: entity.EmailStatusAvailable, + } +} + +func assertStringSet(t *testing.T, got, want []string) { + t.Helper() + + gotSet := make(map[string]bool) + for _, value := range got { + gotSet[value] = true + } + wantSet := make(map[string]bool) + for _, value := range want { + wantSet[value] = true + } + if !reflect.DeepEqual(gotSet, wantSet) { + t.Fatalf("values = %v, want %v", got, want) + } +} + +type newQuestionNotificationTestFollowRepo struct { + followersByObjectID map[string][]string +} + +func (r *newQuestionNotificationTestFollowRepo) GetFollowIDs( + context.Context, string, string) ([]string, error) { + return nil, nil +} + +func (r *newQuestionNotificationTestFollowRepo) GetFollowAmount(context.Context, string) (int, error) { + return 0, nil +} + +func (r *newQuestionNotificationTestFollowRepo) GetFollowUserIDs( + _ context.Context, objectID string) ([]string, error) { + return r.followersByObjectID[objectID], nil +} + +func (r *newQuestionNotificationTestFollowRepo) IsFollowed(context.Context, string, string) (bool, error) { + return false, nil +} + +func (r *newQuestionNotificationTestFollowRepo) MigrateFollowers( + context.Context, string, string, string) error { + return nil +} + +type newQuestionNotificationTestUserNotificationConfigRepo struct { + followedTagConfigs map[string]*entity.UserNotificationConfig + allQuestionConfigs []*entity.UserNotificationConfig +} + +func (r *newQuestionNotificationTestUserNotificationConfigRepo) Add( + context.Context, []string, string, string) error { + return nil +} + +func (r *newQuestionNotificationTestUserNotificationConfigRepo) Save( + context.Context, *entity.UserNotificationConfig) error { + return nil +} + +func (r *newQuestionNotificationTestUserNotificationConfigRepo) GetByUserID( + context.Context, string) ([]*entity.UserNotificationConfig, error) { + return nil, nil +} + +func (r *newQuestionNotificationTestUserNotificationConfigRepo) GetBySource( + _ context.Context, source constant.NotificationSource) ([]*entity.UserNotificationConfig, error) { + if source == constant.AllNewQuestionSource { + return r.allQuestionConfigs, nil + } + return nil, nil +} + +func (r *newQuestionNotificationTestUserNotificationConfigRepo) GetByUserIDAndSource( + context.Context, string, constant.NotificationSource) (*entity.UserNotificationConfig, bool, error) { + return nil, false, nil +} + +func (r *newQuestionNotificationTestUserNotificationConfigRepo) GetByUsersAndSource( + _ context.Context, userIDs []string, source constant.NotificationSource) ( + []*entity.UserNotificationConfig, error) { + if source != constant.AllNewQuestionForFollowingTagsSource { + return nil, nil + } + configs := make([]*entity.UserNotificationConfig, 0, len(userIDs)) + for _, userID := range userIDs { + if config, ok := r.followedTagConfigs[userID]; ok { + configs = append(configs, config) + } + } + return configs, nil +} + +type newQuestionNotificationTestUserRepo struct { + users map[string]*entity.User +} + +func (r *newQuestionNotificationTestUserRepo) AddUser(context.Context, *entity.User) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) IncreaseAnswerCount(context.Context, string, int) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) IncreaseQuestionCount(context.Context, string, int) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateQuestionCount(context.Context, string, int64) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateAnswerCount(context.Context, string, int) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateLastLoginDate(context.Context, string) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateEmailStatus(context.Context, string, int) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateNoticeStatus(context.Context, string, int) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateEmail(context.Context, string, string) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateUserInterface( + context.Context, string, string, string) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdatePass(context.Context, string, string) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateInfo(context.Context, *entity.User) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) UpdateUserProfile(context.Context, *entity.User) error { + return nil +} + +func (r *newQuestionNotificationTestUserRepo) GetByUserID( + _ context.Context, userID string) (*entity.User, bool, error) { + user, ok := r.users[userID] + return user, ok, nil +} + +func (r *newQuestionNotificationTestUserRepo) BatchGetByID( + context.Context, []string) ([]*entity.User, error) { + return nil, nil +} + +func (r *newQuestionNotificationTestUserRepo) GetByUsername( + context.Context, string) (*entity.User, bool, error) { + return nil, false, nil +} + +func (r *newQuestionNotificationTestUserRepo) GetByUsernames( + context.Context, []string) ([]*entity.User, error) { + return nil, nil +} + +func (r *newQuestionNotificationTestUserRepo) GetByEmail( + context.Context, string) (*entity.User, bool, error) { + return nil, false, nil +} + +func (r *newQuestionNotificationTestUserRepo) GetUserCount(context.Context) (int64, error) { + return 0, nil +} + +func (r *newQuestionNotificationTestUserRepo) SearchUserListByName( + context.Context, string, int, bool) ([]*entity.User, error) { + return nil, nil +} + +func (r *newQuestionNotificationTestUserRepo) IsAvatarFileUsed(context.Context, string) (bool, error) { + return false, nil +} + +type newQuestionNotificationTestConfigRepo struct{} + +func (newQuestionNotificationTestConfigRepo) GetConfigByID( + context.Context, int) (*entity.Config, error) { + return nil, nil +} + +func (newQuestionNotificationTestConfigRepo) GetConfigByKey( + context.Context, string) (*entity.Config, error) { + config := export.EmailConfig{ + FromEmail: "noreply@answer.test", + FromName: "Answer", + } + value, _ := json.Marshal(config) + return &entity.Config{ + Value: string(value), + }, nil +} + +func (newQuestionNotificationTestConfigRepo) GetConfigByKeyFromDB( + context.Context, string) (*entity.Config, error) { + return nil, nil +} + +func (newQuestionNotificationTestConfigRepo) UpdateConfig(context.Context, string, string) error { + return nil +} + +type newQuestionNotificationTestEmailRepo struct { + codesByUserID map[string][]string +} + +func (r *newQuestionNotificationTestEmailRepo) SetCode( + _ context.Context, userID, code, _ string, _ time.Duration) error { + r.codesByUserID[userID] = append(r.codesByUserID[userID], code) + return nil +} + +func (r *newQuestionNotificationTestEmailRepo) VerifyCode(context.Context, string) (string, error) { + return "", nil +} + +func (r *newQuestionNotificationTestEmailRepo) userIDs() []string { + userIDs := make([]string, 0, len(r.codesByUserID)) + for userID := range r.codesByUserID { + userIDs = append(userIDs, userID) + } + return userIDs +} + +var ( + newQuestionNotificationTestPluginOnce sync.Once + newQuestionNotificationTestPluginInst = &newQuestionNotificationTestPlugin{} +) + +func enableNewQuestionNotificationTestPlugin( + t *testing.T, + notifyStarted chan plugin.NotificationMessage, + releaseNotify <-chan struct{}, +) { + t.Helper() + + newQuestionNotificationTestPluginInst.setChannels(notifyStarted, releaseNotify) + newQuestionNotificationTestPluginOnce.Do(func() { + plugin.Register(newQuestionNotificationTestPluginInst) + }) + plugin.StatusManager.Enable(newQuestionNotificationTestPluginInst.Info().SlugName, true) + t.Cleanup(func() { + plugin.StatusManager.Enable(newQuestionNotificationTestPluginInst.Info().SlugName, false) + newQuestionNotificationTestPluginInst.setChannels(nil, nil) + }) +} + +type newQuestionNotificationTestPlugin struct { + mu sync.Mutex + notifyStarted chan plugin.NotificationMessage + releaseNotify <-chan struct{} +} + +func (p *newQuestionNotificationTestPlugin) Info() plugin.Info { + return plugin.Info{SlugName: "new-question-notification-test-plugin"} +} + +func (p *newQuestionNotificationTestPlugin) GetNewQuestionSubscribers() []string { + return nil +} + +func (p *newQuestionNotificationTestPlugin) Notify(msg plugin.NotificationMessage) { + p.mu.Lock() + notifyStarted := p.notifyStarted + releaseNotify := p.releaseNotify + p.mu.Unlock() + + if notifyStarted != nil { + select { + case notifyStarted <- msg: + default: + } + } + if releaseNotify != nil { + <-releaseNotify + } +} + +func (p *newQuestionNotificationTestPlugin) setChannels( + notifyStarted chan plugin.NotificationMessage, + releaseNotify <-chan struct{}, +) { + p.mu.Lock() + defer p.mu.Unlock() + p.notifyStarted = notifyStarted + p.releaseNotify = releaseNotify +} + +type newQuestionNotificationTestUserExternalLoginRepo struct{} + +func (newQuestionNotificationTestUserExternalLoginRepo) AddUserExternalLogin( + context.Context, *entity.UserExternalLogin) error { + return nil +} + +func (newQuestionNotificationTestUserExternalLoginRepo) UpdateInfo( + context.Context, *entity.UserExternalLogin) error { + return nil +} + +func (newQuestionNotificationTestUserExternalLoginRepo) GetByExternalID( + context.Context, string, string) (*entity.UserExternalLogin, bool, error) { + return nil, false, nil +} + +func (newQuestionNotificationTestUserExternalLoginRepo) GetByUserID( + context.Context, string, string) (*entity.UserExternalLogin, bool, error) { + return nil, false, nil +} + +func (newQuestionNotificationTestUserExternalLoginRepo) GetUserExternalLoginList( + context.Context, string) ([]*entity.UserExternalLogin, error) { + return nil, nil +} + +func (newQuestionNotificationTestUserExternalLoginRepo) DeleteUserExternalLogin( + context.Context, string, string) error { + return nil +} + +func (newQuestionNotificationTestUserExternalLoginRepo) DeleteUserExternalLoginByUserID( + context.Context, string) error { + return nil +} + +func (newQuestionNotificationTestUserExternalLoginRepo) SetCacheUserExternalLoginInfo( + context.Context, string, *schema.ExternalLoginUserInfoCache) error { + return nil +} + +func (newQuestionNotificationTestUserExternalLoginRepo) GetCacheUserExternalLoginInfo( + context.Context, string) (*schema.ExternalLoginUserInfoCache, error) { + return nil, nil +}