DoSoakTest runs large unary RPCs in a loop a configurable number of times, with configurable failure thresholds. If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new stub that is created with the provided server address and dial options.
(ctx context.Context, soakConfig SoakTestConfig)
| 149 | // stub that is created with the provided server address and dial options. |
| 150 | // TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method. |
| 151 | func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) { |
| 152 | if soakConfig.Iterations%soakConfig.NumWorkers != 0 { |
| 153 | fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumWThreads\n") |
| 154 | } |
| 155 | startTime := time.Now() |
| 156 | var wg sync.WaitGroup |
| 157 | soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers) |
| 158 | for i := 0; i < soakConfig.NumWorkers; i++ { |
| 159 | wg.Add(1) |
| 160 | go func(workerID int) { |
| 161 | defer wg.Done() |
| 162 | executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID]) |
| 163 | }(i) |
| 164 | } |
| 165 | // Wait for all goroutines to complete. |
| 166 | wg.Wait() |
| 167 | |
| 168 | // Handle results. |
| 169 | totalSuccesses := 0 |
| 170 | totalFailures := 0 |
| 171 | latencies := stats.NewHistogram(stats.HistogramOptions{ |
| 172 | NumBuckets: 20, |
| 173 | GrowthFactor: 1, |
| 174 | BaseBucketSize: 1, |
| 175 | MinValue: 0, |
| 176 | }) |
| 177 | for _, worker := range soakWorkerResults { |
| 178 | totalSuccesses += worker.iterationsSucceeded |
| 179 | totalFailures += worker.Failures |
| 180 | if worker.Latencies != nil { |
| 181 | // Add latencies from the worker's Histogram to the main latencies. |
| 182 | latencies.Merge(worker.Latencies) |
| 183 | } |
| 184 | } |
| 185 | var b bytes.Buffer |
| 186 | totalIterations := totalSuccesses + totalFailures |
| 187 | latencies.Print(&b) |
| 188 | fmt.Fprintf(os.Stderr, |
| 189 | "(server_uri: %s) soak test successes: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n", |
| 190 | soakConfig.ServerAddr, totalSuccesses, soakConfig.Iterations, totalFailures, b.String()) |
| 191 | |
| 192 | if totalIterations != soakConfig.Iterations { |
| 193 | logger.Fatalf("Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations) |
| 194 | } |
| 195 | |
| 196 | if totalFailures > soakConfig.MaxFailures { |
| 197 | logger.Fatalf("Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures) |
| 198 | } |
| 199 | if soakConfig.ChannelForTest != nil { |
| 200 | _, cleanup := soakConfig.ChannelForTest() |
| 201 | defer cleanup() |
| 202 | } |
| 203 | } |