Watch implements the Clientv3Facade interface
(ctx context.Context, key string, opts ...clientv3.OpOption)
| 85 | |
| 86 | // Watch implements the Clientv3Facade interface |
| 87 | func (m *mockKV) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { |
| 88 | watcher := make(chan clientv3.WatchResponse, channelBufferSize) |
| 89 | consumer := m.createEventConsumer(channelBufferSize) |
| 90 | |
| 91 | go func() { |
| 92 | defer func() { |
| 93 | // When this goroutine ends, remove and close the channel written to by the |
| 94 | // Put and Delete methods as well as closing the channel read by the caller |
| 95 | // of the Watch method |
| 96 | m.destroyEventConsumer(consumer) |
| 97 | |
| 98 | // non-blocking send |
| 99 | select { |
| 100 | case watcher <- clientv3.WatchResponse{Canceled: true}: |
| 101 | default: |
| 102 | } |
| 103 | |
| 104 | close(watcher) |
| 105 | }() |
| 106 | |
| 107 | for { |
| 108 | select { |
| 109 | case <-ctx.Done(): |
| 110 | // Context cancelled for this watcher, run cleanup logic and exit |
| 111 | return |
| 112 | case <-m.close: |
| 113 | // Close method called for all watchers, run cleanup logic and exit |
| 114 | return |
| 115 | case e := <-consumer: |
| 116 | op := clientv3.OpGet(key, opts...) |
| 117 | match := m.isMatch(op, *e.Kv) |
| 118 | |
| 119 | if match { |
| 120 | // non-blocking send |
| 121 | select { |
| 122 | case watcher <- clientv3.WatchResponse{Events: []*clientv3.Event{&e}}: |
| 123 | default: |
| 124 | } |
| 125 | } |
| 126 | } |
| 127 | } |
| 128 | }() |
| 129 | |
| 130 | return watcher |
| 131 | } |
| 132 | |
| 133 | // createEventConsumer creates and returns a new channel that is registered to receive |
| 134 | // events for Puts and Deletes. |