MCPcopy
hub / github.com/apache/answer / TestQueue_ConcurrentSend

Function TestQueue_ConcurrentSend

internal/base/queue/queue_test.go:161–194  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

159}
160
161func TestQueue_ConcurrentSend(t *testing.T) {
162 q := New[*testMessage]("test", 100)
163 defer q.Close()
164
165 var count atomic.Int32
166 q.RegisterHandler(func(ctx context.Context, msg *testMessage) error {
167 count.Add(1)
168 return nil
169 })
170
171 var wg sync.WaitGroup
172 numGoroutines := 10
173 messagesPerGoroutine := 100
174
175 for i := range numGoroutines {
176 wg.Add(1)
177 go func(id int) {
178 defer wg.Done()
179 for j := range messagesPerGoroutine {
180 q.Send(context.Background(), &testMessage{ID: id*1000 + j})
181 }
182 }(i)
183 }
184
185 wg.Wait()
186
187 // Wait for processing
188 time.Sleep(500 * time.Millisecond)
189
190 expected := int32(numGoroutines * messagesPerGoroutine)
191 if count.Load() != expected {
192 t.Errorf("expected %d messages, got %d", expected, count.Load())
193 }
194}
195
196func TestQueue_ConcurrentRegisterHandler(t *testing.T) {
197 q := New[*testMessage]("test", 10)

Callers

nothing calls this directly

Calls 4

CloseMethod · 0.65
RegisterHandlerMethod · 0.65
AddMethod · 0.65
SendMethod · 0.65

Tested by

no test coverage detected