joinGroup attempts to join a consumer group. See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
(request joinGroupRequest)
| 369 | // |
| 370 | // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup |
| 371 | func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) { |
| 372 | version, err := c.negotiateVersion(joinGroup, v1, v2) |
| 373 | if err != nil { |
| 374 | return joinGroupResponse{}, err |
| 375 | } |
| 376 | |
| 377 | response := joinGroupResponse{v: version} |
| 378 | |
| 379 | err = c.writeOperation( |
| 380 | func(deadline time.Time, id int32) error { |
| 381 | return c.writeRequest(joinGroup, version, id, request) |
| 382 | }, |
| 383 | func(deadline time.Time, size int) error { |
| 384 | return expectZeroSize(func() (remain int, err error) { |
| 385 | return (&response).readFrom(&c.rbuf, size) |
| 386 | }()) |
| 387 | }, |
| 388 | ) |
| 389 | if err != nil { |
| 390 | return joinGroupResponse{}, err |
| 391 | } |
| 392 | if response.ErrorCode != 0 { |
| 393 | return joinGroupResponse{}, Error(response.ErrorCode) |
| 394 | } |
| 395 | |
| 396 | return response, nil |
| 397 | } |
| 398 | |
| 399 | // leaveGroup leaves the consumer from the consumer group |
| 400 | // |
nothing calls this directly
no test coverage detected