(t *testing.T)
| 119 | } |
| 120 | |
| 121 | func TestPGPubsubDriver(t *testing.T) { |
| 122 | t.Parallel() |
| 123 | |
| 124 | ctx := testutil.Context(t, testutil.WaitLong) |
| 125 | logger := slogtest.Make(t, &slogtest.Options{ |
| 126 | IgnoreErrors: true, |
| 127 | }).Leveled(slog.LevelDebug) |
| 128 | |
| 129 | connectionURL, err := dbtestutil.Open(t) |
| 130 | require.NoError(t, err) |
| 131 | |
| 132 | // use a separate subber and pubber so we can keep track of listener connections |
| 133 | db, err := sql.Open("postgres", connectionURL) |
| 134 | require.NoError(t, err) |
| 135 | defer db.Close() |
| 136 | pubber, err := pubsub.New(ctx, logger, db, connectionURL) |
| 137 | require.NoError(t, err) |
| 138 | defer pubber.Close() |
| 139 | |
| 140 | // use a connector that sends us the connections for the subber |
| 141 | subDriver := dbtestutil.NewDriver() |
| 142 | defer subDriver.Close() |
| 143 | tconn, err := subDriver.Connector(connectionURL) |
| 144 | require.NoError(t, err) |
| 145 | tcdb := sql.OpenDB(tconn) |
| 146 | defer tcdb.Close() |
| 147 | subber, err := pubsub.New(ctx, logger, tcdb, connectionURL) |
| 148 | require.NoError(t, err) |
| 149 | defer subber.Close() |
| 150 | |
| 151 | // test that we can publish and subscribe |
| 152 | gotChan := make(chan struct{}, 1) |
| 153 | defer close(gotChan) |
| 154 | subCancel, err := subber.Subscribe("test", func(_ context.Context, _ []byte) { |
| 155 | select { |
| 156 | case gotChan <- struct{}{}: |
| 157 | default: |
| 158 | } |
| 159 | }) |
| 160 | require.NoError(t, err) |
| 161 | defer subCancel() |
| 162 | |
| 163 | // send a message |
| 164 | err = pubber.Publish("test", []byte("hello")) |
| 165 | require.NoError(t, err) |
| 166 | |
| 167 | // wait for the message |
| 168 | _ = testutil.TryReceive(ctx, t, gotChan) |
| 169 | |
| 170 | // read out first connection |
| 171 | firstConn := testutil.TryReceive(ctx, t, subDriver.Connections) |
| 172 | |
| 173 | // drop the underlying connection being used by the pubsub |
| 174 | // the pq.Listener should reconnect and repopulate it's listeners |
| 175 | // so old subscriptions should still work |
| 176 | err = firstConn.Close() |
| 177 | require.NoError(t, err) |
| 178 |
nothing calls this directly
no test coverage detected