MCPcopy
hub / github.com/segmentio/kafka-go / NewConnWith

Function NewConnWith

conn.go:164–203  ·  view source on GitHub ↗

NewConnWith returns a new kafka connection configured with config. The offset is initialized to FirstOffset.

(conn net.Conn, config ConnConfig)

Source from the content-addressed store, hash-verified

162// NewConnWith returns a new kafka connection configured with config.
163// The offset is initialized to FirstOffset.
164func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
165 if len(config.ClientID) == 0 {
166 config.ClientID = DefaultClientID
167 }
168
169 if config.Partition < 0 || config.Partition > math.MaxInt32 {
170 panic(fmt.Sprintf("invalid partition number: %d", config.Partition))
171 }
172
173 c := &Conn{
174 conn: conn,
175 rbuf: *bufio.NewReader(conn),
176 wbuf: *bufio.NewWriter(conn),
177 clientID: config.ClientID,
178 topic: config.Topic,
179 partition: int32(config.Partition),
180 broker: int32(config.Broker),
181 rack: config.Rack,
182 offset: FirstOffset,
183 requiredAcks: -1,
184 transactionalID: emptyToNullable(config.TransactionalID),
185 }
186
187 c.wb.w = &c.wbuf
188
189 // The fetch request needs to ask for a MaxBytes value that is at least
190 // enough to load the control data of the response. To avoid having to
191 // recompute it on every read, it is cached here in the Conn value.
192 c.fetchMinSize = (fetchResponseV2{
193 Topics: []fetchResponseTopicV2{{
194 TopicName: config.Topic,
195 Partitions: []fetchResponsePartitionV2{{
196 Partition: int32(config.Partition),
197 MessageSet: messageSet{{}},
198 }},
199 }},
200 }).size()
201 c.fetchMaxBytes = math.MaxInt32 - c.fetchMinSize
202 return c
203}
204
205func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersion) (apiVersion, error) {
206 v, err := c.loadVersions()

Callers 2

connectMethod · 0.85
NewConnFunction · 0.85

Calls 4

emptyToNullableFunction · 0.85
NewReaderMethod · 0.65
NewWriterMethod · 0.65
sizeMethod · 0.65

Tested by

no test coverage detected