(t *testing.T)
| 843 | } |
| 844 | |
| 845 | func TestConsumeMessagesFromReadReplica(t *testing.T) { |
| 846 | withRefreshFrequency := func(frequency time.Duration) func(*Config) { |
| 847 | return func(cfg *Config) { |
| 848 | cfg.Metadata.RefreshFrequency = frequency |
| 849 | } |
| 850 | } |
| 851 | preferredReplica := func(replicaID int32) preferredReadReplica { |
| 852 | return preferredReadReplica{id: replicaID, ok: true} |
| 853 | } |
| 854 | assertOffsets := func(t *testing.T, c PartitionConsumer, offsets ...int64) { |
| 855 | t.Helper() |
| 856 | for _, want := range offsets { |
| 857 | select { |
| 858 | case msg := <-c.Messages(): |
| 859 | assertMessageOffset(t, msg, want) |
| 860 | case <-time.After(5 * time.Second): |
| 861 | require.Failf(t, "timed out waiting for message", "offset %d", want) |
| 862 | } |
| 863 | } |
| 864 | } |
| 865 | |
| 866 | t.Run("switches to preferred follower", func(t *testing.T) { |
| 867 | c, cleanup := newReadReplicaTest(t, readReplicaTestConfig{ |
| 868 | leaderFetches: []readReplicaFetch{ |
| 869 | {records: []int64{1, 2}, preferredReadReplica: preferredReplica(1)}, |
| 870 | }, |
| 871 | followerFetches: []readReplicaFetch{ |
| 872 | {records: []int64{3, 4}}, |
| 873 | }, |
| 874 | }) |
| 875 | defer cleanup() |
| 876 | assertOffsets(t, c, 1, 2, 3, 4) |
| 877 | }) |
| 878 | |
| 879 | t.Run("falls back to leader when preferred replica is unknown", func(t *testing.T) { |
| 880 | const notInMetadata = 5 |
| 881 | c, cleanup := newReadReplicaTest(t, readReplicaTestConfig{ |
| 882 | leaderFetches: []readReplicaFetch{ |
| 883 | {records: []int64{1, 2}, preferredReadReplica: preferredReplica(notInMetadata)}, |
| 884 | {records: []int64{3, 4}}, |
| 885 | }, |
| 886 | }) |
| 887 | defer cleanup() |
| 888 | assertOffsets(t, c, 1, 2, 3, 4) |
| 889 | }) |
| 890 | |
| 891 | t.Run("falls back to leader on ErrReplicaNotAvailable", func(t *testing.T) { |
| 892 | c, cleanup := newReadReplicaTest(t, readReplicaTestConfig{ |
| 893 | leaderFetches: []readReplicaFetch{ |
| 894 | {preferredReadReplica: preferredReplica(1)}, |
| 895 | {records: []int64{3, 4}}, |
| 896 | }, |
| 897 | followerFetches: []readReplicaFetch{ |
| 898 | {records: []int64{1, 2}}, |
| 899 | {err: ErrReplicaNotAvailable}, |
| 900 | }, |
| 901 | }) |
| 902 | defer cleanup() |
nothing calls this directly
no test coverage detected