MCPcopy
hub / github.com/segmentio/kafka-go / TestDialerTLS

Function TestDialerTLS

dialer_test.go:173–245  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

171}
172
173func TestDialerTLS(t *testing.T) {
174 client, topic, shutdown := newLocalClientAndTopic()
175 defer shutdown()
176
177 // Write a message to ensure the partition gets created.
178 w := &Writer{
179 Addr: TCP("localhost:9092"),
180 Topic: topic,
181 Transport: client.Transport,
182 }
183 w.WriteMessages(context.Background(), Message{})
184 w.Close()
185
186 // Create an SSL proxy using the tls.Config that connects to the
187 // docker-composed kafka
188 config := tlsConfig(t)
189 l, err := tls.Listen("tcp", "127.0.0.1:", config)
190 if err != nil {
191 t.Error(err)
192 return
193 }
194 defer l.Close()
195
196 go func() {
197 for {
198 conn, err := l.Accept()
199 if err != nil {
200 return // intentionally ignored
201 }
202
203 go func(in net.Conn) {
204 out, err := net.Dial("tcp", "localhost:9092")
205 if err != nil {
206 t.Error(err)
207 return
208 }
209 defer out.Close()
210
211 go io.Copy(in, out)
212 io.Copy(out, in)
213 }(conn)
214 }
215 }()
216
217 // Use the tls.Config and connect to the SSL proxy
218 d := &Dialer{
219 TLS: config,
220 }
221 partitions, err := d.LookupPartitions(context.Background(), "tcp", l.Addr().String(), topic)
222 if err != nil {
223 t.Error(err)
224 return
225 }
226
227 // Verify returned partition data is what we expect
228 sort.Slice(partitions, func(i int, j int) bool {
229 return partitions[i].ID < partitions[j].ID
230 })

Callers

nothing calls this directly

Calls 10

WriteMessages (Method) · 0.95
Close (Method) · 0.95
LookupPartitions (Method) · 0.95
TCP (Function) · 0.85
tlsConfig (Function) · 0.85
newLocalClientAndTopic (Function) · 0.70
Error (Method) · 0.45
Close (Method) · 0.45
Dial (Method) · 0.45
String (Method) · 0.45

Tested by

no test coverage detected