Fetch returns a FetchResponse or error
(request *FetchRequest)
| 582 | |
| 583 | // Fetch returns a FetchResponse or error |
| 584 | func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { |
| 585 | defer func() { |
| 586 | // snapshot meters under the lock; Open may reassign them on reconnect |
| 587 | b.lock.Lock() |
| 588 | fetchRate, brokerFetchRate := b.fetchRate, b.brokerFetchRate |
| 589 | b.lock.Unlock() |
| 590 | |
| 591 | if fetchRate != nil { |
| 592 | fetchRate.Mark(1) |
| 593 | } |
| 594 | if brokerFetchRate != nil { |
| 595 | brokerFetchRate.Mark(1) |
| 596 | } |
| 597 | }() |
| 598 | |
| 599 | response := new(FetchResponse) |
| 600 | |
| 601 | err := b.sendAndReceive(request, response) |
| 602 | if err != nil { |
| 603 | return nil, err |
| 604 | } |
| 605 | |
| 606 | return response, nil |
| 607 | } |
| 608 | |
| 609 | // CommitOffset returns an OffsetCommitResponse or error |
| 610 | func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) { |