MCPcopy
hub / github.com/segmentio/kafka-go / createTopic

Function createTopic

reader_test.go:279–326  ·  view source on GitHub ↗
(t *testing.T, topic string, partitions int)

Source from the content-addressed store, hash-verified

277}
278
279func createTopic(t *testing.T, topic string, partitions int) {
280 t.Helper()
281
282 t.Logf("createTopic(%s, %d)", topic, partitions)
283
284 conn, err := Dial("tcp", "localhost:9092")
285 if err != nil {
286 err = fmt.Errorf("createTopic, Dial: %w", err)
287 t.Fatal(err)
288 }
289 defer conn.Close()
290
291 controller, err := conn.Controller()
292 if err != nil {
293 err = fmt.Errorf("createTopic, conn.Controller: %w", err)
294 t.Fatal(err)
295 }
296
297 conn, err = Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
298 if err != nil {
299 t.Fatal(err)
300 }
301
302 conn.SetDeadline(time.Now().Add(10 * time.Second))
303
304 _, err = conn.createTopics(createTopicsRequest{
305 Topics: []createTopicsRequestV0Topic{
306 {
307 Topic: topic,
308 NumPartitions: int32(partitions),
309 ReplicationFactor: 1,
310 },
311 },
312 Timeout: milliseconds(5 * time.Second),
313 })
314 if err != nil {
315 if !errors.Is(err, TopicAlreadyExists) {
316 err = fmt.Errorf("createTopic, conn.createTopics: %w", err)
317 t.Error(err)
318 t.FailNow()
319 }
320 }
321
322 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
323 defer cancel()
324
325 waitForTopic(ctx, t, topic)
326}
327
328// Block until topic exists.
329func waitForTopic(ctx context.Context, t *testing.T, topic string) {

Calls 8

DialFunction · 0.85
millisecondsFunction · 0.85
waitForTopicFunction · 0.85
ControllerMethod · 0.80
createTopicsMethod · 0.80
CloseMethod · 0.45
SetDeadlineMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected