(topic string)
| 314 | } |
| 315 | |
| 316 | func testFuncConsumerGroupFuzzySeed(topic string) error { |
| 317 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig()) |
| 318 | if err != nil { |
| 319 | return err |
| 320 | } |
| 321 | defer func() { _ = client.Close() }() |
| 322 | |
| 323 | total := int64(0) |
| 324 | for pn := int32(0); pn < 4; pn++ { |
| 325 | newest, err := client.GetOffset(topic, pn, OffsetNewest) |
| 326 | if err != nil { |
| 327 | return err |
| 328 | } |
| 329 | oldest, err := client.GetOffset(topic, pn, OffsetOldest) |
| 330 | if err != nil { |
| 331 | return err |
| 332 | } |
| 333 | total = total + newest - oldest |
| 334 | } |
| 335 | if total >= 7000 { |
| 336 | return nil |
| 337 | } |
| 338 | |
| 339 | producer, err := NewAsyncProducerFromClient(client) |
| 340 | if err != nil { |
| 341 | return err |
| 342 | } |
| 343 | for i := total; i < 7000; i++ { |
| 344 | producer.Input() <- &ProducerMessage{Topic: topic, Value: ByteEncoder([]byte("testdata"))} |
| 345 | } |
| 346 | return producer.Close() |
| 347 | } |
| 348 | |
| 349 | type testFuncConsumerGroupMessage struct { |
| 350 | ClientID string |
no test coverage detected