MCPcopy
hub / github.com/segmentio/kafka-go / Unmarshal

Function Unmarshal

protocol/decode.go:490–532  ·  view source on GitHub ↗
(data []byte, version int16, value interface{})

Source from the content-addressed store, hash-verified

488}
489
490func Unmarshal(data []byte, version int16, value interface{}) error {
491 typ := elemTypeOf(value)
492 cache, _ := unmarshalers.Load().(map[versionedType]decodeFunc)
493 key := versionedType{typ: typ, version: version}
494 decode := cache[key]
495
496 if decode == nil {
497 decode = decodeFuncOf(reflect.TypeOf(value).Elem(), version, false, structTag{
498 MinVersion: -1,
499 MaxVersion: -1,
500 TagID: -2,
501 Compact: true,
502 Nullable: true,
503 })
504
505 newCache := make(map[versionedType]decodeFunc, len(cache)+1)
506 newCache[key] = decode
507
508 for typ, fun := range cache {
509 newCache[typ] = fun
510 }
511
512 unmarshalers.Store(newCache)
513 }
514
515 d, _ := decoders.Get().(*decoder)
516 if d == nil {
517 d = &decoder{reader: bytes.NewReader(nil)}
518 }
519
520 d.remain = len(data)
521 r, _ := d.reader.(*bytes.Reader)
522 r.Reset(data)
523
524 defer func() {
525 r.Reset(nil)
526 d.Reset(r, 0)
527 decoders.Put(d)
528 }()
529
530 decode(d, valueOf(value))
531 return dontExpectEOF(d.err)
532}
533
534var (
535 decoders sync.Pool // *decoder

Callers 5

SyncGroupMethod · 0.92
UnmarshalFunction · 0.92
UnmarshalMethod · 0.92
JoinGroupMethod · 0.92
TestSubscriptionFunction · 0.92

Calls 8

decodeFuncOfFunction · 0.85
GetMethod · 0.80
PutMethod · 0.80
elemTypeOfFunction · 0.70
valueOfFunction · 0.70
dontExpectEOFFunction · 0.70
NewReaderMethod · 0.65
ResetMethod · 0.45

Tested by 1

TestSubscriptionFunction · 0.74