MCPcopy
hub / github.com/segmentio/kafka-go / createTopicWithCompaction

Function createTopicWithCompaction

reader_test.go:1931–1972  ·  view source on GitHub ↗
(t *testing.T, topic string, partitions int)

Source from the content-addressed store, hash-verified

1929}
1930
1931func createTopicWithCompaction(t *testing.T, topic string, partitions int) {
1932 t.Helper()
1933
1934 t.Logf("createTopic(%s, %d)", topic, partitions)
1935
1936 conn, err := Dial("tcp", "localhost:9092")
1937 require.NoError(t, err)
1938 defer conn.Close()
1939
1940 controller, err := conn.Controller()
1941 require.NoError(t, err)
1942
1943 conn, err = Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
1944 require.NoError(t, err)
1945
1946 conn.SetDeadline(time.Now().Add(10 * time.Second))
1947
1948 err = conn.CreateTopics(TopicConfig{
1949 Topic: topic,
1950 NumPartitions: partitions,
1951 ReplicationFactor: 1,
1952 ConfigEntries: []ConfigEntry{
1953 {
1954 ConfigName: "cleanup.policy",
1955 ConfigValue: "compact",
1956 },
1957 {
1958 ConfigName: "segment.bytes",
1959 ConfigValue: "200",
1960 },
1961 },
1962 })
1963 if err != nil {
1964 if !errors.Is(err, TopicAlreadyExists) {
1965 require.NoError(t, err)
1966 }
1967 }
1968
1969 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
1970 defer cancel()
1971 waitForTopic(ctx, t, topic)
1972}
1973
1974// The current behavior of the Reader is to retry OffsetOutOfRange errors
1975// indefinitely, which results in programs hanging in the event of a topic being

Callers 1

Calls 6

DialFunction · 0.85
waitForTopicFunction · 0.85
ControllerMethod · 0.80
CloseMethod · 0.45
SetDeadlineMethod · 0.45
CreateTopicsMethod · 0.45

Tested by

no test coverage detected