(ctx context.Context)
| 607 | } |
| 608 | |
| 609 | func (m *KV) starting(ctx context.Context) error { |
| 610 | mlCfg, err := m.buildMemberlistConfig() |
| 611 | if err != nil { |
| 612 | return err |
| 613 | } |
| 614 | |
| 615 | // Wait for memberlist and broadcasts fields creation because |
| 616 | // memberlist may start calling delegate methods if it |
| 617 | // receives traffic. |
| 618 | // See https://godoc.org/github.com/hashicorp/memberlist#Delegate |
| 619 | // |
| 620 | // Note: We cannot check for Starting state, as we want to use delegate during cluster joining process |
| 621 | // that happens in Starting state. |
| 622 | list, err := memberlist.Create(mlCfg) |
| 623 | if err != nil { |
| 624 | return fmt.Errorf("failed to create memberlist: %v", err) |
| 625 | } |
| 626 | // Finish delegate initialization. |
| 627 | m.memberlist = list |
| 628 | m.localBroadcasts = &memberlist.TransmitLimitedQueue{ |
| 629 | NumNodes: list.NumMembers, |
| 630 | RetransmitMult: mlCfg.RetransmitMult, |
| 631 | } |
| 632 | m.gossipBroadcasts = &memberlist.TransmitLimitedQueue{ |
| 633 | NumNodes: list.NumMembers, |
| 634 | RetransmitMult: mlCfg.RetransmitMult, |
| 635 | } |
| 636 | m.delegateReady.Store(true) |
| 637 | |
| 638 | // Try to fast-join memberlist cluster in Starting state, so that we don't start with empty KV store. |
| 639 | if len(m.cfg.JoinMembers) > 0 { |
| 640 | if err := m.fastJoinMembersOnStartup(ctx); err != nil { |
| 641 | level.Error(m.logger).Log("msg", "failed to fast-join the memberlist cluster at startup", "err", err) |
| 642 | |
| 643 | if m.cfg.AbortIfFastJoinFails { |
| 644 | return fmt.Errorf("failed to fast-join the memberlist cluster at startup: %w", err) |
| 645 | } |
| 646 | } |
| 647 | } |
| 648 | |
| 649 | return nil |
| 650 | } |
| 651 | |
// errFailedToJoinCluster is a sentinel error carrying the message
// "failed to join memberlist cluster on startup".
// NOTE(review): presumably returned by the startup join logic when the
// cluster cannot be joined; its users are outside this view, so confirm.
| 652 | var errFailedToJoinCluster = errors.New("failed to join memberlist cluster on startup") |
| 653 |
nothing calls this directly
no test coverage detected