(t *testing.T)
| 1058 | } |
| 1059 | |
| 1060 | func TestClientUpdateMetadataErrorAndRetry(t *testing.T) { |
| 1061 | seedBroker := NewMockBroker(t, 1) |
| 1062 | var called atomic.Int32 |
| 1063 | |
| 1064 | seedBroker.setHandler(func(req *request) (res encoderWithHeader) { |
| 1065 | if req.body.key() != 3 { |
| 1066 | t.Error("this test sends only Metadata requests") |
| 1067 | return |
| 1068 | } |
| 1069 | resp := new(MetadataResponse) |
| 1070 | for _, topic := range req.body.(*MetadataRequest).Topics { |
| 1071 | if topic == "new_topic" { |
| 1072 | resp.Topics = append(resp.Topics, &TopicMetadata{ |
| 1073 | Version: 1, |
| 1074 | Name: "new_topic", |
| 1075 | Err: ErrUnknownTopicOrPartition, |
| 1076 | }) |
| 1077 | } |
| 1078 | } |
| 1079 | called.Add(1) |
| 1080 | resp.AddBroker(seedBroker.Addr(), 1) |
| 1081 | return resp |
| 1082 | }) |
| 1083 | |
| 1084 | config := NewTestConfig() |
| 1085 | config.Metadata.Retry.Max = 3 |
| 1086 | config.Metadata.Retry.Backoff = 200 * time.Millisecond |
| 1087 | config.Metadata.RefreshFrequency = 0 |
| 1088 | config.Net.ReadTimeout = 10 * time.Millisecond |
| 1089 | config.Net.WriteTimeout = 10 * time.Millisecond |
| 1090 | config.Metadata.SingleFlight = true |
| 1091 | client, err := NewClient([]string{seedBroker.Addr()}, config) |
| 1092 | if err != nil { |
| 1093 | t.Fatal(err) |
| 1094 | } |
| 1095 | waitGroup := sync.WaitGroup{} |
| 1096 | waitGroup.Add(10) |
| 1097 | for range 10 { |
| 1098 | go func() { |
| 1099 | defer waitGroup.Done() |
| 1100 | err := client.RefreshMetadata("new_topic") |
| 1101 | if err == nil { |
| 1102 | t.Error("should return error") |
| 1103 | return |
| 1104 | } |
| 1105 | }() |
| 1106 | } |
| 1107 | waitGroup.Wait() |
| 1108 | safeClose(t, client) |
| 1109 | seedBroker.Close() |
| 1110 | // The refresh metadata is always for the same topic, |
| 1111 | // it should have been batched. |
| 1112 | if count := called.Load(); count >= 7 { |
| 1113 | t.Errorf("Refresh metadata was called %d times, this should be less than 7.", count) |
| 1114 | } |
| 1115 | } |
| 1116 | |
| 1117 | func TestClientRefreshesMetadataConcurrently(t *testing.T) { |
nothing calls this directly
no test coverage detected