MCPcopy
hub / github.com/segmentio/kafka-go / testDialerLookupPartitions

Function testDialerLookupPartitions

dialer_test.go:39–75  ·  view source on GitHub ↗
(t *testing.T, ctx context.Context, d *Dialer)

Source from the content-addressed store, hash-verified

37}
38
39func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) {
40 client, topic, shutdown := newLocalClientAndTopic()
41 defer shutdown()
42
43 // Write a message to ensure the partition gets created.
44 w := &Writer{
45 Addr: TCP("localhost:9092"),
46 Topic: topic,
47 Transport: client.Transport,
48 }
49 w.WriteMessages(ctx, Message{})
50 w.Close()
51
52 partitions, err := d.LookupPartitions(ctx, "tcp", "localhost:9092", topic)
53 if err != nil {
54 t.Error(err)
55 return
56 }
57
58 sort.Slice(partitions, func(i int, j int) bool {
59 return partitions[i].ID < partitions[j].ID
60 })
61
62 want := []Partition{
63 {
64 Topic: topic,
65 Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
66 Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
67 Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
68 OfflineReplicas: []Broker{},
69 ID: 0,
70 },
71 }
72 if !reflect.DeepEqual(partitions, want) {
73 t.Errorf("bad partitions:\ngot: %+v\nwant: %+v", partitions, want)
74 }
75}
76
77func tlsConfig(t *testing.T) *tls.Config {
78 const (

Callers

nothing calls this directly

Calls 6

WriteMessagesMethod · 0.95
CloseMethod · 0.95
TCPFunction · 0.85
LookupPartitionsMethod · 0.80
newLocalClientAndTopicFunction · 0.70
ErrorMethod · 0.45

Tested by

no test coverage detected