| 227 | } |
| 228 | |
| 229 | func createAndInitDatabase(t TBSubset, connParams ConnectionParams, db *sql.DB, name string, initialize func(*sql.DB) error) error { |
| 230 | // We will use a tx to obtain a lock, so another test or process doesn't race with us. |
| 231 | tx, err := db.BeginTx(context.Background(), nil) |
| 232 | if err != nil { |
| 233 | return xerrors.Errorf("begin tx: %w", err) |
| 234 | } |
| 235 | // we only use the transaction for locking and querying, so it's fine to always roll it back. |
| 236 | defer func() { |
| 237 | err := tx.Rollback() |
| 238 | if err != nil && !errors.Is(err, sql.ErrTxDone) { |
| 239 | t.Logf("create database: failed to rollback tx: %s\n", err.Error()) |
| 240 | } |
| 241 | }() |
| 242 | // 2137 is an arbitrary number. We just need a lock that is unique to creating |
| 243 | // the database. |
| 244 | _, err = tx.Exec("SELECT pg_advisory_xact_lock(2137)") |
| 245 | if err != nil { |
| 246 | return xerrors.Errorf("acquire lock: %w", err) |
| 247 | } |
| 248 | |
| 249 | // Someone else might have created the db while we were waiting. |
| 250 | dbExistsRes, err := tx.Query("SELECT 1 FROM pg_database WHERE datname = $1", name) |
| 251 | if err != nil { |
| 252 | return xerrors.Errorf("check if db exists: %w", err) |
| 253 | } |
| 254 | dbAlreadyExists := dbExistsRes.Next() |
| 255 | if err := dbExistsRes.Close(); err != nil { |
| 256 | return xerrors.Errorf("close tpl db exists res: %w", err) |
| 257 | } |
| 258 | if dbAlreadyExists { |
| 259 | return nil |
| 260 | } |
| 261 | |
| 262 | // We will use a temporary database to avoid race conditions. We will |
| 263 | // rename it to the real database name after we're sure it was fully |
| 264 | // initialized. |
| 265 | // It's dropped here to ensure that if a previous run of this function failed |
| 266 | // midway, we don't encounter issues with the temporary database still existing. |
| 267 | tmpDBName := "tmp_" + name |
| 268 | // We're using db instead of tx here because you can't run `DROP DATABASE` inside |
| 269 | // a transaction. |
| 270 | if _, err := db.Exec("DROP DATABASE IF EXISTS " + tmpDBName); err != nil { |
| 271 | return xerrors.Errorf("drop tmp db: %w", err) |
| 272 | } |
| 273 | if _, err := db.Exec("CREATE DATABASE " + tmpDBName); err != nil { |
| 274 | return xerrors.Errorf("create tmp db: %w", err) |
| 275 | } |
| 276 | tmpDbURL := ConnectionParams{ |
| 277 | Username: connParams.Username, |
| 278 | Password: connParams.Password, |
| 279 | Host: connParams.Host, |
| 280 | Port: connParams.Port, |
| 281 | DBName: tmpDBName, |
| 282 | }.DSN() |
| 283 | tmpDb, err := sql.Open("postgres", tmpDbURL) |
| 284 | if err != nil { |
| 285 | return xerrors.Errorf("connect to template db: %w", err) |
| 286 | } |