MCPcopy
hub / github.com/segmentio/kafka-go / Marshal

Function Marshal

protocol/encode.go:549–596  ·  view source on GitHub ↗
(version int16, value interface{})

Source from the content-addressed store, hash-verified

547}
548
549func Marshal(version int16, value interface{}) ([]byte, error) {
550 typ := typeOf(value)
551 cache, _ := marshalers.Load().(map[versionedType]encodeFunc)
552 key := versionedType{typ: typ, version: version}
553 encode := cache[key]
554
555 if encode == nil {
556 encode = encodeFuncOf(reflect.TypeOf(value), version, false, structTag{
557 MinVersion: -1,
558 MaxVersion: -1,
559 TagID: -2,
560 Compact: true,
561 Nullable: true,
562 })
563
564 newCache := make(map[versionedType]encodeFunc, len(cache)+1)
565 newCache[key] = encode
566
567 for typ, fun := range cache {
568 newCache[typ] = fun
569 }
570
571 marshalers.Store(newCache)
572 }
573
574 e, _ := encoders.Get().(*encoder)
575 if e == nil {
576 e = &encoder{writer: new(bytes.Buffer)}
577 }
578
579 b, _ := e.writer.(*bytes.Buffer)
580 defer func() {
581 b.Reset()
582 e.Reset(b)
583 encoders.Put(e)
584 }()
585
586 encode(e, nonAddressableValueOf(value))
587
588 if e.err != nil {
589 return nil, e.err
590 }
591
592 buf := b.Bytes()
593 out := make([]byte, len(buf))
594 copy(out, buf)
595 return out, nil
596}
597
598type versionedType struct {
599 typ _type

Callers 5

SyncGroupMethod · 0.92
MarshalFunction · 0.92
MarshalMethod · 0.92
JoinGroupMethod · 0.92
TestSubscriptionFunction · 0.92

Calls 7

encodeFuncOfFunction · 0.85
GetMethod · 0.80
PutMethod · 0.80
typeOfFunction · 0.70
nonAddressableValueOfFunction · 0.70
BytesMethod · 0.65
ResetMethod · 0.45

Tested by 1

TestSubscriptionFunction · 0.74