nolint: paralleltest
(t *testing.T)
| 159 | |
| 160 | // nolint: paralleltest |
| 161 | func TestPubsub_Disconnect(t *testing.T) { |
| 162 | // we always use a Docker container for this test, even in CI, since we need to be able to kill |
| 163 | // postgres and bring it back on the same port. |
| 164 | connectionURL, closePg, err := dbtestutil.OpenContainerized(t, dbtestutil.DBContainerOptions{Port: disconnectTestPort}) |
| 165 | require.NoError(t, err) |
| 166 | defer closePg() |
| 167 | db, err := sql.Open("postgres", connectionURL) |
| 168 | require.NoError(t, err) |
| 169 | defer db.Close() |
| 170 | |
| 171 | ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong) |
| 172 | defer cancelFunc() |
| 173 | logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) |
| 174 | ps, err := pubsub.New(ctx, logger, db, connectionURL) |
| 175 | require.NoError(t, err) |
| 176 | defer ps.Close() |
| 177 | event := "test" |
| 178 | |
| 179 | // buffer responses so that when the test completes, goroutines don't get blocked & leak |
| 180 | errors := make(chan error, pubsub.BufferSize) |
| 181 | messages := make(chan string, pubsub.BufferSize) |
| 182 | readOne := func() (m string, e error) { |
| 183 | t.Helper() |
| 184 | select { |
| 185 | case <-ctx.Done(): |
| 186 | t.Fatal("timed out") |
| 187 | case m = <-messages: |
| 188 | // OK |
| 189 | } |
| 190 | select { |
| 191 | case <-ctx.Done(): |
| 192 | t.Fatal("timed out") |
| 193 | case e = <-errors: |
| 194 | // OK |
| 195 | } |
| 196 | return m, e |
| 197 | } |
| 198 | |
| 199 | cancelSub, err := ps.SubscribeWithErr(event, func(ctx context.Context, msg []byte, err error) { |
| 200 | messages <- string(msg) |
| 201 | errors <- err |
| 202 | }) |
| 203 | require.NoError(t, err) |
| 204 | defer cancelSub() |
| 205 | |
| 206 | for i := 0; i < 100; i++ { |
| 207 | err = ps.Publish(event, []byte(fmt.Sprintf("%d", i))) |
| 208 | require.NoError(t, err) |
| 209 | } |
| 210 | // make sure we're getting at least one message. |
| 211 | m, err := readOne() |
| 212 | require.NoError(t, err) |
| 213 | require.Equal(t, "0", m) |
| 214 | |
| 215 | closePg() |
| 216 | // write some more messages until we hit an error |
| 217 | j := 100 |
| 218 | for { |
nothing calls this directly
no test coverage detected