MCPcopy
hub / github.com/IBM/sarama / TestPartitionConsumerBrokerRace

Function TestPartitionConsumerBrokerRace

consumer_test.go:1648–1722  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1646}
1647
1648func TestPartitionConsumerBrokerRace(t *testing.T) {
1649 oldMaxProcs := runtime.GOMAXPROCS(2)
1650 defer runtime.GOMAXPROCS(oldMaxProcs)
1651
1652 const iterations = 2048
1653
1654 config := NewTestConfig()
1655 config.ChannelBufferSize = 0
1656 config.Consumer.MaxProcessingTime = time.Hour
1657
1658 broker := &brokerConsumer{
1659 input: make(chan *brokerSubscription, 1),
1660 stop: make(chan none),
1661 }
1662
1663 child := &partitionConsumer{
1664 conf: config,
1665 broker: broker,
1666 messages: make(chan *ConsumerMessage, 1),
1667 errors: make(chan *ConsumerError, 1),
1668 feeder: make(chan *partitionConsumerResponse, 1),
1669 trigger: make(chan none, 1),
1670 dying: make(chan none),
1671 dispatcherStop: make(chan none),
1672 topic: "my_topic",
1673 partition: 0,
1674 fetchSize: config.Consumer.Fetch.Default,
1675 }
1676 child.brokerSubscription = newBrokerSubscription(child)
1677
1678 response := &FetchResponse{}
1679 response.AddMessage("my_topic", 0, nil, testMsg, 0)
1680
1681 done := make(chan none)
1682 feederDone := make(chan none)
1683 messagesDone := make(chan none)
1684
1685 go func() {
1686 defer close(feederDone)
1687 child.responseFeeder()
1688 }()
1689
1690 go func() {
1691 defer close(messagesDone)
1692 for range child.messages {
1693 }
1694 }()
1695
1696 go func() {
1697 for {
1698 select {
1699 case <-done:
1700 return
1701 default:
1702 child.broker = broker
1703 runtime.Gosched()
1704 }
1705 }

Callers

nothing calls this directly

Calls 5

AddMessageMethod · 0.95
responseFeederMethod · 0.95
newBrokerSubscriptionFunction · 0.85
NewTestConfigFunction · 0.70
AddMethod · 0.45

Tested by

no test coverage detected