MCPcopy Index your code
hub / github.com/apache/answer / Tx

Method Tx

plugin/kv_storage.go:275–315  ·  view source on GitHub ↗

Tx executes a function within a transaction context. If the KVOperator already has a session, it will use that session. Otherwise, it creates a new transaction session. The transaction will be committed if the function returns nil, or rolled back if it returns an error.

(ctx context.Context, fn func(ctx context.Context, kv *KVOperator) error)

Source from the content-addressed store, hash-verified

273// it will use that session. Otherwise, it creates a new transaction session.
274// The transaction will be committed if the function returns nil, or rolled back if it returns an error.
275func (kv *KVOperator) Tx(ctx context.Context, fn func(ctx context.Context, kv *KVOperator) error) error {
276 var (
277 txKv = kv
278 shouldCommit bool
279 )
280
281 if kv.session == nil {
282 session := kv.data.DB.NewSession().Context(ctx)
283 if err := session.Begin(); err != nil {
284 _ = session.Close()
285 return fmt.Errorf("%w: begin transaction failed: %v", ErrKVTransactionFailed, err)
286 }
287
288 defer func() {
289 if !shouldCommit {
290 if rollbackErr := session.Rollback(); rollbackErr != nil {
291 log.Errorf("rollback failed: %v", rollbackErr)
292 }
293 }
294 _ = session.Close()
295 }()
296
297 txKv = &KVOperator{
298 session: session,
299 data: kv.data,
300 pluginSlugName: kv.pluginSlugName,
301 }
302 shouldCommit = true
303 }
304
305 if err := fn(ctx, txKv); err != nil {
306 return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err)
307 }
308
309 if shouldCommit {
310 if err := txKv.session.Commit(); err != nil {
311 return fmt.Errorf("%w: commit failed: %v", ErrKVTransactionFailed, err)
312 }
313 }
314 return nil
315}
316
317// KVStorage defines the interface for plugins that need data storage capabilities
318type KVStorage interface {

Callers 1

TestTransactionsFunction · 0.80

Calls 1

CloseMethod · 0.65

Tested by 1

TestTransactionsFunction · 0.64