(t *testing.T)
| 1860 | } |
| 1861 | |
| 1862 | func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { |
| 1863 | config := NewTestConfig() |
| 1864 | parent := &asyncProducer{ |
| 1865 | conf: config, |
| 1866 | muter: newPartitionMuter(), |
| 1867 | txnmgr: &transactionManager{}, |
| 1868 | } |
| 1869 | bp := &brokerProducer{ |
| 1870 | parent: parent, |
| 1871 | accumulatingBatch: newProduceSet(parent), |
| 1872 | currentRetries: make(map[string]map[int32]error), |
| 1873 | } |
| 1874 | |
| 1875 | safeAddMessage(t, bp.accumulatingBatch, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("p0")}) |
| 1876 | safeAddMessage(t, bp.accumulatingBatch, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("p1")}) |
| 1877 | |
| 1878 | blocked := newProduceSet(parent) |
| 1879 | safeAddMessage(t, blocked, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("held")}) |
| 1880 | if !parent.muter.tryMute(blocked) { |
| 1881 | t.Fatal("expected to mute blocked partition") |
| 1882 | } |
| 1883 | defer parent.muter.unmute(blocked) |
| 1884 | |
| 1885 | if !bp.tryBuildFlushingBatch() { |
| 1886 | t.Fatal("expected to flush available partitions") |
| 1887 | } |
| 1888 | if bp.flushingBatch == nil { |
| 1889 | t.Fatal("expected flushing batch to be set") |
| 1890 | } |
| 1891 | if _, ok := bp.flushingBatch.msgs["topic"][1]; ok { |
| 1892 | t.Fatal("expected muted partition to stay buffered") |
| 1893 | } |
| 1894 | if _, ok := bp.accumulatingBatch.msgs["topic"][0]; ok { |
| 1895 | t.Fatal("expected unmuted partition to flush") |
| 1896 | } |
| 1897 | if _, ok := bp.accumulatingBatch.msgs["topic"][1]; !ok { |
| 1898 | t.Fatal("expected muted partition to remain in accumulating batch") |
| 1899 | } |
| 1900 | } |
| 1901 | |
| 1902 | // TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks |
| 1903 | // when all partitions in the accumulating batch are externally muted and later unmuted. |
nothing calls this directly
no test coverage detected