| 170 | } |
| 171 | |
| 172 | func TestWatchPrefix(t *testing.T) { |
| 173 | withFixtures(t, func(t *testing.T, client Client) { |
| 174 | const prefix = "test/" |
| 175 | const prefix2 = "ignore/" |
| 176 | |
| 177 | // We are going to generate this number of updates, sleeping between each update. |
| 178 | const max = 100 |
| 179 | const sleep = time.Millisecond * 10 |
| 180 | // etcd seems to be quite slow. If we finish faster, the test will end sooner. |
| 181 | // (We regularly see generators taking up to 5 seconds to produce all messages on some platforms!) |
| 182 | const totalTestTimeout = 10 * time.Second |
| 183 | |
| 184 | observedKeysCh := make(chan string, max) |
| 185 | |
| 186 | ctx, cancel := context.WithCancel(context.Background()) |
| 187 | defer cancel() |
| 188 | |
| 189 | wg := sync.WaitGroup{} |
| 190 | |
| 191 | wg.Add(1) |
| 192 | go func() { |
| 193 | defer wg.Done() |
| 194 | |
| 195 | // Start watching before we even start generating values. Values will be buffered. |
| 196 | client.WatchPrefix(ctx, prefix, func(key string, _ interface{}) bool { |
| 197 | observedKeysCh <- key |
| 198 | return true |
| 199 | }) |
| 200 | }() |
| 201 | |
| 202 | gen := func(p string) { |
| 203 | defer wg.Done() |
| 204 | |
| 205 | start := time.Now() |
| 206 | for i := 0; i < max && ctx.Err() == nil; i++ { |
| 207 | // Start by sleeping, so that the watching client can see an empty KV store at the beginning. |
| 208 | time.Sleep(sleep) |
| 209 | |
| 210 | key := fmt.Sprintf("%s%d", p, i) |
| 211 | err := client.CAS(ctx, key, func(interface{}) (out interface{}, retry bool, err error) { |
| 212 | return key, true, nil |
| 213 | }) |
| 214 | |
| 215 | if ctx.Err() != nil { |
| 216 | break |
| 217 | } |
| 218 | require.NoError(t, err) |
| 219 | } |
| 220 | t.Log("Generator finished in", time.Since(start)) |
| 221 | } |
| 222 | |
| 223 | wg.Add(2) |
| 224 | go gen(prefix) |
| 225 | go gen(prefix2) // we don't want to see these keys reported |
| 226 | |
| 227 | observedKeys := map[string]int{} |
| 228 | |
| 229 | totalDeadline := time.After(totalTestTimeout) |