hub / github.com/IBM/sarama / SyncProducer

Interface SyncProducer

sync_producer.go:21–69  ·  sync_producer.go::SyncProducer

SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.
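
The blocking-send and must-Close contract described above can be sketched as follows. This is a minimal illustration, not sarama code: `message`, `sender`, `memorySender`, and `publish` are hypothetical local stand-ins that mirror only the shape of `SendMessage` and `Close`, so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// message and sender are local stand-ins, not sarama types. sender mirrors
// only the SendMessage and Close methods of the interface on this page.
type message struct {
	Topic string
	Value string
}

type sender interface {
	SendMessage(msg *message) (partition int32, offset int64, err error)
	Close() error
}

// memorySender is a hypothetical in-memory fake. It "acknowledges" each
// message immediately by assigning the next offset on partition 0.
type memorySender struct {
	next   int64
	closed bool
}

func (m *memorySender) SendMessage(msg *message) (int32, int64, error) {
	if m.closed {
		return 0, 0, errors.New("producer is closed")
	}
	if msg.Topic == "" {
		return 0, 0, errors.New("empty topic")
	}
	off := m.next
	m.next++
	return 0, off, nil
}

func (m *memorySender) Close() error {
	m.closed = true
	return nil
}

// publish sends one message and always closes the producer before returning.
// A Close error is reported only if the send itself succeeded.
func publish(p sender, msg *message) (partition int32, offset int64, err error) {
	defer func() {
		if cerr := p.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()
	return p.SendMessage(msg)
}

func main() {
	p := &memorySender{}
	partition, offset, err := publish(p, &message{Topic: "events", Value: "hello"})
	fmt.Println(partition, offset, err, p.closed)
}
```

In a long-lived service the producer would normally be created once and closed at shutdown rather than per message; closing inside `publish` only keeps the sketch short.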

Source from the content-addressed store, hash-verified

// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
// be set to true in its configuration.
type SyncProducer interface {

	// SendMessage produces a given message, and returns only when it either has
	// succeeded or failed to produce. It will return the partition and the offset
	// of the produced message, or an error if the message failed to produce.
	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

	// SendMessages produces a given set of messages, and returns only when all
	// messages in the set have either succeeded or failed. Note that messages
	// can succeed and fail individually; if some succeed and some fail,
	// SendMessages will return an error.
	SendMessages(msgs []*ProducerMessage) error

	// Close shuts down the producer; you must call this function before a producer
	// object passes out of scope, as it may otherwise leak memory.
	// You must call this before calling Close on the underlying client.
	Close() error

	// TxnStatus return current producer transaction status.
	TxnStatus() ProducerTxnStatusFlag

	// IsTransactional return true when current producer is transactional.
	IsTransactional() bool

	// BeginTxn mark current transaction as ready.
	BeginTxn() error

	// CommitTxn commit current transaction.
	CommitTxn() error

	// AbortTxn abort current transaction.
	AbortTxn() error

	// AddOffsetsToTxn add associated offsets to current transaction.
	AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error

	// AddOffsetsToTxnWithGroupMetadata adds associated offsets to the current
	// transaction, carrying the consumer group member metadata so the broker
	// can fence stale members (KIP-447).
	AddOffsetsToTxnWithGroupMetadata(offsets map[string][]*PartitionOffsetMetadata, groupMetadata *ConsumerGroupMetadata) error

	// AddMessageToTxn add message offsets to current transaction.
	AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error

	// AddMessageToTxnWithGroupMetadata adds the message offset to the current
	// transaction, carrying the consumer group member metadata so the broker
	// can fence stale members (KIP-447).
	AddMessageToTxnWithGroupMetadata(msg *ConsumerMessage, groupMetadata *ConsumerGroupMetadata, metadata *string) error
}

type syncProducer struct {
	producer *asyncProducer

Implementers 2

syncProducer  ·  sync_producer.go
SyncProducer  ·  mocks/sync_producer.go

Calls

no outgoing calls

Tested by

no test coverage detected