hub / github.com/celery/celery / _stamp_headers

Method _stamp_headers

celery/canvas.py:553–611

Collect all stamps from visitor, headers and self, and return an idempotent dictionary of stamps. Added in version 5.3.

(self, visitor_headers=None, append_stamps=False, self_headers=True, **headers)
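The `append_stamps` flag decides what happens when the same stamp key arrives from two sources. Below is a minimal sketch of that rule on plain dictionaries, assuming the behaviour described in the method's docstring. `merge_stamp` is a hypothetical helper written for this illustration; it is not Celery's `_merge_dictionaries`, whose internals are not shown on this page.

```python
def merge_stamp(target, key, value, append_stamps=False):
    """Hypothetical helper: merge one stamp into ``target`` in place."""
    if key not in target:
        # First occurrence of the key: store it as-is.
        target[key] = value
    elif append_stamps:
        # Duplicated stamps are collected into a list.
        existing = target[key]
        if not isinstance(existing, list):
            existing = [existing]
        target[key] = existing + [value]
    else:
        # The last stamp replaces the earlier one.
        target[key] = value
    return target


replaced = merge_stamp({"foo": "bar1"}, "foo", "bar2", append_stamps=False)
appended = merge_stamp({"foo": "bar1"}, "foo", "bar2", append_stamps=True)
print(replaced)  # {'foo': 'bar2'}
print(appended)  # {'foo': ['bar1', 'bar2']}
```

With `append_stamps=True` the type of the stamp changes from a string to a list, which is the implicit cast the source comments warn about.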

Source from the content-addressed store, hash-verified

        self.immutable = immutable

    def _stamp_headers(self, visitor_headers=None, append_stamps=False, self_headers=True, **headers):
        """Collect all stamps from visitor, headers and self,
        and return an idempotent dictionary of stamps.

        .. versionadded:: 5.3

        Arguments:
            visitor_headers (Dict): Stamps from a visitor method.
            append_stamps (bool):
                If True, duplicated stamps will be appended to a list.
                If False, duplicated stamps will be replaced by the last stamp.
            self_headers (bool):
                If True, stamps from self.options will be added.
                If False, stamps from self.options will be ignored.
            headers (Dict): Stamps that should be added to headers.

        Returns:
            Dict: Merged stamps.
        """
        # Use append_stamps=False to prioritize visitor_headers over headers in case of duplicated stamps.
        # This will lose duplicated headers from the headers argument, but that is the best effort solution
        # to avoid implicitly casting the duplicated stamp into a list of both stamps from headers and
        # visitor_headers of the same key.
        # Example:
        #   headers = {"foo": "bar1"}
        #   visitor_headers = {"foo": "bar2"}
        #   _merge_dictionaries(headers, visitor_headers, aggregate_duplicates=True)
        #   headers["foo"] == ["bar1", "bar2"] -> The stamp is now a list
        #   _merge_dictionaries(headers, visitor_headers, aggregate_duplicates=False)
        #   headers["foo"] == "bar2" -> "bar1" is lost, but the stamp is according to the visitor

        headers = headers.copy()

        if "stamped_headers" not in headers:
            headers["stamped_headers"] = list(headers.keys())

        # Merge headers with visitor headers
        if visitor_headers is not None:
            visitor_headers = visitor_headers or {}
            if "stamped_headers" not in visitor_headers:
                visitor_headers["stamped_headers"] = list(visitor_headers.keys())

            # Sync from visitor
            _merge_dictionaries(headers, visitor_headers, aggregate_duplicates=append_stamps)
            headers["stamped_headers"] = list(set(headers["stamped_headers"]))

        # Merge headers with self.options
        if self_headers:
            stamped_headers = set(headers.get("stamped_headers", []))
            stamped_headers.update(self.options.get("stamped_headers", []))
            headers["stamped_headers"] = list(stamped_headers)
            # Only merge stamps that are in stamped_headers from self.options
            redacted_options = {k: v for k, v in self.options.items() if k in headers["stamped_headers"]}

            # Sync from self.options
            _merge_dictionaries(headers, redacted_options, aggregate_duplicates=append_stamps)
            headers["stamped_headers"] = list(set(headers["stamped_headers"]))

        return headers
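
The `self_headers` branch copies only those options whose keys are registered as stamps. The sketch below reproduces that filtering step on plain dictionaries. Here `options` stands in for `self.options`, and all keys and values are made up for illustration.

```python
# Stand-in for self.options on a signature: one stamped key plus an
# unrelated execution option (illustrative values only).
options = {
    "stamped_headers": ["tenant"],
    "tenant": "acme",
    "queue": "default",
}

# Stamps collected so far from **headers and the visitor.
headers = {"stamped_headers": ["trace_id"], "trace_id": "abc123"}

# Union of the stamp names from both sources, as in the method body.
stamped = set(headers.get("stamped_headers", []))
stamped.update(options.get("stamped_headers", []))
headers["stamped_headers"] = list(stamped)

# Keep only the options that are registered as stamps.
redacted_options = {k: v for k, v in options.items() if k in headers["stamped_headers"]}

print(sorted(headers["stamped_headers"]))  # ['tenant', 'trace_id']
print(redacted_options)                    # {'tenant': 'acme'}
```

Because `queue` is not listed under `stamped_headers`, it is filtered out before the merge, so ordinary execution options are not carried into the stamp dictionary. The order of `headers["stamped_headers"]` is not guaranteed, since it passes through a `set`.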

Callers 5

stamp · Method · 0.95
stamp_links · Method · 0.95
stamp · Method · 0.80
stamp · Method · 0.80
stamp · Method · 0.80

Calls 6

_merge_dictionaries · Function · 0.85
copy · Method · 0.80
keys · Method · 0.80
get · Method · 0.45
update · Method · 0.45
items · Method · 0.45

Tested by

no test coverage detected