RoundTrip sends a request to a kafka broker and returns the response.
(rw io.ReadWriter, apiVersion int16, correlationID int32, clientID string, req Message)
| 6 | |
| 7 | // RoundTrip sends a request to a kafka broker and returns the response. |
| 8 | func RoundTrip(rw io.ReadWriter, apiVersion int16, correlationID int32, clientID string, req Message) (Message, error) { |
| 9 | if err := WriteRequest(rw, apiVersion, correlationID, clientID, req); err != nil { |
| 10 | return nil, err |
| 11 | } |
| 12 | if !hasResponse(req) { |
| 13 | return nil, nil |
| 14 | } |
| 15 | id, res, err := ReadResponse(rw, req.ApiKey(), apiVersion) |
| 16 | if err != nil { |
| 17 | return nil, err |
| 18 | } |
| 19 | if id != correlationID { |
| 20 | return nil, Errorf("correlation id mismatch (expected=%d, found=%d)", correlationID, id) |
| 21 | } |
| 22 | return res, nil |
| 23 | } |
| 24 | |
| 25 | func hasResponse(msg Message) bool { |
| 26 | x, _ := msg.(interface{ HasResponse() bool }) |
no test coverage detected