StartManagementServer initializes a management server which implements the AggregatedDiscoveryService endpoint. The management server is initialized with no resources. Tests should call the Update() method to change the resource snapshot held by the management server, as required by the test logic. Registers a cleanup function on t to stop the management server.
(t *testing.T, opts ManagementServerOptions)
| 131 | // |
| 132 | // Registers a cleanup function on t to stop the management server. |
| 133 | func StartManagementServer(t *testing.T, opts ManagementServerOptions) *ManagementServer { |
| 134 | t.Helper() |
| 135 | |
| 136 | // Create a snapshot cache. The first parameter to NewSnapshotCache() |
| 137 | // controls whether the server should wait for all resources to be |
| 138 | // explicitly named in the request before responding to any of them. |
| 139 | wait := !opts.AllowResourceSubset |
| 140 | cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{t}) |
| 141 | t.Logf("Created new snapshot cache...") |
| 142 | |
| 143 | lis := opts.Listener |
| 144 | if lis == nil { |
| 145 | var err error |
| 146 | lis, err = net.Listen("tcp", "localhost:0") |
| 147 | if err != nil { |
| 148 | t.Fatalf("Failed to listen on localhost:0: %v", err) |
| 149 | } |
| 150 | } |
| 151 | |
| 152 | // Cancelling the context passed to the server is the only way of stopping it |
| 153 | // at the end of the test. |
| 154 | ctx, cancel := context.WithCancel(context.Background()) |
| 155 | callbacks := v3server.CallbackFuncs{ |
| 156 | StreamOpenFunc: opts.OnStreamOpen, |
| 157 | StreamClosedFunc: opts.OnStreamClosed, |
| 158 | StreamRequestFunc: opts.OnStreamRequest, |
| 159 | StreamResponseFunc: opts.OnStreamResponse, |
| 160 | } |
| 161 | |
| 162 | // Create an xDS management server and register the ADS implementation |
| 163 | // provided by it on a gRPC server. |
| 164 | xs := v3server.NewServer(ctx, cache, callbacks) |
| 165 | gs := grpc.NewServer() |
| 166 | v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs) |
| 167 | t.Logf("Registered Aggregated Discovery Service (ADS)...") |
| 168 | |
| 169 | mgmtServer := &ManagementServer{ |
| 170 | Address: lis.Addr().String(), |
| 171 | cancel: cancel, |
| 172 | version: 0, |
| 173 | gs: gs, |
| 174 | xs: xs, |
| 175 | cache: cache, |
| 176 | logger: t, |
| 177 | } |
| 178 | if opts.SupportLoadReportingService { |
| 179 | lrs := fakeserver.NewServer(lis.Addr().String()) |
| 180 | v3lrsgrpc.RegisterLoadReportingServiceServer(gs, lrs) |
| 181 | mgmtServer.LRSServer = lrs |
| 182 | t.Logf("Registered Load Reporting Service (LRS)...") |
| 183 | } |
| 184 | |
| 185 | // Start serving. |
| 186 | go gs.Serve(lis) |
| 187 | t.Logf("xDS management server serving at: %v...", lis.Addr().String()) |
| 188 | t.Cleanup(mgmtServer.Stop) |
| 189 | return mgmtServer |
| 190 | } |