MCPcopy
hub / github.com/grafana/dskit / TestDoBatch_QuorumError

Function TestDoBatch_QuorumError

ring/ring_test.go:344–507  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

342}
343
344func TestDoBatch_QuorumError(t *testing.T) {
345 const (
346 // we should run several write request to make sure we don't have any race condition on the batchTracker code
347 numberOfOperations = 10000
348 replicationFactor = 3
349 )
350
351 gen := initTokenGenerator(t)
352
353 desc := NewDesc()
354 for address := 0; address < replicationFactor; address++ {
355 instTokens := gen.GenerateTokens(128, nil)
356 instanceID := fmt.Sprintf("%d", address)
357 desc.AddIngester(instanceID, instanceID, "", instTokens, ACTIVE, time.Now(), false, time.Time{}, nil)
358 }
359 ringConfig := Config{
360 HeartbeatTimeout: time.Hour,
361 ReplicationFactor: replicationFactor,
362 }
363 ring, err := NewWithStoreClientAndStrategy(ringConfig, "ingester", ringKey, nil, NewDefaultReplicationStrategy(), nil, log.NewNopLogger())
364 require.NoError(t, err)
365 ring.updateRingState(desc)
366 operationKeys := []uint32{1, 10, 100}
367 ctx := context.Background()
368 unfinishedDoBatchCalls := sync.WaitGroup{}
369 runDoBatch := func(instanceReturnErrors [replicationFactor]error, isClientError func(error) bool) error {
370 unfinishedDoBatchCalls.Add(1)
371 returnInstanceError := func(i InstanceDesc, _ []int) error {
372 instanceID, err := strconv.Atoi(i.Addr)
373 require.NoError(t, err)
374 return instanceReturnErrors[instanceID]
375 }
376 return DoBatchWithOptions(ctx, Write, ring, operationKeys, returnInstanceError, DoBatchOptions{
377 Cleanup: func() { unfinishedDoBatchCalls.Done() },
378 IsClientError: isClientError,
379 })
380 }
381
382 updateState := func(instanceIDs []string, state InstanceState) {
383 for _, instanceID := range instanceIDs {
384 inst := ring.ringDesc.Ingesters[instanceID]
385 inst.State = state
386 inst.Timestamp = time.Now().Unix()
387 ring.ringDesc.Ingesters[instanceID] = inst
388 ring.updateRingState(ring.ringDesc)
389 }
390 }
391
392 http429Error := httpgrpc.Errorf(429, "Throttling")
393 http500Error := httpgrpc.Errorf(500, "InternalServerError")
394 mockClientError := mockError{isClientErr: true}
395 mockServerError := mockError{isClientErr: false}
396
397 isClientErr := func(err error) bool {
398 if mockErr, ok := err.(mockError); ok {
399 return mockErr.isClientError()
400 }
401 return false

Callers

nothing calls this directly

Calls 14

AddIngester · Method · 0.95
Errorf · Function · 0.92
initTokenGenerator · Function · 0.85
NewDesc · Function · 0.85
DoBatchWithOptions · Function · 0.85
updateRingState · Method · 0.80
isClientError · Method · 0.80
Run · Method · 0.80
GenerateTokens · Method · 0.65
Add · Method · 0.65

Tested by

no test coverage detected