(t *testing.T)
| 241 | } |
| 242 | |
| 243 | func TestFuncConsumerGroupOffsetDeletion(t *testing.T) { |
| 244 | t.Parallel() |
| 245 | checkKafkaVersion(t, "2.4.0") |
| 246 | setupFunctionalTest(t) |
| 247 | defer teardownFunctionalTest(t) |
| 248 | config := NewFunctionalTestConfig() |
| 249 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 250 | defer safeClose(t, client) |
| 251 | if err != nil { |
| 252 | t.Fatal(err) |
| 253 | } |
| 254 | |
| 255 | // create a consumer group with offsets on |
| 256 | // - topic test.1 partition 0 |
| 257 | // - topic test.4 partition 0 |
| 258 | groupID := testFuncConsumerGroupID(t) |
| 259 | consumerGroup, err := NewConsumerGroupFromClient(groupID, client) |
| 260 | if err != nil { |
| 261 | t.Fatal(err) |
| 262 | } |
| 263 | defer safeClose(t, consumerGroup) |
| 264 | |
| 265 | offsetMgr, _ := NewOffsetManagerFromClient(groupID, client) |
| 266 | defer safeClose(t, offsetMgr) |
| 267 | markOffset(t, offsetMgr, "test.1", 0, 1) |
| 268 | markOffset(t, offsetMgr, "test.4", 0, 2) |
| 269 | offsetMgr.Commit() |
| 270 | |
| 271 | admin, err := NewClusterAdminFromClient(client) |
| 272 | if err != nil { |
| 273 | t.Fatal(err) |
| 274 | } |
| 275 | offsetFetch, err := admin.ListConsumerGroupOffsets(groupID, nil) |
| 276 | if err != nil { |
| 277 | t.Fatal(err) |
| 278 | } |
| 279 | if len(offsetFetch.Blocks) != 2 { |
| 280 | t.Fatal("Expected offsets on two topics. Found offsets on ", len(offsetFetch.Blocks), "topics.") |
| 281 | } |
| 282 | |
| 283 | // Delete offset for partition topic test.4 partition 0 |
| 284 | err = admin.DeleteConsumerGroupOffset(groupID, "test.4", 0) |
| 285 | if err != nil { |
| 286 | t.Fatal(err) |
| 287 | } |
| 288 | |
| 289 | offsetFetch, err = admin.ListConsumerGroupOffsets(groupID, nil) |
| 290 | if err != nil { |
| 291 | t.Fatal(err) |
| 292 | } |
| 293 | if len(offsetFetch.Blocks) != 1 { |
| 294 | t.Fatal("Expected offsets on one topic. Found offsets on ", len(offsetFetch.Blocks), "topics.") |
| 295 | } |
| 296 | if offsetFetch.Blocks["test.4"] != nil { |
| 297 | t.Fatal("Offset still exists for topic 'topic.4'. It should have been deleted.") |
| 298 | } |
| 299 | } |
| 300 |
nothing calls this directly
no test coverage detected