MCPcopy
hub / github.com/grpc/grpc-go / TestProducer

Method TestProducer

orca/producer_test.go:122–215  ·  orca/producer_test.go::s.TestProducer

TestProducer is a basic, end-to-end style test of an LB policy with an OOBListener communicating with a server with an ORCA service.

(t *testing.T)

Source from the content-addressed store, hash-verified

120// TestProducer is a basic, end-to-end style test of an LB policy with an
121// OOBListener communicating with a server with an ORCA service.
122func (s) TestProducer(t *testing.T) {
123 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
124 defer cancel()
125
126 // Use a fixed backoff for stream recreation.
127 oldBackoff := internal.DefaultBackoffFunc
128 internal.DefaultBackoffFunc = func(int) time.Duration { return 10 * time.Millisecond }
129 defer func() { internal.DefaultBackoffFunc = oldBackoff }()
130
131 // Initialize listener for our ORCA server.
132 lis, err := testutils.LocalTCPListener()
133 if err != nil {
134 t.Fatal(err)
135 }
136
137 // Register the OpenRCAService with a very short metrics reporting interval.
138 const shortReportingInterval = 50 * time.Millisecond
139 smr := orca.NewServerMetricsRecorder()
140 opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval, ServerMetricsProvider: smr}
141 internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)
142 s := grpc.NewServer()
143 if err := orca.Register(s, opts); err != nil {
144 t.Fatalf("orca.Register failed: %v", err)
145 }
146 go s.Serve(lis)
147 defer s.Stop()
148
149 // Create our client with an OOB listener in the LB policy it selects.
150 r := manual.NewBuilderWithScheme("whatever")
151 oobLis := newTestOOBListener()
152
153 lisOpts := orca.OOBListenerOptions{ReportInterval: 50 * time.Millisecond}
154 li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis, opts: lisOpts}
155 addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)
156 r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
157 dopts := []grpc.DialOption{
158 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`),
159 grpc.WithResolvers(r),
160 grpc.WithTransportCredentials(insecure.NewCredentials()),
161 }
162 cc, err := grpc.NewClient("whatever:///whatever", dopts...)
163 if err != nil {
164 t.Fatalf("grpc.NewClient() failed: %v", err)
165 }
166 defer cc.Close()
167 cc.Connect()
168 // Set a few metrics and wait for them on the client side.
169 smr.SetCPUUtilization(10)
170 smr.SetMemoryUtilization(0.1)
171 smr.SetNamedUtilization("bob", 0.555)
172 loadReportWant := &v3orcapb.OrcaLoadReport{
173 CpuUtilization: 10,
174 MemUtilization: 0.1,
175 Utilization: map[string]float64{"bob": 0.555},
176 }
177
178testReport:
179 for {

Callers

nothing calls this directly

Calls 15

SetCPUUtilization · Method · 0.95
SetMemoryUtilization · Method · 0.95
SetNamedUtilization · Method · 0.95
LocalTCPListener · Function · 0.92
NewServerMetricsRecorder · Function · 0.92
NewServer · Function · 0.92
Register · Function · 0.92
NewBuilderWithScheme · Function · 0.92
WithDefaultServiceConfig · Function · 0.92
WithResolvers · Function · 0.92
WithTransportCredentials · Function · 0.92
NewCredentials · Function · 0.92

Tested by

no test coverage detected