SetOffsetAt changes the offset from which the next batch of messages will be read given the timestamp t. The method fails if the unable to connect partition leader, or unable to read the offset given the ts, or if the reader has been closed.
(ctx context.Context, t time.Time)
| 1056 | // The method fails if the unable to connect partition leader, or unable to read the offset |
| 1057 | // given the ts, or if the reader has been closed. |
| 1058 | func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error { |
| 1059 | r.mutex.Lock() |
| 1060 | if r.closed { |
| 1061 | r.mutex.Unlock() |
| 1062 | return io.ErrClosedPipe |
| 1063 | } |
| 1064 | r.mutex.Unlock() |
| 1065 | |
| 1066 | if len(r.config.Brokers) < 1 { |
| 1067 | return errors.New("no brokers in config") |
| 1068 | } |
| 1069 | var conn *Conn |
| 1070 | var err error |
| 1071 | for _, broker := range r.config.Brokers { |
| 1072 | conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition) |
| 1073 | if err != nil { |
| 1074 | continue |
| 1075 | } |
| 1076 | deadline, _ := ctx.Deadline() |
| 1077 | conn.SetDeadline(deadline) |
| 1078 | offset, err := conn.ReadOffset(t) |
| 1079 | conn.Close() |
| 1080 | if err != nil { |
| 1081 | return err |
| 1082 | } |
| 1083 | |
| 1084 | return r.SetOffset(offset) |
| 1085 | } |
| 1086 | return fmt.Errorf("error dialing all brokers, one of the errors: %w", err) |
| 1087 | } |
| 1088 | |
| 1089 | // Stats returns a snapshot of the reader stats since the last time the method |
| 1090 | // was called, or since the reader was created if it is called for the first |