| 72 | } |
| 73 | |
| 74 | func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed { |
| 75 | kvClient, err := kv.NewClient(rep.kvConfig, JSONCodec, nil, rep.logger) |
| 76 | if err != nil { |
| 77 | level.Warn(rep.logger).Log("msg", "failed to create kv client", "err", err) |
| 78 | return nil |
| 79 | } |
| 80 | // Try to become leader via the kv client |
| 81 | backoff := backoff.New(ctx, rep.conf.Backoff) |
| 82 | for backoff.Ongoing() { |
| 83 | // create a new cluster seed |
| 84 | seed := ClusterSeed{ |
| 85 | UID: uuid.NewString(), |
| 86 | PrometheusVersion: build.GetVersion(), |
| 87 | CreatedAt: time.Now(), |
| 88 | } |
| 89 | copySeed := seed |
| 90 | if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) { |
| 91 | // The key is already set, so we don't need to do anything |
| 92 | if in != nil { |
| 93 | if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != copySeed.UID { |
| 94 | copySeed = *kvSeed |
| 95 | return nil, false, nil |
| 96 | } |
| 97 | } |
| 98 | return &copySeed, true, nil |
| 99 | }); err != nil { |
| 100 | level.Warn(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) |
| 101 | continue |
| 102 | } |
| 103 | // ensure stability of the cluster seed |
| 104 | stableSeed := ensureStableKey(ctx, kvClient, rep.logger) |
| 105 | if stableSeed == nil { |
| 106 | return nil |
| 107 | } |
| 108 | |
| 109 | seed = *stableSeed |
| 110 | // Fetch the remote cluster seed. |
| 111 | remoteSeed, err := rep.fetchSeed(ctx, |
| 112 | func(err error) bool { |
| 113 | // we only want to retry if the error is neither an object-not-found error nor a bad seed file error |
| 114 | return !errors.Is(err, backend.ErrDoesNotExist) && !errors.Is(err, backend.ErrBadSeedFile) |
| 115 | }) |
| 116 | if err != nil { |
| 117 | if errors.Is(err, backend.ErrDoesNotExist) || errors.Is(err, backend.ErrBadSeedFile) { |
| 118 | // we are the leader and we need to save the file. |
| 119 | if err := rep.writeSeedFile(ctx, seed); err != nil { |
| 120 | level.Warn(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) |
| 121 | backoff.Wait() |
| 122 | continue |
| 123 | } |
| 124 | return &seed |
| 125 | } |
| 126 | backoff.Wait() |
| 127 | continue |
| 128 | } |
| 129 | return remoteSeed |
| 130 | } |
| 131 | return nil |