(ctx context.Context, keys []string, opts ...memcache.Option)
| 674 | } |
| 675 | |
| 676 | func (c *MemcachedClient) getMultiBatched(ctx context.Context, keys []string, opts ...memcache.Option) ([]map[string]*memcache.Item, error) { |
| 677 | // Do not batch if the max batch size is non-positive or the number of input keys does not exceed it. |
| 678 | if (c.config.MaxGetMultiBatchSize <= 0) || (len(keys) <= c.config.MaxGetMultiBatchSize) { |
| 679 | // Even if we're not splitting the input into batches, make sure that our single request |
| 680 | // still counts against the concurrency limit. |
| 681 | if c.config.MaxGetMultiConcurrency > 0 { |
| 682 | if err := c.getMultiGate.Start(ctx); err != nil { |
| 683 | return nil, errors.Wrapf(err, "failed to wait for turn. Instance: %s", c.name) |
| 684 | } |
| 685 | |
| 686 | defer c.getMultiGate.Done() |
| 687 | } |
| 688 | |
| 689 | items, err := c.getMultiSingle(ctx, keys, opts...) |
| 690 | if err != nil { |
| 691 | return nil, err |
| 692 | } |
| 693 | |
| 694 | return []map[string]*memcache.Item{items}, nil |
| 695 | } |
| 696 | |
| 697 | // Calculate the number of expected results. |
| 698 | batchSize := c.config.MaxGetMultiBatchSize |
| 699 | numResults := len(keys) / batchSize |
| 700 | if len(keys)%batchSize != 0 { |
| 701 | numResults++ |
| 702 | } |
| 703 | |
| 704 | // If max concurrency is disabled, use a nil gate for the doWithBatch method which will |
| 705 | // not apply any limit to the number of goroutines started to make batch requests in that case. |
| 706 | var getMultiGate gate.Gate |
| 707 | if c.config.MaxGetMultiConcurrency > 0 { |
| 708 | getMultiGate = c.getMultiGate |
| 709 | } |
| 710 | |
| 711 | // Sort keys based on which memcached server they will be sharded to. Sorting keys that |
| 712 | // are on the same server together before splitting into batches reduces the number of |
| 713 | // connections required and increases the number of "gets" per connection. |
| 714 | sortedKeys := c.sortKeysByServer(keys) |
| 715 | |
| 716 | // Allocate a channel to store results for each batch request. The max concurrency will be |
| 717 | // enforced by doWithBatch. |
| 718 | results := make(chan *memcachedGetMultiResult, numResults) |
| 719 | defer close(results) |
| 720 | |
| 721 | // Ignore the error here since it can only be returned by our provided function which |
| 722 | // always returns nil. NOTE also we are using a background context here for the doWithBatch |
| 723 | // method. This is to ensure that it runs the expected number of batches _even if_ our |
| 724 | // context (`ctx`) is canceled since we expect a certain number of batches to be read |
| 725 | // from `results` below. The wrapped `getMultiSingle` method will still check our context |
| 726 | // and short-circuit if it has been canceled. |
| 727 | _ = doWithBatch(context.Background(), len(keys), c.config.MaxGetMultiBatchSize, getMultiGate, func(startIndex, endIndex int) error { |
| 728 | batchKeys := sortedKeys[startIndex:endIndex] |
| 729 | |
| 730 | res := &memcachedGetMultiResult{} |
| 731 | res.items, res.err = c.getMultiSingle(ctx, batchKeys, opts...) |
| 732 | |
| 733 | results <- res |
no test coverage detected