| 467 | } |
| 468 | |
| 469 | func TestRingSetAddrsAndRebalanceRace(t *testing.T) { |
| 470 | const ( |
| 471 | ringShard1Name = "ringShardOne" |
| 472 | ringShard2Name = "ringShardTwo" |
| 473 | |
| 474 | ringShard1Port = "6390" |
| 475 | ringShard2Port = "6391" |
| 476 | ) |
| 477 | |
| 478 | ring := NewRing(&RingOptions{ |
| 479 | Addrs: map[string]string{ |
| 480 | ringShard1Name: ":" + ringShard1Port, |
| 481 | }, |
| 482 | // Disable heartbeat |
| 483 | HeartbeatFrequency: 1 * time.Hour, |
| 484 | NewConsistentHash: func(shards []string) ConsistentHash { |
| 485 | switch len(shards) { |
| 486 | case 1: |
| 487 | return fixedHash(ringShard1Name) |
| 488 | case 2: |
| 489 | return fixedHash(ringShard2Name) |
| 490 | default: |
| 491 | t.Fatalf("Unexpected number of shards: %v", shards) |
| 492 | return nil |
| 493 | } |
| 494 | }, |
| 495 | }) |
| 496 | defer ring.Close() |
| 497 | |
| 498 | // Continuously update addresses by adding and removing one address |
| 499 | updatesDone := make(chan struct{}) |
| 500 | defer func() { close(updatesDone) }() |
| 501 | go func() { |
| 502 | for i := 0; ; i++ { |
| 503 | select { |
| 504 | case <-updatesDone: |
| 505 | return |
| 506 | default: |
| 507 | if i%2 == 0 { |
| 508 | ring.SetAddrs(map[string]string{ |
| 509 | ringShard1Name: ":" + ringShard1Port, |
| 510 | }) |
| 511 | } else { |
| 512 | ring.SetAddrs(map[string]string{ |
| 513 | ringShard1Name: ":" + ringShard1Port, |
| 514 | ringShard2Name: ":" + ringShard2Port, |
| 515 | }) |
| 516 | } |
| 517 | } |
| 518 | } |
| 519 | }() |
| 520 | |
| 521 | timer := time.NewTimer(1 * time.Second) |
| 522 | for running := true; running; { |
| 523 | select { |
| 524 | case <-timer.C: |
| 525 | running = false |
| 526 | default: |