(t *testing.T)
| 171 | } |
| 172 | |
| 173 | func TestDialerTLS(t *testing.T) { |
| 174 | client, topic, shutdown := newLocalClientAndTopic() |
| 175 | defer shutdown() |
| 176 | |
| 177 | // Write a message to ensure the partition gets created. |
| 178 | w := &Writer{ |
| 179 | Addr: TCP("localhost:9092"), |
| 180 | Topic: topic, |
| 181 | Transport: client.Transport, |
| 182 | } |
| 183 | w.WriteMessages(context.Background(), Message{}) |
| 184 | w.Close() |
| 185 | |
| 186 | // Create an SSL proxy using the tls.Config that connects to the |
| 187 | // docker-composed kafka |
| 188 | config := tlsConfig(t) |
| 189 | l, err := tls.Listen("tcp", "127.0.0.1:", config) |
| 190 | if err != nil { |
| 191 | t.Error(err) |
| 192 | return |
| 193 | } |
| 194 | defer l.Close() |
| 195 | |
| 196 | go func() { |
| 197 | for { |
| 198 | conn, err := l.Accept() |
| 199 | if err != nil { |
| 200 | return // intentionally ignored |
| 201 | } |
| 202 | |
| 203 | go func(in net.Conn) { |
| 204 | out, err := net.Dial("tcp", "localhost:9092") |
| 205 | if err != nil { |
| 206 | t.Error(err) |
| 207 | return |
| 208 | } |
| 209 | defer out.Close() |
| 210 | |
| 211 | go io.Copy(in, out) |
| 212 | io.Copy(out, in) |
| 213 | }(conn) |
| 214 | } |
| 215 | }() |
| 216 | |
| 217 | // Use the tls.Config and connect to the SSL proxy |
| 218 | d := &Dialer{ |
| 219 | TLS: config, |
| 220 | } |
| 221 | partitions, err := d.LookupPartitions(context.Background(), "tcp", l.Addr().String(), topic) |
| 222 | if err != nil { |
| 223 | t.Error(err) |
| 224 | return |
| 225 | } |
| 226 | |
| 227 | // Verify returned partition data is what we expect |
| 228 | sort.Slice(partitions, func(i int, j int) bool { |
| 229 | return partitions[i].ID < partitions[j].ID |
| 230 | }) |
nothing calls this directly
no test coverage detected