hub / github.com/segmentio/kafka-go / NewWriter

Function NewWriter

writer.go:415–519

NewWriter creates and returns a new Writer configured with config. DEPRECATED: a Writer value can be instantiated and configured directly; this function is retained for backward compatibility and will be removed in version 1.0.

(config WriterConfig)

Source from the content-addressed store, hash-verified

// this function is retained for backward compatibility and will be removed
// in version 1.0.
func NewWriter(config WriterConfig) *Writer {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.Balancer == nil {
		config.Balancer = &RoundRobin{}
	}

	// Converts the pre-0.4 Dialer API into a Transport.
	kafkaDialer := DefaultDialer
	if config.Dialer != nil {
		kafkaDialer = config.Dialer
	}

	dialer := (&net.Dialer{
		Timeout:       kafkaDialer.Timeout,
		Deadline:      kafkaDialer.Deadline,
		LocalAddr:     kafkaDialer.LocalAddr,
		DualStack:     kafkaDialer.DualStack,
		FallbackDelay: kafkaDialer.FallbackDelay,
		KeepAlive:     kafkaDialer.KeepAlive,
	})

	var resolver Resolver
	if r, ok := kafkaDialer.Resolver.(*net.Resolver); ok {
		dialer.Resolver = r
	} else {
		resolver = kafkaDialer.Resolver
	}

	stats := new(writerStats)
	// For backward compatibility with the pre-0.4 APIs, support custom
	// resolvers by wrapping the dial function.
	dial := func(ctx context.Context, network, addr string) (net.Conn, error) {
		start := time.Now()
		defer func() {
			stats.dials.observe(1)
			stats.dialTime.observe(int64(time.Since(start)))
		}()
		address, err := lookupHost(ctx, addr, resolver)
		if err != nil {
			return nil, err
		}
		return dialer.DialContext(ctx, network, address)
	}

	idleTimeout := config.IdleConnTimeout
	if idleTimeout == 0 {
		// Historical default value of WriterConfig.IdleTimeout, 9 minutes seems
		// like it is way too long when there is no ping mechanism in the kafka
		// protocol.
		idleTimeout = 9 * time.Minute
	}

Callers 4

TestDialerResolver · Function · 0.85
newTestWriter · Function · 0.85

Calls 8

lookupHost · Function · 0.85
TCP · Function · 0.85
RequiredAcks · TypeAlias · 0.85
DialContext · Method · 0.80
Code · Method · 0.65
Compression · TypeAlias · 0.50
Validate · Method · 0.45
observe · Method · 0.45

Tested by 4

TestDialerResolver · Function · 0.68
newTestWriter · Function · 0.68