(t *testing.T)
| 335 | } |
| 336 | |
| 337 | func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) { |
| 338 | trm := newTestReporterMock() |
| 339 | consumer := NewConsumer(trm, NewTestConfig()) |
| 340 | pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) |
| 341 | pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) |
| 342 | pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) |
| 343 | pcmock.ExpectMessagesDrainedOnClose() |
| 344 | |
| 345 | pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) |
| 346 | if err != nil { |
| 347 | t.Error(err) |
| 348 | } |
| 349 | |
| 350 | message1 := <-pc.Messages() |
| 351 | if message1.Offset != 0 { |
| 352 | t.Errorf("Expected offset of first message in the partition to be 0, got %d", message1.Offset) |
| 353 | } |
| 354 | |
| 355 | message2 := <-pc.Messages() |
| 356 | if message2.Offset != 1 { |
| 357 | t.Errorf("Expected offset of second message in the partition to be 1, got %d", message2.Offset) |
| 358 | } |
| 359 | |
| 360 | if err := consumer.Close(); err != nil { |
| 361 | t.Error(err) |
| 362 | } |
| 363 | |
| 364 | if len(trm.errors) != 0 { |
| 365 | t.Errorf("Expected to not report any errors, found: %v", trm.errors) |
| 366 | } |
| 367 | } |
| 368 | |
| 369 | func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) { |
| 370 | startingOffset := int64(123) |
nothing calls this directly
no test coverage detected