Heartbeat monitors the state of each shard in the ring.
(ctx context.Context, frequency time.Duration)
| 491 | |
| 492 | // Heartbeat monitors state of each shard in the ring. |
| 493 | func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) { |
| 494 | ticker := time.NewTicker(frequency) |
| 495 | defer ticker.Stop() |
| 496 | |
| 497 | for { |
| 498 | select { |
| 499 | case <-ticker.C: |
| 500 | var rebalance bool |
| 501 | |
| 502 | // note: `c.List()` return a shadow copy of `[]*ringShard`. |
| 503 | for _, shard := range c.List() { |
| 504 | isUp := c.opt.HeartbeatFn(ctx, shard.Client) |
| 505 | if shard.Vote(isUp) { |
| 506 | internal.Logger.Printf(ctx, "ring shard state changed: %s", shard) |
| 507 | rebalance = true |
| 508 | } |
| 509 | } |
| 510 | |
| 511 | if rebalance { |
| 512 | c.mu.Lock() |
| 513 | c.rebalanceLocked() |
| 514 | c.mu.Unlock() |
| 515 | } |
| 516 | case <-ctx.Done(): |
| 517 | return |
| 518 | } |
| 519 | } |
| 520 | } |
| 521 | |
| 522 | // rebalanceLocked removes dead shards from the Ring. |
| 523 | // Requires c.mu locked. |