MCPcopy
hub / github.com/IBM/sarama / TestConsumeMessagesFromReadReplica

Function TestConsumeMessagesFromReadReplica

consumer_test.go:845–950  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

843}
844
845func TestConsumeMessagesFromReadReplica(t *testing.T) {
846 withRefreshFrequency := func(frequency time.Duration) func(*Config) {
847 return func(cfg *Config) {
848 cfg.Metadata.RefreshFrequency = frequency
849 }
850 }
851 preferredReplica := func(replicaID int32) preferredReadReplica {
852 return preferredReadReplica{id: replicaID, ok: true}
853 }
854 assertOffsets := func(t *testing.T, c PartitionConsumer, offsets ...int64) {
855 t.Helper()
856 for _, want := range offsets {
857 select {
858 case msg := <-c.Messages():
859 assertMessageOffset(t, msg, want)
860 case <-time.After(5 * time.Second):
861 require.Failf(t, "timed out waiting for message", "offset %d", want)
862 }
863 }
864 }
865
866 t.Run("switches to preferred follower", func(t *testing.T) {
867 c, cleanup := newReadReplicaTest(t, readReplicaTestConfig{
868 leaderFetches: []readReplicaFetch{
869 {records: []int64{1, 2}, preferredReadReplica: preferredReplica(1)},
870 },
871 followerFetches: []readReplicaFetch{
872 {records: []int64{3, 4}},
873 },
874 })
875 defer cleanup()
876 assertOffsets(t, c, 1, 2, 3, 4)
877 })
878
879 t.Run("falls back to leader when preferred replica is unknown", func(t *testing.T) {
880 const notInMetadata = 5
881 c, cleanup := newReadReplicaTest(t, readReplicaTestConfig{
882 leaderFetches: []readReplicaFetch{
883 {records: []int64{1, 2}, preferredReadReplica: preferredReplica(notInMetadata)},
884 {records: []int64{3, 4}},
885 },
886 })
887 defer cleanup()
888 assertOffsets(t, c, 1, 2, 3, 4)
889 })
890
891 t.Run("falls back to leader on ErrReplicaNotAvailable", func(t *testing.T) {
892 c, cleanup := newReadReplicaTest(t, readReplicaTestConfig{
893 leaderFetches: []readReplicaFetch{
894 {preferredReadReplica: preferredReplica(1)},
895 {records: []int64{3, 4}},
896 },
897 followerFetches: []readReplicaFetch{
898 {records: []int64{1, 2}},
899 {err: ErrReplicaNotAvailable},
900 },
901 })
902 defer cleanup()

Callers

nothing calls this directly

Calls 5

assertMessageOffsetFunction · 0.85
newReadReplicaTestFunction · 0.85
HelperMethod · 0.80
RunMethod · 0.80
MessagesMethod · 0.65

Tested by

no test coverage detected