Tx executes a function within a transaction context. If the KVOperator already has a session, it will use that session. Otherwise, it creates a new transaction session. The transaction will be committed if the function returns nil, or rolled back if it returns an error.
(ctx context.Context, fn func(ctx context.Context, kv *KVOperator) error)
| 273 | // it will use that session. Otherwise, it creates a new transaction session. |
| 274 | // The transaction will be committed if the function returns nil, or rolled back if it returns an error. |
| 275 | func (kv *KVOperator) Tx(ctx context.Context, fn func(ctx context.Context, kv *KVOperator) error) error { |
| 276 | var ( |
| 277 | txKv = kv |
| 278 | shouldCommit bool |
| 279 | ) |
| 280 | |
| 281 | if kv.session == nil { |
| 282 | session := kv.data.DB.NewSession().Context(ctx) |
| 283 | if err := session.Begin(); err != nil { |
| 284 | _ = session.Close() |
| 285 | return fmt.Errorf("%w: begin transaction failed: %v", ErrKVTransactionFailed, err) |
| 286 | } |
| 287 | |
| 288 | defer func() { |
| 289 | if !shouldCommit { |
| 290 | if rollbackErr := session.Rollback(); rollbackErr != nil { |
| 291 | log.Errorf("rollback failed: %v", rollbackErr) |
| 292 | } |
| 293 | } |
| 294 | _ = session.Close() |
| 295 | }() |
| 296 | |
| 297 | txKv = &KVOperator{ |
| 298 | session: session, |
| 299 | data: kv.data, |
| 300 | pluginSlugName: kv.pluginSlugName, |
| 301 | } |
| 302 | shouldCommit = true |
| 303 | } |
| 304 | |
| 305 | if err := fn(ctx, txKv); err != nil { |
| 306 | return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err) |
| 307 | } |
| 308 | |
| 309 | if shouldCommit { |
| 310 | if err := txKv.session.Commit(); err != nil { |
| 311 | return fmt.Errorf("%w: commit failed: %v", ErrKVTransactionFailed, err) |
| 312 | } |
| 313 | } |
| 314 | return nil |
| 315 | } |
| 316 | |
| 317 | // KVStorage defines the interface for plugins that need data storage capabilities |
| 318 | type KVStorage interface { |