MCPcopy
hub / github.com/segmentio/kafka-go / writeCompressedMessages

Method writeCompressedMessages

conn.go:1131–1273  ·  view source on GitHub ↗
(codec CompressionCodec, msgs ...Message)

Source from the content-addressed store, hash-verified

1129}
1130
1131func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
1132 if len(msgs) == 0 {
1133 return
1134 }
1135
1136 writeTime := time.Now()
1137 for i, msg := range msgs {
1138 // users may believe they can set the Topic and/or Partition
1139 // on the kafka message.
1140 if msg.Topic != "" && msg.Topic != c.topic {
1141 err = errInvalidWriteTopic
1142 return
1143 }
1144 if msg.Partition != 0 {
1145 err = errInvalidWritePartition
1146 return
1147 }
1148
1149 if msg.Time.IsZero() {
1150 msgs[i].Time = writeTime
1151 }
1152
1153 nbytes += len(msg.Key) + len(msg.Value)
1154 }
1155
1156 var produceVersion apiVersion
1157 if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil {
1158 return
1159 }
1160
1161 err = c.writeOperation(
1162 func(deadline time.Time, id int32) error {
1163 now := time.Now()
1164 deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
1165 switch produceVersion {
1166 case v7:
1167 recordBatch, err :=
1168 newRecordBatch(
1169 codec,
1170 msgs...,
1171 )
1172 if err != nil {
1173 return err
1174 }
1175 return c.wb.writeProduceRequestV7(
1176 id,
1177 c.clientID,
1178 c.topic,
1179 c.partition,
1180 deadlineToTimeout(deadline, now),
1181 int16(atomic.LoadInt32(&c.requiredAcks)),
1182 c.transactionalID,
1183 recordBatch,
1184 )
1185 case v3:
1186 recordBatch, err :=
1187 newRecordBatch(
1188 codec,

Callers 2

Calls 15

negotiateVersion (Method) · 0.95
writeOperation (Method) · 0.95
readFrom (Method) · 0.95
adjustDeadlineForRTT (Function) · 0.85
newRecordBatch (Function) · 0.85
deadlineToTimeout (Function) · 0.85
expectZeroSize (Function) · 0.85
readArrayWith (Function) · 0.85
discardString (Function) · 0.85
discardInt32 (Function) · 0.85
IsZero (Method) · 0.80
writeProduceRequestV7 (Method) · 0.80

Tested by

no test coverage detected