| 386 | } |
| 387 | |
| 388 | func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) { |
| 389 | version, err := c.negotiateVersion(createTopics, v0, v1, v2) |
| 390 | if err != nil { |
| 391 | return createTopicsResponse{}, err |
| 392 | } |
| 393 | |
| 394 | request.v = version |
| 395 | response := createTopicsResponse{v: version} |
| 396 | |
| 397 | err = c.writeOperation( |
| 398 | func(deadline time.Time, id int32) error { |
| 399 | if request.Timeout == 0 { |
| 400 | now := time.Now() |
| 401 | deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) |
| 402 | request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) |
| 403 | } |
| 404 | return c.writeRequest(createTopics, version, id, request) |
| 405 | }, |
| 406 | func(deadline time.Time, size int) error { |
| 407 | return expectZeroSize(func() (remain int, err error) { |
| 408 | return (&response).readFrom(&c.rbuf, size) |
| 409 | }()) |
| 410 | }, |
| 411 | ) |
| 412 | if err != nil { |
| 413 | return response, err |
| 414 | } |
| 415 | for _, tr := range response.TopicErrors { |
| 416 | if tr.ErrorCode == int16(TopicAlreadyExists) { |
| 417 | continue |
| 418 | } |
| 419 | if tr.ErrorCode != 0 { |
| 420 | return response, Error(tr.ErrorCode) |
| 421 | } |
| 422 | } |
| 423 | |
| 424 | return response, nil |
| 425 | } |
| 426 | |
| 427 | // CreateTopics creates one topic per provided configuration with idempotent |
| 428 | // operational semantics. In other words, if CreateTopics is invoked with a |