(t *testing.T)
| 342 | } |
| 343 | |
| 344 | func TestDoBatch_QuorumError(t *testing.T) { |
| 345 | const ( |
| 346 | // we should run several write request to make sure we don't have any race condition on the batchTracker code |
| 347 | numberOfOperations = 10000 |
| 348 | replicationFactor = 3 |
| 349 | ) |
| 350 | |
| 351 | gen := initTokenGenerator(t) |
| 352 | |
| 353 | desc := NewDesc() |
| 354 | for address := 0; address < replicationFactor; address++ { |
| 355 | instTokens := gen.GenerateTokens(128, nil) |
| 356 | instanceID := fmt.Sprintf("%d", address) |
| 357 | desc.AddIngester(instanceID, instanceID, "", instTokens, ACTIVE, time.Now(), false, time.Time{}, nil) |
| 358 | } |
| 359 | ringConfig := Config{ |
| 360 | HeartbeatTimeout: time.Hour, |
| 361 | ReplicationFactor: replicationFactor, |
| 362 | } |
| 363 | ring, err := NewWithStoreClientAndStrategy(ringConfig, "ingester", ringKey, nil, NewDefaultReplicationStrategy(), nil, log.NewNopLogger()) |
| 364 | require.NoError(t, err) |
| 365 | ring.updateRingState(desc) |
| 366 | operationKeys := []uint32{1, 10, 100} |
| 367 | ctx := context.Background() |
| 368 | unfinishedDoBatchCalls := sync.WaitGroup{} |
| 369 | runDoBatch := func(instanceReturnErrors [replicationFactor]error, isClientError func(error) bool) error { |
| 370 | unfinishedDoBatchCalls.Add(1) |
| 371 | returnInstanceError := func(i InstanceDesc, _ []int) error { |
| 372 | instanceID, err := strconv.Atoi(i.Addr) |
| 373 | require.NoError(t, err) |
| 374 | return instanceReturnErrors[instanceID] |
| 375 | } |
| 376 | return DoBatchWithOptions(ctx, Write, ring, operationKeys, returnInstanceError, DoBatchOptions{ |
| 377 | Cleanup: func() { unfinishedDoBatchCalls.Done() }, |
| 378 | IsClientError: isClientError, |
| 379 | }) |
| 380 | } |
| 381 | |
| 382 | updateState := func(instanceIDs []string, state InstanceState) { |
| 383 | for _, instanceID := range instanceIDs { |
| 384 | inst := ring.ringDesc.Ingesters[instanceID] |
| 385 | inst.State = state |
| 386 | inst.Timestamp = time.Now().Unix() |
| 387 | ring.ringDesc.Ingesters[instanceID] = inst |
| 388 | ring.updateRingState(ring.ringDesc) |
| 389 | } |
| 390 | } |
| 391 | |
| 392 | http429Error := httpgrpc.Errorf(429, "Throttling") |
| 393 | http500Error := httpgrpc.Errorf(500, "InternalServerError") |
| 394 | mockClientError := mockError{isClientErr: true} |
| 395 | mockServerError := mockError{isClientErr: false} |
| 396 | |
| 397 | isClientErr := func(err error) bool { |
| 398 | if mockErr, ok := err.(mockError); ok { |
| 399 | return mockErr.isClientError() |
| 400 | } |
| 401 | return false |
nothing calls this directly
no test coverage detected