When set to ReadCommitted, no uncommitted message should be available in messages channel
(t *testing.T)
| 1993 | |
| 1994 | // When set to ReadCommitted, no uncommitted message should be available in messages channel |
| 1995 | func TestExcludeUncommitted(t *testing.T) { |
| 1996 | // Given |
| 1997 | broker0 := NewMockBroker(t, 0) |
| 1998 | |
| 1999 | fetchResponse := &FetchResponse{ |
| 2000 | Version: 5, |
| 2001 | Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: { |
| 2002 | AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}}, |
| 2003 | }}}, |
| 2004 | } |
| 2005 | fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true) // committed msg |
| 2006 | fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true) // uncommitted msg |
| 2007 | fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true) // uncommitted msg |
| 2008 | fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record |
| 2009 | fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true) // committed msg |
| 2010 | |
| 2011 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 2012 | "MetadataRequest": NewMockMetadataResponse(t). |
| 2013 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 2014 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 2015 | "OffsetRequest": NewMockOffsetResponse(t). |
| 2016 | SetOffset("my_topic", 0, OffsetOldest, 0). |
| 2017 | SetOffset("my_topic", 0, OffsetNewest, 1237), |
| 2018 | "FetchRequest": NewMockWrapper(fetchResponse), |
| 2019 | }) |
| 2020 | |
| 2021 | cfg := NewTestConfig() |
| 2022 | cfg.Consumer.Return.Errors = true |
| 2023 | cfg.Version = V0_11_0_0 |
| 2024 | cfg.Consumer.IsolationLevel = ReadCommitted |
| 2025 | |
| 2026 | // When |
| 2027 | master, err := NewConsumer([]string{broker0.Addr()}, cfg) |
| 2028 | if err != nil { |
| 2029 | t.Fatal(err) |
| 2030 | } |
| 2031 | |
| 2032 | consumer, err := master.ConsumePartition("my_topic", 0, 1234) |
| 2033 | if err != nil { |
| 2034 | t.Fatal(err) |
| 2035 | } |
| 2036 | |
| 2037 | // Then: only the 2 committed messages are returned |
| 2038 | select { |
| 2039 | case message := <-consumer.Messages(): |
| 2040 | assertMessageOffset(t, message, int64(1234)) |
| 2041 | case err := <-consumer.Errors(): |
| 2042 | t.Error(err) |
| 2043 | } |
| 2044 | select { |
| 2045 | case message := <-consumer.Messages(): |
| 2046 | assertMessageOffset(t, message, int64(1238)) |
| 2047 | case err := <-consumer.Errors(): |
| 2048 | t.Error(err) |
| 2049 | } |
| 2050 | |
| 2051 | safeClose(t, consumer) |
| 2052 | safeClose(t, master) |
nothing calls this directly
no test coverage detected