start is the main decoding routine: it reads the event-stream response body message by message and writes the payload of each records event to pipeWriter.
(pipeWriter *io.PipeWriter)
| 516 | // start is the main function that decodes the large byte array into |
| 517 | // several events that are sent through the eventstream. |
| 518 | func (s *SelectResults) start(pipeWriter *io.PipeWriter) { |
| 519 | go func() { |
| 520 | for { |
| 521 | var prelude preludeInfo |
| 522 | headers := make(http.Header) |
| 523 | var err error |
| 524 | |
| 525 | // Create CRC code |
| 526 | crc := crc32.New(crc32.IEEETable) |
| 527 | crcReader := io.TeeReader(s.resp.Body, crc) |
| 528 | |
| 529 | // Extract the prelude(12 bytes) into a struct to extract relevant information. |
| 530 | prelude, err = processPrelude(crcReader, crc) |
| 531 | if err != nil { |
| 532 | pipeWriter.CloseWithError(err) |
| 533 | closeResponse(s.resp) |
| 534 | return |
| 535 | } |
| 536 | |
| 537 | // Extract the headers(variable bytes) into a struct to extract relevant information |
| 538 | if prelude.headerLen > 0 { |
| 539 | if err = extractHeader(io.LimitReader(crcReader, int64(prelude.headerLen)), headers); err != nil { |
| 540 | pipeWriter.CloseWithError(err) |
| 541 | closeResponse(s.resp) |
| 542 | return |
| 543 | } |
| 544 | } |
| 545 | |
| 546 | // Get the actual payload length so that the appropriate amount of |
| 547 | // bytes can be read or parsed. |
| 548 | payloadLen := prelude.PayloadLen() |
| 549 | |
| 550 | m := messageType(headers.Get("message-type")) |
| 551 | |
| 552 | switch m { |
| 553 | case errorMsg: |
| 554 | pipeWriter.CloseWithError(errors.New(headers.Get("error-code") + ":\"" + headers.Get("error-message") + "\"")) |
| 555 | closeResponse(s.resp) |
| 556 | return |
| 557 | case commonMsg: |
| 558 | // Get content-type of the payload. |
| 559 | c := contentType(headers.Get("content-type")) |
| 560 | |
| 561 | // Get event type of the payload. |
| 562 | e := eventType(headers.Get("event-type")) |
| 563 | |
| 564 | // Handle all supported events. |
| 565 | switch e { |
| 566 | case endEvent: |
| 567 | pipeWriter.Close() |
| 568 | closeResponse(s.resp) |
| 569 | return |
| 570 | case recordsEvent: |
| 571 | if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil { |
| 572 | pipeWriter.CloseWithError(err) |
| 573 | closeResponse(s.resp) |
| 574 | return |
| 575 | } |
no test coverage detected