DoBatchWithOptions performs a request against a set of keys in the ring, handling replication and failures. For example, if we want to write N items where they may all hit different instances, and we want them all replicated R ways with quorum writes, we track the relationship between batch RPCs and the items within them.
(ctx context.Context, op Operation, r DoBatchRing, keys []uint32, callback func(InstanceDesc, []int) error, o DoBatchOptions)
| 112 | // |
| 113 | // Not implemented as a method on Ring, so we can test separately. |
| 114 | func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys []uint32, callback func(InstanceDesc, []int) error, o DoBatchOptions) error { |
| 115 | o.replaceZeroValuesWithDefaults() |
| 116 | |
| 117 | if r.InstancesCount() <= 0 { |
| 118 | o.Cleanup() |
| 119 | return fmt.Errorf("DoBatch: InstancesCount <= 0") |
| 120 | } |
| 121 | expectedTrackersPerInstance := len(keys) * (r.ReplicationFactor() + 1) / r.InstancesCount() |
| 122 | itemTrackers := make([]itemTracker, len(keys)) |
| 123 | instances := make(map[string]instance, r.InstancesCount()) |
| 124 | |
| 125 | var ( |
| 126 | bufDescs [GetBufferSize]InstanceDesc |
| 127 | bufHosts [GetBufferSize]string |
| 128 | bufZones [GetBufferSize]string |
| 129 | ) |
| 130 | for i, key := range keys { |
| 131 | // Get call below takes ~1 microsecond for ~500 instances. |
| 132 | // Checking every 10K calls would be every 10ms. |
| 133 | if i%10e3 == 0 { |
| 134 | if err := context.Cause(ctx); err != nil { |
| 135 | o.Cleanup() |
| 136 | return err |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0]) |
| 141 | if err != nil { |
| 142 | o.Cleanup() |
| 143 | return err |
| 144 | } |
| 145 | itemTrackers[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors |
| 146 | itemTrackers[i].maxFailures = replicationSet.MaxErrors |
| 147 | itemTrackers[i].remaining.Store(int32(len(replicationSet.Instances))) |
| 148 | |
| 149 | for _, desc := range replicationSet.Instances { |
| 150 | curr, found := instances[desc.Addr] |
| 151 | if !found { |
| 152 | curr.itemTrackers = make([]*itemTracker, 0, expectedTrackersPerInstance) |
| 153 | curr.indexes = make([]int, 0, expectedTrackersPerInstance) |
| 154 | } |
| 155 | instances[desc.Addr] = instance{ |
| 156 | desc: desc, |
| 157 | itemTrackers: append(curr.itemTrackers, &itemTrackers[i]), |
| 158 | indexes: append(curr.indexes, i), |
| 159 | } |
| 160 | } |
| 161 | } |
| 162 | |
| 163 | // One last check before calling the callbacks: it doesn't make sense if context is canceled. |
| 164 | if err := context.Cause(ctx); err != nil { |
| 165 | o.Cleanup() |
| 166 | return err |
| 167 | } |
| 168 | |
| 169 | tracker := batchTracker{ |
| 170 | done: make(chan struct{}, 1), |
| 171 | err: make(chan error, 1), |