(t *testing.T)
| 1115 | } |
| 1116 | |
| 1117 | func TestClientRefreshesMetadataConcurrently(t *testing.T) { |
| 1118 | seedBroker := NewMockBroker(t, 1) |
| 1119 | |
| 1120 | seedBroker.setHandler(func(req *request) (res encoderWithHeader) { |
| 1121 | mr, ok := req.body.(*MetadataRequest) |
| 1122 | if !ok { |
| 1123 | return nil |
| 1124 | } |
| 1125 | resp := new(MetadataResponse) |
| 1126 | for _, topic := range mr.Topics { |
| 1127 | switch topic { |
| 1128 | case "topic1": |
| 1129 | resp.Topics = append(resp.Topics, &TopicMetadata{ |
| 1130 | Version: 1, |
| 1131 | Name: "topic1", |
| 1132 | Partitions: []*PartitionMetadata{}, |
| 1133 | }) |
| 1134 | case "topic2": |
| 1135 | resp.Topics = append(resp.Topics, &TopicMetadata{ |
| 1136 | Version: 1, |
| 1137 | Name: "topic2", |
| 1138 | Partitions: []*PartitionMetadata{}, |
| 1139 | }) |
| 1140 | case "topic3": |
| 1141 | resp.Topics = append(resp.Topics, &TopicMetadata{ |
| 1142 | Version: 1, |
| 1143 | Name: "topic3", |
| 1144 | Err: ErrUnknownTopicOrPartition, |
| 1145 | }) |
| 1146 | default: |
| 1147 | t.Errorf("unexpected topic: %s", topic) |
| 1148 | } |
| 1149 | } |
| 1150 | resp.AddBroker(seedBroker.Addr(), 1) |
| 1151 | return resp |
| 1152 | }) |
| 1153 | |
| 1154 | config := NewTestConfig() |
| 1155 | config.Metadata.SingleFlight = true |
| 1156 | client, err := NewClient([]string{seedBroker.Addr()}, config) |
| 1157 | require.NoError(t, err) |
| 1158 | |
| 1159 | var waitGroup sync.WaitGroup |
| 1160 | waitGroup.Add(100) |
| 1161 | for range 100 { |
| 1162 | go func() { |
| 1163 | defer waitGroup.Done() |
| 1164 | assert.NoError(t, client.RefreshMetadata("topic1")) |
| 1165 | assert.NoError(t, client.RefreshMetadata("topic2")) |
| 1166 | assert.ErrorIs(t, client.RefreshMetadata("topic3"), ErrUnknownTopicOrPartition) |
| 1167 | topics, err := client.Topics() |
| 1168 | assert.NoError(t, err) |
| 1169 | assert.Len(t, topics, 2) |
| 1170 | }() |
| 1171 | } |
| 1172 | waitGroup.Wait() |
| 1173 | safeClose(t, client) |
| 1174 | seedBroker.Close() |
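Nothing calls this function directly; it is a `Test…` function taking `*testing.T`, so it is invoked by the `go test` runner rather than by other code.
No test coverage detected for it, which is expected because the function is itself a test.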