MCPcopy
hub / github.com/IBM/sarama / TestExcludeUncommitted

Function TestExcludeUncommitted

consumer_test.go:1995–2054  ·  view source on GitHub ↗

When set to ReadCommitted, no uncommitted message should be available in messages channel

(t *testing.T)

Source from the content-addressed store, hash-verified

1993
1994// When set to ReadCommitted, no uncommitted message should be available in messages channel
1995func TestExcludeUncommitted(t *testing.T) {
1996 // Given
1997 broker0 := NewMockBroker(t, 0)
1998
1999 fetchResponse := &FetchResponse{
2000 Version: 5,
2001 Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
2002 AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
2003 }}},
2004 }
2005 fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true) // committed msg
2006 fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true) // uncommitted msg
2007 fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true) // uncommitted msg
2008 fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
2009 fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true) // committed msg
2010
2011 broker0.SetHandlerByMap(map[string]MockResponse{
2012 "MetadataRequest": NewMockMetadataResponse(t).
2013 SetBroker(broker0.Addr(), broker0.BrokerID()).
2014 SetLeader("my_topic", 0, broker0.BrokerID()),
2015 "OffsetRequest": NewMockOffsetResponse(t).
2016 SetOffset("my_topic", 0, OffsetOldest, 0).
2017 SetOffset("my_topic", 0, OffsetNewest, 1237),
2018 "FetchRequest": NewMockWrapper(fetchResponse),
2019 })
2020
2021 cfg := NewTestConfig()
2022 cfg.Consumer.Return.Errors = true
2023 cfg.Version = V0_11_0_0
2024 cfg.Consumer.IsolationLevel = ReadCommitted
2025
2026 // When
2027 master, err := NewConsumer([]string{broker0.Addr()}, cfg)
2028 if err != nil {
2029 t.Fatal(err)
2030 }
2031
2032 consumer, err := master.ConsumePartition("my_topic", 0, 1234)
2033 if err != nil {
2034 t.Fatal(err)
2035 }
2036
2037 // Then: only the 2 committed messages are returned
2038 select {
2039 case message := <-consumer.Messages():
2040 assertMessageOffset(t, message, int64(1234))
2041 case err := <-consumer.Errors():
2042 t.Error(err)
2043 }
2044 select {
2045 case message := <-consumer.Messages():
2046 assertMessageOffset(t, message, int64(1238))
2047 case err := <-consumer.Errors():
2048 t.Error(err)
2049 }
2050
2051 safeClose(t, consumer)
2052 safeClose(t, master)

Callers

nothing calls this directly

Calls 15

AddRecordBatchMethod · 0.95
AddControlRecordMethod · 0.95
SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockWrapperFunction · 0.85
assertMessageOffsetFunction · 0.85

Tested by

no test coverage detected