()
| 1023 | } |
| 1024 | |
| 1025 | func (i *bridgeIterator) Next() (*parquetquery.IteratorResult, error) { |
| 1026 | // drain current buffer |
| 1027 | if len(i.nextSpans) > 0 { |
| 1028 | ret := i.nextSpans[0] |
| 1029 | i.nextSpans = i.nextSpans[1:] |
| 1030 | return i.spanToIteratorResult(ret), nil |
| 1031 | } |
| 1032 | |
| 1033 | for { |
| 1034 | res, err := i.iter.Next() |
| 1035 | if err != nil { |
| 1036 | return nil, err |
| 1037 | } |
| 1038 | if res == nil { |
| 1039 | return nil, nil |
| 1040 | } |
| 1041 | |
| 1042 | // The spanset is in the OtherEntries |
| 1043 | iface := res.OtherValueFromKey(otherEntrySpansetKey) |
| 1044 | if iface == nil { |
| 1045 | return nil, fmt.Errorf("engine assumption broken: spanset not found in other entries in bridge") |
| 1046 | } |
| 1047 | spanset, ok := iface.(*traceql.Spanset) |
| 1048 | if !ok { |
| 1049 | return nil, fmt.Errorf("engine assumption broken: spanset is not of type *traceql.Spanset in bridge") |
| 1050 | } |
| 1051 | |
| 1052 | filteredSpansets, err := i.cb(spanset) |
| 1053 | if errors.Is(err, io.EOF) { |
| 1054 | return nil, nil |
| 1055 | } |
| 1056 | if err != nil { |
| 1057 | return nil, err |
| 1058 | } |
| 1059 | // if the filter removed all spansets then let's release all back to the pool |
| 1060 | // no reason to try anything more nuanced than this. it will handle nearly all cases |
| 1061 | if len(filteredSpansets) == 0 { |
| 1062 | for _, s := range spanset.Spans { |
| 1063 | putSpan(s.(*span)) |
| 1064 | } |
| 1065 | putSpanset(spanset) |
| 1066 | } |
| 1067 | |
| 1068 | // flatten spans into i.currentSpans |
| 1069 | for _, ss := range filteredSpansets { |
| 1070 | for idx, s := range ss.Spans { |
| 1071 | span := s.(*span) |
| 1072 | |
| 1073 | // mark whether this is the last span in the spanset |
| 1074 | span.cbSpansetFinal = idx == len(ss.Spans)-1 |
| 1075 | span.cbSpanset = ss |
| 1076 | i.nextSpans = append(i.nextSpans, span) |
| 1077 | } |
| 1078 | } |
| 1079 | |
| 1080 | sort.Slice(i.nextSpans, func(j, k int) bool { |
| 1081 | return parquetquery.CompareRowNumbers(DefinitionLevelResourceSpansILSSpan, i.nextSpans[j].rowNum, i.nextSpans[k].rowNum) == -1 |
| 1082 | }) |
no test coverage detected