MCPcopy
hub / github.com/grafana/dskit / getMultiBatched

Method getMultiBatched

cache/memcached_client.go:676–753  ·  view source on GitHub ↗
(ctx context.Context, keys []string, opts ...memcache.Option)

Source from the content-addressed store, hash-verified

674}
675
676func (c *MemcachedClient) getMultiBatched(ctx context.Context, keys []string, opts ...memcache.Option) ([]map[string]*memcache.Item, error) {
677 // Do not batch if batching is disabled or the number of input keys does not exceed the max batch size.
678 if (c.config.MaxGetMultiBatchSize <= 0) || (len(keys) <= c.config.MaxGetMultiBatchSize) {
679 // Even if we're not splitting the input into batches, make sure that our single request
680 // still counts against the concurrency limit.
681 if c.config.MaxGetMultiConcurrency > 0 {
682 if err := c.getMultiGate.Start(ctx); err != nil {
683 return nil, errors.Wrapf(err, "failed to wait for turn. Instance: %s", c.name)
684 }
685
686 defer c.getMultiGate.Done()
687 }
688
689 items, err := c.getMultiSingle(ctx, keys, opts...)
690 if err != nil {
691 return nil, err
692 }
693
694 return []map[string]*memcache.Item{items}, nil
695 }
696
697 // Calculate the number of expected results.
698 batchSize := c.config.MaxGetMultiBatchSize
699 numResults := len(keys) / batchSize
700 if len(keys)%batchSize != 0 {
701 numResults++
702 }
703
704 // If max concurrency is disabled, use a nil gate for the doWithBatch method which will
705 // not apply any limit to the number of goroutines started to make batch requests in that case.
706 var getMultiGate gate.Gate
707 if c.config.MaxGetMultiConcurrency > 0 {
708 getMultiGate = c.getMultiGate
709 }
710
711 // Sort keys based on which memcached server they will be sharded to. Sorting keys that
712 // are on the same server together before splitting into batches reduces the number of
713 // connections required and increases the number of "gets" per connection.
714 sortedKeys := c.sortKeysByServer(keys)
715
716 // Allocate a channel to store results for each batch request. The max concurrency will be
717 // enforced by doWithBatch.
718 results := make(chan *memcachedGetMultiResult, numResults)
719 defer close(results)
720
721 // Ignore the error here since it can only be returned by our provided function which
722 // always returns nil. NOTE also we are using a background context here for the doWithBatch
723 // method. This is to ensure that it runs the expected number of batches _even if_ our
724 // context (`ctx`) is canceled since we expect a certain number of batches to be read
725 // from `results` below. The wrapped `getMultiSingle` method will still check our context
726 // and short-circuit if it has been canceled.
727 _ = doWithBatch(context.Background(), len(keys), c.config.MaxGetMultiBatchSize, getMultiGate, func(startIndex, endIndex int) error {
728 batchKeys := sortedKeys[startIndex:endIndex]
729
730 res := &memcachedGetMultiResult{}
731 res.items, res.err = c.getMultiSingle(ctx, batchKeys, opts...)
732
733 results <- res

Callers 1

GetMultiWithErrorMethod · 0.95

Calls 5

getMultiSingleMethod · 0.95
sortKeysByServerMethod · 0.95
doWithBatchFunction · 0.85
StartMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected