NewWriter creates and returns a new Writer configured with config. DEPRECATED: Writer value can be instantiated and configured directly, this function is retained for backward compatibility and will be removed in version 1.0.
(config WriterConfig)
| 413 | // this function is retained for backward compatibility and will be removed |
| 414 | // in version 1.0. |
| 415 | func NewWriter(config WriterConfig) *Writer { |
| 416 | if err := config.Validate(); err != nil { |
| 417 | panic(err) |
| 418 | } |
| 419 | |
| 420 | if config.Dialer == nil { |
| 421 | config.Dialer = DefaultDialer |
| 422 | } |
| 423 | |
| 424 | if config.Balancer == nil { |
| 425 | config.Balancer = &RoundRobin{} |
| 426 | } |
| 427 | |
| 428 | // Converts the pre-0.4 Dialer API into a Transport. |
| 429 | kafkaDialer := DefaultDialer |
| 430 | if config.Dialer != nil { |
| 431 | kafkaDialer = config.Dialer |
| 432 | } |
| 433 | |
| 434 | dialer := (&net.Dialer{ |
| 435 | Timeout: kafkaDialer.Timeout, |
| 436 | Deadline: kafkaDialer.Deadline, |
| 437 | LocalAddr: kafkaDialer.LocalAddr, |
| 438 | DualStack: kafkaDialer.DualStack, |
| 439 | FallbackDelay: kafkaDialer.FallbackDelay, |
| 440 | KeepAlive: kafkaDialer.KeepAlive, |
| 441 | }) |
| 442 | |
| 443 | var resolver Resolver |
| 444 | if r, ok := kafkaDialer.Resolver.(*net.Resolver); ok { |
| 445 | dialer.Resolver = r |
| 446 | } else { |
| 447 | resolver = kafkaDialer.Resolver |
| 448 | } |
| 449 | |
| 450 | stats := new(writerStats) |
| 451 | // For backward compatibility with the pre-0.4 APIs, support custom |
| 452 | // resolvers by wrapping the dial function. |
| 453 | dial := func(ctx context.Context, network, addr string) (net.Conn, error) { |
| 454 | start := time.Now() |
| 455 | defer func() { |
| 456 | stats.dials.observe(1) |
| 457 | stats.dialTime.observe(int64(time.Since(start))) |
| 458 | }() |
| 459 | address, err := lookupHost(ctx, addr, resolver) |
| 460 | if err != nil { |
| 461 | return nil, err |
| 462 | } |
| 463 | return dialer.DialContext(ctx, network, address) |
| 464 | } |
| 465 | |
| 466 | idleTimeout := config.IdleConnTimeout |
| 467 | if idleTimeout == 0 { |
| 468 | // Historical default value of WriterConfig.IdleTimeout, 9 minutes seems |
| 469 | // like it is way too long when there is no ping mechanism in the kafka |
| 470 | // protocol. |
| 471 | idleTimeout = 9 * time.Minute |
| 472 | } |