MCPcopy
hub / github.com/kubernetes/client-go / main

Function main

examples/workqueue/main.go:144–217  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

142}
143
144func main() {
145 var kubeconfig string
146 var master string
147
148 flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
149 flag.StringVar(&master, "master", "", "master url")
150 flag.Parse()
151
152 // creates the connection
153 config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
154 if err != nil {
155 klog.Fatal(err)
156 }
157
158 // creates the clientset
159 clientset, err := kubernetes.NewForConfig(config)
160 if err != nil {
161 klog.Fatal(err)
162 }
163
164 // create the pod watcher
165 podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
166
167 // create the workqueue
168 queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
169
170 // Bind the workqueue to a cache with the help of an informer. This way we make sure that
171 // whenever the cache is updated, the pod key is added to the workqueue.
172 // Note that when we finally process the item from the workqueue, we might see a newer version
173 // of the Pod than the version which was responsible for triggering the update.
174 indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
175 AddFunc: func(obj interface{}) {
176 key, err := cache.MetaNamespaceKeyFunc(obj)
177 if err == nil {
178 queue.Add(key)
179 }
180 },
181 UpdateFunc: func(old interface{}, new interface{}) {
182 key, err := cache.MetaNamespaceKeyFunc(new)
183 if err == nil {
184 queue.Add(key)
185 }
186 },
187 DeleteFunc: func(obj interface{}) {
188 // IndexerInformer uses a delta queue, therefore for deletes we have to use this
189 // key function.
190 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
191 if err == nil {
192 queue.Add(key)
193 }
194 },
195 }, cache.Indexers{})
196
197 controller := NewController(queue, indexer, informer)
198
199 // We can now warm up the cache for initial synchronization.
200 // Let's suppose that we knew about a pod "mypod" on our last run, therefore add it to the cache.
201 // If this pod is not there anymore, the controller will be notified about the removal after the

Callers

nothing calls this directly

Calls 14

RunMethod · 0.95
BuildConfigFromFlagsFunction · 0.92
NewForConfigFunction · 0.92
NewListWatchFromClientFunction · 0.92
NewRateLimitingQueueFunction · 0.92
NewIndexerInformerFunction · 0.92
MetaNamespaceKeyFuncFunction · 0.92
NewControllerFunction · 0.85
RESTClientMethod · 0.65
CoreV1Method · 0.65

Tested by

no test coverage detected