(t *testing.T)
| 159 | } |
| 160 | |
| 161 | func TestQueue_ConcurrentSend(t *testing.T) { |
| 162 | q := New[*testMessage]("test", 100) |
| 163 | defer q.Close() |
| 164 | |
| 165 | var count atomic.Int32 |
| 166 | q.RegisterHandler(func(ctx context.Context, msg *testMessage) error { |
| 167 | count.Add(1) |
| 168 | return nil |
| 169 | }) |
| 170 | |
| 171 | var wg sync.WaitGroup |
| 172 | numGoroutines := 10 |
| 173 | messagesPerGoroutine := 100 |
| 174 | |
| 175 | for i := range numGoroutines { |
| 176 | wg.Add(1) |
| 177 | go func(id int) { |
| 178 | defer wg.Done() |
| 179 | for j := range messagesPerGoroutine { |
| 180 | q.Send(context.Background(), &testMessage{ID: id*1000 + j}) |
| 181 | } |
| 182 | }(i) |
| 183 | } |
| 184 | |
| 185 | wg.Wait() |
| 186 | |
| 187 | // Wait for processing |
| 188 | time.Sleep(500 * time.Millisecond) |
| 189 | |
| 190 | expected := int32(numGoroutines * messagesPerGoroutine) |
| 191 | if count.Load() != expected { |
| 192 | t.Errorf("expected %d messages, got %d", expected, count.Load()) |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | func TestQueue_ConcurrentRegisterHandler(t *testing.T) { |
| 197 | q := New[*testMessage]("test", 10) |
nothing calls this directly
no test coverage detected