(t *testing.T)
| 1413 | } |
| 1414 | |
| 1415 | func TestConsumerGroupWithMissingTopic(t *testing.T) { |
| 1416 | t.Skip("this test doesn't work when the cluster is configured to auto-create topics") |
| 1417 | |
| 1418 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 1419 | defer cancel() |
| 1420 | |
| 1421 | conf := ReaderConfig{ |
| 1422 | Brokers: []string{"localhost:9092"}, |
| 1423 | GroupID: makeGroupID(), |
| 1424 | Topic: makeTopic(), |
| 1425 | MaxWait: time.Second, |
| 1426 | PartitionWatchInterval: 100 * time.Millisecond, |
| 1427 | WatchPartitionChanges: true, |
| 1428 | } |
| 1429 | |
| 1430 | r := NewReader(conf) |
| 1431 | defer r.Close() |
| 1432 | |
| 1433 | recvErr := make(chan error, 1) |
| 1434 | go func() { |
| 1435 | _, err := r.ReadMessage(ctx) |
| 1436 | recvErr <- err |
| 1437 | }() |
| 1438 | |
| 1439 | time.Sleep(time.Second) |
| 1440 | client, shutdown := newLocalClientWithTopic(conf.Topic, 1) |
| 1441 | defer shutdown() |
| 1442 | |
| 1443 | w := &Writer{ |
| 1444 | Addr: TCP(r.config.Brokers...), |
| 1445 | Topic: r.config.Topic, |
| 1446 | BatchTimeout: 10 * time.Millisecond, |
| 1447 | BatchSize: 1, |
| 1448 | Transport: client.Transport, |
| 1449 | } |
| 1450 | defer w.Close() |
| 1451 | if err := w.WriteMessages(ctx, Message{}); err != nil { |
| 1452 | t.Fatalf("write error: %+v", err) |
| 1453 | } |
| 1454 | |
| 1455 | if err := <-recvErr; err != nil { |
| 1456 | t.Fatalf("read error: %+v", err) |
| 1457 | } |
| 1458 | |
| 1459 | nMsgs := r.Stats().Messages |
| 1460 | if nMsgs != 1 { |
| 1461 | t.Fatalf("expected to receive one message, but got %d", nMsgs) |
| 1462 | } |
| 1463 | } |
| 1464 | |
| 1465 | func TestConsumerGroupWithTopic(t *testing.T) { |
| 1466 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
nothing calls this directly
no test coverage detected
searching dependent graphs…