NewPartitionRingWithOptions creates a new PartitionRing with custom options.
(desc PartitionRingDesc, opts PartitionRingOptions)
| 80 | |
| 81 | // NewPartitionRingWithOptions creates a new PartitionRing with custom options. |
| 82 | func NewPartitionRingWithOptions(desc PartitionRingDesc, opts PartitionRingOptions) (*PartitionRing, error) { |
| 83 | shuffleShardCache, err := newPartitionRingShuffleShardCache(opts.ShuffleShardCacheSize) |
| 84 | if err != nil { |
| 85 | return nil, fmt.Errorf("failed to create shuffle shard cache: %w", err) |
| 86 | } |
| 87 | |
| 88 | ringTokens := desc.tokens() |
| 89 | partitionByToken := desc.partitionByToken() |
| 90 | ringPartitionIDs, ringPartitionActive, err := buildRingTokenPartitionLookups(ringTokens, partitionByToken, desc.Partitions) |
| 91 | if err != nil { |
| 92 | return nil, err |
| 93 | } |
| 94 | |
| 95 | return &PartitionRing{ |
| 96 | desc: desc, |
| 97 | ringTokens: ringTokens, |
| 98 | partitionByToken: partitionByToken, |
| 99 | ringPartitionIDs: ringPartitionIDs, |
| 100 | ringPartitionActive: ringPartitionActive, |
| 101 | ownersByPartition: desc.ownersByPartition(), |
| 102 | activePartitionsCount: desc.activePartitionsCount(), |
| 103 | maxPartitionID: desc.maxPartitionID(), |
| 104 | shuffleShardCache: shuffleShardCache, |
| 105 | opts: opts, |
| 106 | }, nil |
| 107 | } |
| 108 | |
| 109 | // buildRingTokenPartitionLookups builds two slices parallel to ringTokens: |
| 110 | // - ringPartitionIDs[i] is the partition ID that owns ringTokens[i] |