(typ reflect.Type, version int16, flexible bool)
| 387 | } |
| 388 | |
| 389 | func structDecodeFuncOf(typ reflect.Type, version int16, flexible bool) decodeFunc { |
| 390 | type field struct { |
| 391 | decode decodeFunc |
| 392 | index index |
| 393 | tagID int |
| 394 | } |
| 395 | |
| 396 | var fields []field |
| 397 | taggedFields := map[int]*field{} |
| 398 | |
| 399 | forEachStructField(typ, func(typ reflect.Type, index index, tag string) { |
| 400 | forEachStructTag(tag, func(tag structTag) bool { |
| 401 | if tag.MinVersion <= version && version <= tag.MaxVersion { |
| 402 | f := field{ |
| 403 | decode: decodeFuncOf(typ, version, flexible, tag), |
| 404 | index: index, |
| 405 | tagID: tag.TagID, |
| 406 | } |
| 407 | |
| 408 | if tag.TagID < -1 { |
| 409 | // Normal required field |
| 410 | fields = append(fields, f) |
| 411 | } else { |
| 412 | // Optional tagged field (flexible messages only) |
| 413 | taggedFields[tag.TagID] = &f |
| 414 | } |
| 415 | return false |
| 416 | } |
| 417 | return true |
| 418 | }) |
| 419 | }) |
| 420 | |
| 421 | return func(d *decoder, v value) { |
| 422 | for i := range fields { |
| 423 | f := &fields[i] |
| 424 | f.decode(d, v.fieldByIndex(f.index)) |
| 425 | } |
| 426 | |
| 427 | if flexible { |
| 428 | // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields |
| 429 | // for details of tag buffers in "flexible" messages. |
| 430 | n := int(d.readUnsignedVarInt()) |
| 431 | |
| 432 | for i := 0; i < n; i++ { |
| 433 | tagID := int(d.readUnsignedVarInt()) |
| 434 | size := int(d.readUnsignedVarInt()) |
| 435 | |
| 436 | f, ok := taggedFields[tagID] |
| 437 | if ok { |
| 438 | f.decode(d, v.fieldByIndex(f.index)) |
| 439 | } else { |
| 440 | d.read(size) |
| 441 | } |
| 442 | } |
| 443 | } |
| 444 | } |
| 445 | } |
| 446 |
no test coverage detected