hub / github.com/segmentio/kafka-go / SetOffsetAt

Method SetOffsetAt

reader.go:1058–1087

SetOffsetAt changes the offset from which the next batch of messages will be read to the offset that corresponds to the timestamp t. The method fails if it is unable to connect to the partition leader, if it is unable to read the offset for the given timestamp, or if the reader has been closed.

(ctx context.Context, t time.Time)
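
The source listing on this page shows the context deadline being copied onto the broker connection, so a context without a deadline yields a zero deadline, which by the usual net.Conn convention means no timeout. A minimal sketch of a caller-side wrapper that always bounds the call follows. The names offsetSeeker, seekerFunc, seekBack and demoSeekBack are hypothetical and not part of kafka-go; only the SetOffsetAt signature is taken from this page.

```go
package main

import (
	"context"
	"time"
)

// offsetSeeker is a hypothetical interface mirroring the SetOffsetAt
// signature documented above; a *kafka.Reader would satisfy it.
type offsetSeeker interface {
	SetOffsetAt(ctx context.Context, t time.Time) error
}

// seekerFunc adapts a plain function to offsetSeeker (illustration only).
type seekerFunc func(ctx context.Context, t time.Time) error

func (f seekerFunc) SetOffsetAt(ctx context.Context, t time.Time) error {
	return f(ctx, t)
}

// seekBack is an illustrative helper, not a kafka-go API. It asks the reader
// to move to the offset for "now minus age" and bounds the call with a
// timeout so the underlying connection always gets a real deadline.
func seekBack(parent context.Context, r offsetSeeker, now time.Time, age, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return r.SetOffsetAt(ctx, now.Add(-age))
}

// demoSeekBack runs seekBack against a stand-in reader and reports the
// timestamp the reader was asked for and whether the context had a deadline.
func demoSeekBack() (requested time.Time, hadDeadline bool, err error) {
	fake := seekerFunc(func(ctx context.Context, t time.Time) error {
		requested = t
		_, hadDeadline = ctx.Deadline()
		return nil
	})
	now := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC)
	err = seekBack(context.Background(), fake, now, time.Hour, 5*time.Second)
	return requested, hadDeadline, err
}
```

With a real reader, the same helper would be called as seekBack(ctx, reader, time.Now(), time.Hour, 5*time.Second), assuming the reader was created for a single partition.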

Source from the content-addressed store, hash-verified

// The method fails if the unable to connect partition leader, or unable to read the offset
// given the ts, or if the reader has been closed.
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
	r.mutex.Lock()
	if r.closed {
		r.mutex.Unlock()
		return io.ErrClosedPipe
	}
	r.mutex.Unlock()

	if len(r.config.Brokers) < 1 {
		return errors.New("no brokers in config")
	}
	var conn *Conn
	var err error
	for _, broker := range r.config.Brokers {
		conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
		if err != nil {
			continue
		}
		deadline, _ := ctx.Deadline()
		conn.SetDeadline(deadline)
		offset, err := conn.ReadOffset(t)
		conn.Close()
		if err != nil {
			return err
		}

		return r.SetOffset(offset)
	}
	return fmt.Errorf("error dialing all brokers, one of the errors: %w", err)
}

// Stats returns a snapshot of the reader stats since the last time the method
// was called, or since the reader was created if it is called for the first
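
The broker loop in the listing above follows a common failover pattern: skip brokers that cannot be dialed, stop at the first one that can, and wrap the last dial error with %w if none succeed. The sketch below isolates that pattern using only the standard library. dialFunc, firstReachable, errUnreachable and isUnreachable are hypothetical names for illustration; the string "connection" stands in for the *Conn that DialLeader returns.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnreachable is a hypothetical sentinel error used by the stand-in dialer.
var errUnreachable = errors.New("broker unreachable")

// dialFunc is a hypothetical stand-in for Dialer.DialLeader: it returns a
// connection handle for the given broker address, or an error.
type dialFunc func(broker string) (string, error)

// firstReachable tries each broker in order and returns the first successful
// dial. Dial errors are skipped; if every broker fails, the last error is
// wrapped with %w so callers can still inspect it with errors.Is or errors.As.
func firstReachable(brokers []string, dial dialFunc) (string, error) {
	if len(brokers) < 1 {
		return "", errors.New("no brokers in config")
	}
	var err error
	for _, broker := range brokers {
		var conn string
		conn, err = dial(broker)
		if err != nil {
			continue // try the next broker
		}
		return conn, nil
	}
	return "", fmt.Errorf("error dialing all brokers, one of the errors: %w", err)
}

// isUnreachable reports whether err wraps errUnreachable, which shows that
// the %w wrapping keeps the underlying dial error inspectable.
func isUnreachable(err error) bool {
	return errors.Is(err, errUnreachable)
}
```

In the listing, only dial failures move on to the next broker. Once a connection is established, an error from ReadOffset is returned immediately and the remaining brokers are not tried.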

Callers 1

testReaderSetOffsetAt · Function · 0.80

Calls 6

SetDeadline · Method · 0.95
ReadOffset · Method · 0.95
Close · Method · 0.95
SetOffset · Method · 0.95
DialLeader · Method · 0.80
Deadline · Method · 0.80

Tested by 1

testReaderSetOffsetAt · Function · 0.64