(t *testing.T)
| 1646 | } |
| 1647 | |
| 1648 | func TestPartitionConsumerBrokerRace(t *testing.T) { |
| 1649 | oldMaxProcs := runtime.GOMAXPROCS(2) |
| 1650 | defer runtime.GOMAXPROCS(oldMaxProcs) |
| 1651 | |
| 1652 | const iterations = 2048 |
| 1653 | |
| 1654 | config := NewTestConfig() |
| 1655 | config.ChannelBufferSize = 0 |
| 1656 | config.Consumer.MaxProcessingTime = time.Hour |
| 1657 | |
| 1658 | broker := &brokerConsumer{ |
| 1659 | input: make(chan *brokerSubscription, 1), |
| 1660 | stop: make(chan none), |
| 1661 | } |
| 1662 | |
| 1663 | child := &partitionConsumer{ |
| 1664 | conf: config, |
| 1665 | broker: broker, |
| 1666 | messages: make(chan *ConsumerMessage, 1), |
| 1667 | errors: make(chan *ConsumerError, 1), |
| 1668 | feeder: make(chan *partitionConsumerResponse, 1), |
| 1669 | trigger: make(chan none, 1), |
| 1670 | dying: make(chan none), |
| 1671 | dispatcherStop: make(chan none), |
| 1672 | topic: "my_topic", |
| 1673 | partition: 0, |
| 1674 | fetchSize: config.Consumer.Fetch.Default, |
| 1675 | } |
| 1676 | child.brokerSubscription = newBrokerSubscription(child) |
| 1677 | |
| 1678 | response := &FetchResponse{} |
| 1679 | response.AddMessage("my_topic", 0, nil, testMsg, 0) |
| 1680 | |
| 1681 | done := make(chan none) |
| 1682 | feederDone := make(chan none) |
| 1683 | messagesDone := make(chan none) |
| 1684 | |
| 1685 | go func() { |
| 1686 | defer close(feederDone) |
| 1687 | child.responseFeeder() |
| 1688 | }() |
| 1689 | |
| 1690 | go func() { |
| 1691 | defer close(messagesDone) |
| 1692 | for range child.messages { |
| 1693 | } |
| 1694 | }() |
| 1695 | |
| 1696 | go func() { |
| 1697 | for { |
| 1698 | select { |
| 1699 | case <-done: |
| 1700 | return |
| 1701 | default: |
| 1702 | child.broker = broker |
| 1703 | runtime.Gosched() |
| 1704 | } |
| 1705 | } |
Nothing calls this function directly.
No test coverage detected.