MCPcopy
hub / github.com/IBM/sarama / TestBrokerProducerFlushSkipsMutedPartitions

Function TestBrokerProducerFlushSkipsMutedPartitions

async_producer_test.go:1862–1900  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1860}
1861
1862func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) {
1863 config := NewTestConfig()
1864 parent := &asyncProducer{
1865 conf: config,
1866 muter: newPartitionMuter(),
1867 txnmgr: &transactionManager{},
1868 }
1869 bp := &brokerProducer{
1870 parent: parent,
1871 accumulatingBatch: newProduceSet(parent),
1872 currentRetries: make(map[string]map[int32]error),
1873 }
1874
1875 safeAddMessage(t, bp.accumulatingBatch, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("p0")})
1876 safeAddMessage(t, bp.accumulatingBatch, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("p1")})
1877
1878 blocked := newProduceSet(parent)
1879 safeAddMessage(t, blocked, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("held")})
1880 if !parent.muter.tryMute(blocked) {
1881 t.Fatal("expected to mute blocked partition")
1882 }
1883 defer parent.muter.unmute(blocked)
1884
1885 if !bp.tryBuildFlushingBatch() {
1886 t.Fatal("expected to flush available partitions")
1887 }
1888 if bp.flushingBatch == nil {
1889 t.Fatal("expected flushing batch to be set")
1890 }
1891 if _, ok := bp.flushingBatch.msgs["topic"][1]; ok {
1892 t.Fatal("expected muted partition to stay buffered")
1893 }
1894 if _, ok := bp.accumulatingBatch.msgs["topic"][0]; ok {
1895 t.Fatal("expected unmuted partition to flush")
1896 }
1897 if _, ok := bp.accumulatingBatch.msgs["topic"][1]; !ok {
1898 t.Fatal("expected muted partition to remain in accumulating batch")
1899 }
1900}
1901
1902// TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks
1903// when all partitions in the accumulating batch are externally muted and later unmuted.

Callers

nothing calls this directly

Calls 9

tryBuildFlushingBatchMethod · 0.95
newPartitionMuterFunction · 0.85
newProduceSetFunction · 0.85
safeAddMessageFunction · 0.85
StringEncoderTypeAlias · 0.85
tryMuteMethod · 0.80
FatalMethod · 0.80
unmuteMethod · 0.80
NewTestConfigFunction · 0.70

Tested by

no test coverage detected