(t *testing.T, topic string, partitions int)
| 1929 | } |
| 1930 | |
| 1931 | func createTopicWithCompaction(t *testing.T, topic string, partitions int) { |
| 1932 | t.Helper() |
| 1933 | |
| 1934 | t.Logf("createTopic(%s, %d)", topic, partitions) |
| 1935 | |
| 1936 | conn, err := Dial("tcp", "localhost:9092") |
| 1937 | require.NoError(t, err) |
| 1938 | defer conn.Close() |
| 1939 | |
| 1940 | controller, err := conn.Controller() |
| 1941 | require.NoError(t, err) |
| 1942 | |
| 1943 | conn, err = Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) |
| 1944 | require.NoError(t, err) |
| 1945 | |
| 1946 | conn.SetDeadline(time.Now().Add(10 * time.Second)) |
| 1947 | |
| 1948 | err = conn.CreateTopics(TopicConfig{ |
| 1949 | Topic: topic, |
| 1950 | NumPartitions: partitions, |
| 1951 | ReplicationFactor: 1, |
| 1952 | ConfigEntries: []ConfigEntry{ |
| 1953 | { |
| 1954 | ConfigName: "cleanup.policy", |
| 1955 | ConfigValue: "compact", |
| 1956 | }, |
| 1957 | { |
| 1958 | ConfigName: "segment.bytes", |
| 1959 | ConfigValue: "200", |
| 1960 | }, |
| 1961 | }, |
| 1962 | }) |
| 1963 | if err != nil { |
| 1964 | if !errors.Is(err, TopicAlreadyExists) { |
| 1965 | require.NoError(t, err) |
| 1966 | } |
| 1967 | } |
| 1968 | |
| 1969 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) |
| 1970 | defer cancel() |
| 1971 | waitForTopic(ctx, t, topic) |
| 1972 | } |
| 1973 | |
| 1974 | // The current behavior of the Reader is to retry OffsetOutOfRange errors |
| 1975 | // indefinitely, which results in programs hanging in the event of a topic being |
no test coverage detected