Unmarshal deserializes JSON to all shards with proper locking
(data []byte)
| 401 | |
| 402 | // Unmarshal deserializes JSON to all shards with proper locking |
| 403 | func (w *Work) Unmarshal(data []byte) error { |
| 404 | w.mtx.Lock() |
| 405 | defer w.mtx.Unlock() |
| 406 | |
| 407 | // Lock all shards in order to prevent race conditions during unmarshaling |
| 408 | for i := range ShardCount { |
| 409 | w.Shards[i].mtx.Lock() |
| 410 | } |
| 411 | defer func() { |
| 412 | // Unlock all shards |
| 413 | for i := range ShardCount { |
| 414 | w.Shards[i].mtx.Unlock() |
| 415 | } |
| 416 | }() |
| 417 | |
| 418 | err := json.Unmarshal(data, w) |
| 419 | if err != nil { |
| 420 | return err |
| 421 | } |
| 422 | |
| 423 | // Ensure all shards are properly initialized (in case any were nil after unmarshaling) |
| 424 | for i := range ShardCount { |
| 425 | if w.Shards[i] == nil { |
| 426 | w.Shards[i] = &Shard{ |
| 427 | Jobs: make(map[string]*Job), |
| 428 | Pending: make(map[string]*Job), |
| 429 | } |
| 430 | } else { |
| 431 | if w.Shards[i].Jobs == nil { |
| 432 | w.Shards[i].Jobs = make(map[string]*Job) |
| 433 | } |
| 434 | if w.Shards[i].Pending == nil { |
| 435 | w.Shards[i].Pending = make(map[string]*Job) |
| 436 | } |
| 437 | } |
| 438 | } |
| 439 | |
| 440 | // Rebuild indexes; Unmarshal holds all shard locks so we only take pendingMtx here. |
| 441 | w.pendingMtx.Lock() |
| 442 | defer w.pendingMtx.Unlock() |
| 443 | w.pendingBlocks = make(map[string]string) |
| 444 | w.pendingByTenant = make(map[string]map[tempopb.JobType][]string) |
| 445 | for i := range ShardCount { |
| 446 | for _, j := range w.Shards[i].Pending { |
| 447 | if key := j.PendingBlockKey(); key != "" { |
| 448 | w.pendingBlocks[key] = j.ID |
| 449 | } |
| 450 | tenant := j.JobDetail.Tenant |
| 451 | if w.pendingByTenant[tenant] == nil { |
| 452 | w.pendingByTenant[tenant] = make(map[tempopb.JobType][]string) |
| 453 | } |
| 454 | w.pendingByTenant[tenant][j.Type] = append(w.pendingByTenant[tenant][j.Type], j.ID) |
| 455 | } |
| 456 | } |
| 457 | |
| 458 | w.runningBlocks = make(map[string]*Job) |
| 459 | for i := range ShardCount { |
| 460 | for _, j := range w.Shards[i].Jobs { |
nothing calls this directly
no test coverage detected