MCPcopy
hub / github.com/segmentio/kafka-go / TestGenerationExitsOnPartitionChange

Function TestGenerationExitsOnPartitionChange

consumergroup_test.go:599–659  ·  view source on GitHub ↗

todo : test for multi-topic?

(t *testing.T)

Source from the content-addressed store, hash-verified

597// todo : test for multi-topic?
598
599func TestGenerationExitsOnPartitionChange(t *testing.T) {
600 var count int
601 partitions := [][]Partition{
602 {
603 Partition{
604 Topic: "topic-1",
605 ID: 0,
606 },
607 },
608 {
609 Partition{
610 Topic: "topic-1",
611 ID: 0,
612 },
613 {
614 Topic: "topic-1",
615 ID: 1,
616 },
617 },
618 }
619
620 conn := mockCoordinator{
621 readPartitionsFunc: func(...string) ([]Partition, error) {
622 p := partitions[count]
623 // cap the count at len(partitions) -1 so ReadPartitions doesn't even go out of bounds
624 // and long running tests don't fail
625 if count < len(partitions) {
626 count++
627 }
628 return p, nil
629 },
630 }
631
632 // Sadly this test is time based, so at the end will be seeing if the runGroup run to completion within the
633 // allotted time. The allotted time is 4x the PartitionWatchInterval.
634 now := time.Now()
635 watchTime := 500 * time.Millisecond
636
637 gen := Generation{
638 conn: conn,
639 done: make(chan struct{}),
640 joined: make(chan struct{}),
641 log: func(func(Logger)) {},
642 logError: func(func(Logger)) {},
643 }
644
645 done := make(chan struct{})
646 go func() {
647 gen.partitionWatcher(watchTime, "topic-1")
648 close(done)
649 }()
650
651 select {
652 case <-time.After(5 * time.Second):
653 t.Fatal("timed out waiting for partition watcher to exit")
654 case <-done:
655 if time.Since(now).Seconds() > watchTime.Seconds()*4 {
656 t.Error("partitionWatcher didn't see update")

Callers

nothing calls this directly

Calls 2

partitionWatcherMethod · 0.95
ErrorMethod · 0.45

Tested by

no test coverage detected