MCPcopy Index your code
hub / github.com/coder/coder / TestPubsub_Disconnect

Function TestPubsub_Disconnect

coderd/database/pubsub/pubsub_linux_test.go:161–294  ·  view source on GitHub ↗

nolint: paralleltest

(t *testing.T)

Source from the content-addressed store, hash-verified

159
160// nolint: paralleltest
161func TestPubsub_Disconnect(t *testing.T) {
162 // we always use a Docker container for this test, even in CI, since we need to be able to kill
163 // postgres and bring it back on the same port.
164 connectionURL, closePg, err := dbtestutil.OpenContainerized(t, dbtestutil.DBContainerOptions{Port: disconnectTestPort})
165 require.NoError(t, err)
166 defer closePg()
167 db, err := sql.Open("postgres", connectionURL)
168 require.NoError(t, err)
169 defer db.Close()
170
171 ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
172 defer cancelFunc()
173 logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
174 ps, err := pubsub.New(ctx, logger, db, connectionURL)
175 require.NoError(t, err)
176 defer ps.Close()
177 event := "test"
178
179 // buffer responses so that when the test completes, goroutines don't get blocked & leak
180 errors := make(chan error, pubsub.BufferSize)
181 messages := make(chan string, pubsub.BufferSize)
182 readOne := func() (m string, e error) {
183 t.Helper()
184 select {
185 case <-ctx.Done():
186 t.Fatal("timed out")
187 case m = <-messages:
188 // OK
189 }
190 select {
191 case <-ctx.Done():
192 t.Fatal("timed out")
193 case e = <-errors:
194 // OK
195 }
196 return m, e
197 }
198
199 cancelSub, err := ps.SubscribeWithErr(event, func(ctx context.Context, msg []byte, err error) {
200 messages <- string(msg)
201 errors <- err
202 })
203 require.NoError(t, err)
204 defer cancelSub()
205
206 for i := 0; i < 100; i++ {
207 err = ps.Publish(event, []byte(fmt.Sprintf("%d", i)))
208 require.NoError(t, err)
209 }
210 // make sure we're getting at least one message.
211 m, err := readOne()
212 require.NoError(t, err)
213 require.Equal(t, "0", m)
214
215 closePg()
216 // write some more messages until we hit an error
217 j := 100
218 for {

Callers

nothing calls this directly

Calls 12

OpenContainerizedFunction · 0.92
NewFunction · 0.92
FatalMethod · 0.80
LessMethod · 0.80
CloseMethod · 0.65
HelperMethod · 0.65
SubscribeWithErrMethod · 0.65
PublishMethod · 0.65
OpenMethod · 0.45
DoneMethod · 0.45
EqualMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected