MCPcopy
hub / github.com/segmentio/kafka-go / testConnFetchAndCommitOffsets

Function testConnFetchAndCommitOffsets

conn_test.go:873–940  ·  view source on GitHub ↗
(t *testing.T, conn *Conn)

Source from the content-addressed store, hash-verified

871}
872
873func testConnFetchAndCommitOffsets(t *testing.T, conn *Conn) {
874 const N = 10
875 if _, err := conn.WriteMessages(makeTestSequence(N)...); err != nil {
876 t.Fatal(err)
877 }
878
879 groupID := makeGroupID()
880 generationID, memberID, stop := createGroup(t, conn, groupID)
881 defer stop()
882
883 request := offsetFetchRequestV1{
884 GroupID: groupID,
885 Topics: []offsetFetchRequestV1Topic{
886 {
887 Topic: conn.topic,
888 Partitions: []int32{0},
889 },
890 },
891 }
892 fetch, err := conn.offsetFetch(request)
893 if err != nil {
894 t.Fatalf("bad err: %v", err)
895 }
896
897 if v := len(fetch.Responses); v != 1 {
898 t.Fatalf("expected 1 Response; got %v", v)
899 }
900
901 if v := len(fetch.Responses[0].PartitionResponses); v != 1 {
902 t.Fatalf("expected 1 PartitionResponses; got %v", v)
903 }
904
905 if offset := fetch.Responses[0].PartitionResponses[0].Offset; offset != -1 {
906 t.Fatalf("expected initial offset of -1; got %v", offset)
907 }
908
909 committedOffset := int64(N - 1)
910 _, err = conn.offsetCommit(offsetCommitRequestV2{
911 GroupID: groupID,
912 GenerationID: generationID,
913 MemberID: memberID,
914 RetentionTime: int64(time.Hour / time.Millisecond),
915 Topics: []offsetCommitRequestV2Topic{
916 {
917 Topic: conn.topic,
918 Partitions: []offsetCommitRequestV2Partition{
919 {
920 Partition: 0,
921 Offset: committedOffset,
922 },
923 },
924 },
925 },
926 })
927 if err != nil {
928 t.Fatalf("bad error: %v", err)
929 }
930

Callers

nothing calls this directly

Calls 6

makeTestSequence (Function) · 0.85
makeGroupID (Function) · 0.85
createGroup (Function) · 0.85
offsetFetch (Method) · 0.65
offsetCommit (Method) · 0.65
WriteMessages (Method) · 0.45

Tested by

no test coverage detected