()
| 520 | } |
| 521 | |
| 522 | func (a *csAttempt) newStream() error { |
| 523 | cs := a.cs |
| 524 | cs.callHdr.PreviousAttempts = cs.numRetries |
| 525 | |
| 526 | // Merge metadata stored in PickResult, if any, with existing call metadata. |
| 527 | // It is safe to overwrite the csAttempt's context here, since all state |
| 528 | // maintained in it are local to the attempt. When the attempt has to be |
| 529 | // retried, a new instance of csAttempt will be created. |
| 530 | if a.pickResult.Metadata != nil { |
| 531 | // We currently do not have a function it the metadata package which |
| 532 | // merges given metadata with existing metadata in a context. Existing |
| 533 | // function `AppendToOutgoingContext()` takes a variadic argument of key |
| 534 | // value pairs. |
| 535 | // |
| 536 | // TODO: Make it possible to retrieve key value pairs from metadata.MD |
| 537 | // in a form passable to AppendToOutgoingContext(), or create a version |
| 538 | // of AppendToOutgoingContext() that accepts a metadata.MD. |
| 539 | md, _ := metadata.FromOutgoingContext(a.ctx) |
| 540 | md = metadata.Join(md, a.pickResult.Metadata) |
| 541 | a.ctx = metadata.NewOutgoingContext(a.ctx, md) |
| 542 | |
| 543 | // If the `CallAuthority` CallOption is not set, check if the LB picker |
| 544 | // has provided an authority override in the PickResult metadata and |
| 545 | // apply it, as specified in gRFC A81. |
| 546 | if cs.callInfo.authority == "" { |
| 547 | if authMD := a.pickResult.Metadata.Get(":authority"); len(authMD) > 0 { |
| 548 | cs.callHdr.Authority = authMD[0] |
| 549 | } |
| 550 | } |
| 551 | } |
| 552 | s, err := a.transport.NewStream(a.ctx, cs.callHdr, a.statsHandler) |
| 553 | if err != nil { |
| 554 | nse, ok := err.(*transport.NewStreamError) |
| 555 | if !ok { |
| 556 | // Unexpected. |
| 557 | return err |
| 558 | } |
| 559 | |
| 560 | if nse.AllowTransparentRetry { |
| 561 | a.allowTransparentRetry = true |
| 562 | } |
| 563 | |
| 564 | // Unwrap and convert error. |
| 565 | return toRPCErr(nse.Err) |
| 566 | } |
| 567 | a.transportStream = s |
| 568 | a.ctx = s.Context() |
| 569 | a.parser = parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool} |
| 570 | return nil |
| 571 | } |
| 572 | |
| 573 | // clientStream implements a client side Stream. |
| 574 | type clientStream struct { |
no test coverage detected