MCPcopy
hub / github.com/IBM/sarama / TestListOffsets

Function TestListOffsets

admin_test.go:2188–2301  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2186}
2187
2188func TestListOffsets(t *testing.T) {
2189 const topic = "my-topic"
2190
2191 t.Run("returns offsets from a single broker", func(t *testing.T) {
2192 const partition = int32(0)
2193 const timestamp = int64(1690000000000)
2194 const expectedOffset = int64(42)
2195
2196 broker := newMockBroker(t, 1)
2197 broker.SetHandlerByMap(map[string]MockResponse{
2198 "OffsetRequest": NewMockOffsetResponse(t).SetOffset(topic, partition, timestamp, expectedOffset),
2199 "MetadataRequest": mockMetadataFor(t, broker).SetLeader(topic, partition, broker.BrokerID()),
2200 })
2201
2202 result, err := newTestAdmin(t, broker).ListOffsets(map[string]map[int32]int64{
2203 topic: {partition: timestamp},
2204 }, nil)
2205 require.NoError(t, err)
2206
2207 info := result[topic][partition]
2208 require.NotNil(t, info)
2209 assert.Equal(t, ErrNoError, info.Err)
2210 assert.Equal(t, expectedOffset, info.Offset)
2211 })
2212
2213 t.Run("fans out across partition leaders", func(t *testing.T) {
2214 const offset1 = int64(100)
2215 const offset2 = int64(200)
2216
2217 leader1 := newMockBroker(t, 1)
2218 leader2 := newMockBroker(t, 2)
2219 metadata := mockMetadataFor(t, leader1, leader2).
2220 SetLeader(topic, 0, leader1.BrokerID()).
2221 SetLeader(topic, 1, leader2.BrokerID())
2222
2223 leader1.SetHandlerByMap(map[string]MockResponse{
2224 "MetadataRequest": metadata,
2225 "OffsetRequest": NewMockOffsetResponse(t).SetOffset(topic, 0, OffsetNewest, offset1),
2226 })
2227 leader2.SetHandlerByMap(map[string]MockResponse{
2228 "MetadataRequest": metadata,
2229 "OffsetRequest": NewMockOffsetResponse(t).SetOffset(topic, 1, OffsetNewest, offset2),
2230 })
2231
2232 result, err := newTestAdmin(t, leader1).ListOffsets(map[string]map[int32]int64{
2233 topic: {0: OffsetNewest, 1: OffsetNewest},
2234 }, nil)
2235 require.NoError(t, err)
2236 require.Contains(t, result, topic)
2237 assert.Equal(t, offset1, result[topic][0].Offset)
2238 assert.Equal(t, offset2, result[topic][1].Offset)
2239 })
2240
2241 t.Run("propagates IsolationLevel to the broker request", func(t *testing.T) {
2242 const partition = int32(0)
2243
2244 broker := newMockBroker(t, 1)
2245 metadata := mockMetadataFor(t, broker).SetLeader(topic, partition, broker.BrokerID())

Callers

nothing calls this directly

Calls 12

newMockBroker · Function · 0.85
NewMockOffsetResponse · Function · 0.85
mockMetadataFor · Function · 0.85
newTestAdmin · Function · 0.85
Run · Method · 0.80
SetHandlerByMap · Method · 0.80
SetLeader · Method · 0.80
BrokerID · Method · 0.80
SetHandlerFuncByMap · Method · 0.80
ListOffsets · Method · 0.65
For · Method · 0.65
SetOffset · Method · 0.45

Tested by

no test coverage detected