todo : test for multi-topic?
(t *testing.T)
| 597 | // todo : test for multi-topic? |
| 598 | |
| 599 | func TestGenerationExitsOnPartitionChange(t *testing.T) { |
| 600 | var count int |
| 601 | partitions := [][]Partition{ |
| 602 | { |
| 603 | Partition{ |
| 604 | Topic: "topic-1", |
| 605 | ID: 0, |
| 606 | }, |
| 607 | }, |
| 608 | { |
| 609 | Partition{ |
| 610 | Topic: "topic-1", |
| 611 | ID: 0, |
| 612 | }, |
| 613 | { |
| 614 | Topic: "topic-1", |
| 615 | ID: 1, |
| 616 | }, |
| 617 | }, |
| 618 | } |
| 619 | |
| 620 | conn := mockCoordinator{ |
| 621 | readPartitionsFunc: func(...string) ([]Partition, error) { |
| 622 | p := partitions[count] |
| 623 | // cap the count at len(partitions) -1 so ReadPartitions doesn't ever go out of bounds |
| 624 | // and long-running tests don't fail |
| 625 | if count < len(partitions) { |
| 626 | count++ |
| 627 | } |
| 628 | return p, nil |
| 629 | }, |
| 630 | } |
| 631 | |
| 632 | // Sadly this test is time based, so at the end we check whether partitionWatcher ran to completion within the |
| 633 | // allotted time. The allotted time is 4x the watch interval (watchTime). |
| 634 | now := time.Now() |
| 635 | watchTime := 500 * time.Millisecond |
| 636 | |
| 637 | gen := Generation{ |
| 638 | conn: conn, |
| 639 | done: make(chan struct{}), |
| 640 | joined: make(chan struct{}), |
| 641 | log: func(func(Logger)) {}, |
| 642 | logError: func(func(Logger)) {}, |
| 643 | } |
| 644 | |
| 645 | done := make(chan struct{}) |
| 646 | go func() { |
| 647 | gen.partitionWatcher(watchTime, "topic-1") |
| 648 | close(done) |
| 649 | }() |
| 650 | |
| 651 | select { |
| 652 | case <-time.After(5 * time.Second): |
| 653 | t.Fatal("timed out waiting for partition watcher to exit") |
| 654 | case <-done: |
| 655 | if time.Since(now).Seconds() > watchTime.Seconds()*4 { |
| 656 | t.Error("partitionWatcher didn't see update") |
nothing calls this directly
no test coverage detected