(t *testing.T, ctx context.Context, r *Reader, msgs ...Message)
| 502 | } |
| 503 | |
| 504 | func prepareReader(t *testing.T, ctx context.Context, r *Reader, msgs ...Message) { |
| 505 | config := r.Config() |
| 506 | var conn *Conn |
| 507 | var err error |
| 508 | |
| 509 | for { |
| 510 | if conn, err = DialLeader(ctx, "tcp", "localhost:9092", config.Topic, config.Partition); err == nil { |
| 511 | break |
| 512 | } |
| 513 | select { |
| 514 | case <-time.After(time.Second): |
| 515 | case <-ctx.Done(): |
| 516 | t.Fatal(ctx.Err()) |
| 517 | } |
| 518 | } |
| 519 | |
| 520 | defer conn.Close() |
| 521 | |
| 522 | if _, err := conn.WriteMessages(msgs...); err != nil { |
| 523 | t.Fatal(err) |
| 524 | } |
| 525 | } |
| 526 | |
| 527 | var ( |
| 528 | benchmarkReaderOnce sync.Once |
no test coverage detected