| 111 | } |
| 112 | |
| 113 | func TestPubsub_ordering(t *testing.T) { |
| 114 | t.Parallel() |
| 115 | |
| 116 | ctx, cancelFunc := context.WithCancel(context.Background()) |
| 117 | defer cancelFunc() |
| 118 | logger := testutil.Logger(t) |
| 119 | |
| 120 | connectionURL, err := dbtestutil.Open(t) |
| 121 | require.NoError(t, err) |
| 122 | db, err := sql.Open("postgres", connectionURL) |
| 123 | require.NoError(t, err) |
| 124 | defer db.Close() |
| 125 | ps, err := pubsub.New(ctx, logger, db, connectionURL) |
| 126 | require.NoError(t, err) |
| 127 | defer ps.Close() |
| 128 | event := "test" |
| 129 | messageChannel := make(chan []byte, 100) |
| 130 | cancelSub, err := ps.Subscribe(event, func(ctx context.Context, message []byte) { |
| 131 | // sleep a random amount of time to simulate handlers taking different amount of time |
| 132 | // to process, depending on the message |
| 133 | // nolint: gosec |
| 134 | n := rand.Intn(100) |
| 135 | time.Sleep(time.Duration(n) * time.Millisecond) |
| 136 | messageChannel <- message |
| 137 | }) |
| 138 | require.NoError(t, err) |
| 139 | defer cancelSub() |
| 140 | for i := 0; i < 100; i++ { |
| 141 | err = ps.Publish(event, []byte(fmt.Sprintf("%d", i))) |
| 142 | assert.NoError(t, err) |
| 143 | } |
| 144 | for i := 0; i < 100; i++ { |
| 145 | select { |
| 146 | case <-time.After(testutil.WaitShort): |
| 147 | t.Fatalf("timed out waiting for message %d", i) |
| 148 | case message := <-messageChannel: |
| 149 | assert.Equal(t, fmt.Sprintf("%d", i), string(message)) |
| 150 | } |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | // disconnectTestPort is the hardcoded port for TestPubsub_Disconnect. In this test we need to be able to stop Postgres |
| 155 | // and restart it on the same port. If we use an ephemeral port, there is a chance the OS will reallocate before we |