(kv *Client, name string, ringKey string, portToConnect int, casInterval time.Duration, start <-chan struct{}, stop <-chan struct{})
| 1394 | } |
| 1395 | |
| 1396 | func runClient(kv *Client, name string, ringKey string, portToConnect int, casInterval time.Duration, start <-chan struct{}, stop <-chan struct{}) error { |
| 1397 | // stop gossipping about the ring(s) |
| 1398 | defer services.StopAndAwaitTerminated(context.Background(), kv.kv) //nolint:errcheck |
| 1399 | |
| 1400 | for { |
| 1401 | select { |
| 1402 | case <-start: |
| 1403 | start = nil |
| 1404 | |
| 1405 | // let's join the first member |
| 1406 | if portToConnect > 0 { |
| 1407 | _, err := kv.kv.JoinMembers([]string{net.JoinHostPort(getLocalhostAddr(), strconv.Itoa(portToConnect))}) |
| 1408 | if err != nil { |
| 1409 | return fmt.Errorf("%s failed to join the cluster: %f", name, err) |
| 1410 | } |
| 1411 | } |
| 1412 | case <-stop: |
| 1413 | return nil |
| 1414 | case <-time.After(casInterval): |
| 1415 | err := cas(kv, ringKey, updateFn(name)) |
| 1416 | if err != nil { |
| 1417 | return fmt.Errorf("failed to cas the ring: %f", err) |
| 1418 | } |
| 1419 | } |
| 1420 | } |
| 1421 | } |
| 1422 | |
| 1423 | // avoid dependency on ring package |
| 1424 | func generateTokens(numTokens int) []uint32 { |
no test coverage detected