MCPcopy
hub / github.com/IBM/sarama / TestSyncProducer

Function TestSyncProducer

sync_producer_test.go:12–60  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

10)
11
12func TestSyncProducer(t *testing.T) {
13 seedBroker := NewMockBroker(t, 1)
14 leader := NewMockBroker(t, 2)
15
16 metadataResponse := new(MetadataResponse)
17 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
18 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
19 seedBroker.Returns(metadataResponse)
20
21 prodSuccess := new(ProduceResponse)
22 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
23 for range 10 {
24 leader.Returns(prodSuccess)
25 }
26
27 config := NewTestConfig()
28 config.Producer.Return.Successes = true
29 producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
30 if err != nil {
31 t.Fatal(err)
32 }
33
34 for range 10 {
35 msg := &ProducerMessage{
36 Topic: "my_topic",
37 Value: StringEncoder(TestMessage),
38 Metadata: "test",
39 }
40
41 partition, offset, err := producer.SendMessage(msg)
42
43 if partition != 0 || msg.Partition != partition {
44 t.Error("Unexpected partition")
45 }
46 if offset != 0 || msg.Offset != offset {
47 t.Error("Unexpected offset")
48 }
49 if str, ok := msg.Metadata.(string); !ok || str != "test" {
50 t.Error("Unexpected metadata")
51 }
52 if err != nil {
53 t.Error(err)
54 }
55 }
56
57 safeClose(t, producer)
58 leader.Close()
59 seedBroker.Close()
60}
61
62func TestSyncProducerTransactional(t *testing.T) {
63 seedBroker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 14

AddrMethod · 0.95
BrokerIDMethod · 0.95
ReturnsMethod · 0.95
SendMessageMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
StringEncoderTypeAlias · 0.85
AddBrokerMethod · 0.80
FatalMethod · 0.80
NewTestConfigFunction · 0.70
NewSyncProducerFunction · 0.70
safeCloseFunction · 0.70

Tested by

no test coverage detected