Test concurrent operations and performance
(t *testing.T)
| 299 | |
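| 300 | // TestConcurrency exercises the KV store under concurrent access: a basic parallel set/get check and a larger stress test (skipped in short mode). |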
| 301 | func TestConcurrency(t *testing.T) { |
| 302 | setupTestEnvironment() |
| 303 | kv := testPlugin.operator |
| 304 | ctx := context.Background() |
| 305 | |
| 306 | t.Run("BasicConcurrency", func(t *testing.T) { |
| 307 | parallel := 10 |
| 308 | var wg sync.WaitGroup |
| 309 | wg.Add(parallel) |
| 310 | |
| 311 | for i := range parallel { |
| 312 | go func(index int) { |
| 313 | defer wg.Done() |
| 314 | time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond) |
| 315 | mustSet(t, kv, ctx, "concurrent", fmt.Sprintf("key%d", index), "value") |
| 316 | }(i) |
| 317 | } |
| 318 | wg.Wait() |
| 319 | |
| 320 | // Verify results |
| 321 | wg.Add(parallel) |
| 322 | for i := range parallel { |
| 323 | go func(index int) { |
| 324 | defer wg.Done() |
| 325 | time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond) |
| 326 | mustGet(t, kv, ctx, "concurrent", fmt.Sprintf("key%d", index), "value") |
| 327 | }(i) |
| 328 | } |
| 329 | wg.Wait() |
| 330 | }) |
| 331 | |
| 332 | t.Run("StressTest", func(t *testing.T) { |
| 333 | if testing.Short() { |
| 334 | t.Skip("Skipping stress test in short mode") |
| 335 | } |
| 336 | |
| 337 | totalOps := 1000 |
| 338 | workerCount := 20 |
| 339 | prefix := "stress_test" |
| 340 | opsPerWorker := totalOps / workerCount |
| 341 | |
| 342 | log.Info("Starting KV storage stress test...") |
| 343 | startTime := time.Now() |
| 344 | |
| 345 | // Concurrent write test |
| 346 | var wg sync.WaitGroup |
| 347 | errorCount := int64(0) |
| 348 | |
| 349 | for w := range workerCount { |
| 350 | wg.Add(1) |
| 351 | go func(workerID int) { |
| 352 | defer wg.Done() |
| 353 | startIdx := workerID * opsPerWorker |
| 354 | |
| 355 | for i := range opsPerWorker { |
| 356 | i := startIdx + i |
| 357 | err := kv.Set(ctx, plugin.KVParams{ |
| 358 | Group: prefix, |
nothing calls this directly
no test coverage detected