This example is not meant to be run as-is. It is a test for observing how pubsub behaves in relation to pool management, and it was used to find pool management regressions when maintnotifications mode is enabled. Please do not use it as a reference for how to use pubsub.
()
| 22 | // It was used to find regressions in pool management in maintnotifications mode. |
| 23 | // Please don't use it as a reference for how to use pubsub. |
| 24 | func main() { |
| 25 | startTime = time.Now() |
| 26 | wg := &sync.WaitGroup{} |
| 27 | rdb := redis.NewClient(&redis.Options{ |
| 28 | Addr: ":6379", |
| 29 | MaintNotificationsConfig: &maintnotifications.Config{ |
| 30 | Mode: maintnotifications.ModeEnabled, |
| 31 | EndpointType: maintnotifications.EndpointTypeExternalIP, |
| 32 | HandoffTimeout: 10 * time.Second, |
| 33 | RelaxedTimeout: 10 * time.Second, |
| 34 | PostHandoffRelaxedDuration: 10 * time.Second, |
| 35 | }, |
| 36 | }) |
| 37 | _ = rdb.FlushDB(ctx).Err() |
| 38 | maintnotificationsManager := rdb.GetMaintNotificationsManager() |
| 39 | if maintnotificationsManager == nil { |
| 40 | panic("maintnotifications manager is nil") |
| 41 | } |
| 42 | loggingHook := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug)) |
| 43 | maintnotificationsManager.AddNotificationHook(loggingHook) |
| 44 | |
| 45 | go func() { |
| 46 | for { |
| 47 | time.Sleep(2 * time.Second) |
| 48 | fmt.Printf("pool stats: %+v\n", rdb.PoolStats()) |
| 49 | } |
| 50 | }() |
| 51 | err := rdb.Ping(ctx).Err() |
| 52 | if err != nil { |
| 53 | panic(err) |
| 54 | } |
| 55 | if err := rdb.Set(ctx, "publishers", "0", 0).Err(); err != nil { |
| 56 | panic(err) |
| 57 | } |
| 58 | if err := rdb.Set(ctx, "subscribers", "0", 0).Err(); err != nil { |
| 59 | panic(err) |
| 60 | } |
| 61 | if err := rdb.Set(ctx, "published", "0", 0).Err(); err != nil { |
| 62 | panic(err) |
| 63 | } |
| 64 | if err := rdb.Set(ctx, "received", "0", 0).Err(); err != nil { |
| 65 | panic(err) |
| 66 | } |
| 67 | fmt.Println("published", rdb.Get(ctx, "published").Val()) |
| 68 | fmt.Println("received", rdb.Get(ctx, "received").Val()) |
| 69 | subCtx, cancelSubCtx := context.WithCancel(ctx) |
| 70 | pubCtx, cancelPublishers := context.WithCancel(ctx) |
| 71 | for i := 0; i < 10; i++ { |
| 72 | wg.Add(1) |
| 73 | go subscribe(subCtx, rdb, "test", i, wg) |
| 74 | } |
| 75 | time.Sleep(time.Second) |
| 76 | cancelSubCtx() |
| 77 | time.Sleep(time.Second) |
| 78 | subCtx, cancelSubCtx = context.WithCancel(ctx) |
| 79 | for i := 0; i < 10; i++ { |
| 80 | if err := rdb.Incr(ctx, "publishers").Err(); err != nil { |
| 81 | fmt.Println("incr error:", err) |
nothing calls this directly
no test coverage detected