(ctx context.Context, t *testing.T, writeClient *kgo.Client, partitionID int32, content []byte, version int)
| 221 | } |
| 222 | |
| 223 | func produceRecordWithVersion(ctx context.Context, t *testing.T, writeClient *kgo.Client, partitionID int32, content []byte, version int) int64 { |
| 224 | rec := &kgo.Record{ |
| 225 | Value: content, |
| 226 | Topic: topicName, |
| 227 | Partition: partitionID, |
| 228 | } |
| 229 | if version == 0 { |
| 230 | rec.Headers = nil |
| 231 | } else { |
| 232 | rec.Headers = []kgo.RecordHeader{RecordVersionHeader(version)} |
| 233 | } |
| 234 | |
| 235 | produceResult := writeClient.ProduceSync(ctx, rec) |
| 236 | require.NoError(t, produceResult.FirstErr()) |
| 237 | |
| 238 | return rec.Offset |
| 239 | } |
| 240 | |
| 241 | func RecordVersionHeader(version int) kgo.RecordHeader { |
| 242 | var b [4]byte |
no test coverage detected