MCPcopy
hub / github.com/segmentio/kafka-go / createTopics

Method createTopics

createtopics.go:388–425  ·  view source on GitHub ↗
(request createTopicsRequest)

Source from the content-addressed store, hash-verified

386}
387
388func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) {
389 version, err := c.negotiateVersion(createTopics, v0, v1, v2)
390 if err != nil {
391 return createTopicsResponse{}, err
392 }
393
394 request.v = version
395 response := createTopicsResponse{v: version}
396
397 err = c.writeOperation(
398 func(deadline time.Time, id int32) error {
399 if request.Timeout == 0 {
400 now := time.Now()
401 deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
402 request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
403 }
404 return c.writeRequest(createTopics, version, id, request)
405 },
406 func(deadline time.Time, size int) error {
407 return expectZeroSize(func() (remain int, err error) {
408 return (&response).readFrom(&c.rbuf, size)
409 }())
410 },
411 )
412 if err != nil {
413 return response, err
414 }
415 for _, tr := range response.TopicErrors {
416 if tr.ErrorCode == int16(TopicAlreadyExists) {
417 continue
418 }
419 if tr.ErrorCode != 0 {
420 return response, Error(tr.ErrorCode)
421 }
422 }
423
424 return response, nil
425}
426
427// CreateTopics creates one topic per provided configuration with idempotent
428// operational semantics. In other words, if CreateTopics is invoked with a

Callers 2

CreateTopicsMethod · 0.95
createTopicFunction · 0.80

Calls 9

negotiateVersionMethod · 0.95
writeOperationMethod · 0.95
writeRequestMethod · 0.95
adjustDeadlineForRTTFunction · 0.85
millisecondsFunction · 0.85
deadlineToTimeoutFunction · 0.85
expectZeroSizeFunction · 0.85
ErrorTypeAlias · 0.70
readFromMethod · 0.45

Tested by 1

createTopicFunction · 0.64