MCPcopy
hub / github.com/celery/celery / BufferMap

Class BufferMap

celery/utils/collections.py:765–863  ·  view source on GitHub ↗

Map of buffers.

Source from the content-addressed store, hash-verified

763
764
765class BufferMap(OrderedDict, Evictable):
766 """Map of buffers."""
767
768 Buffer = Messagebuffer
769 Empty = Empty
770
771 maxsize = None
772 total = 0
773 bufmaxsize = None
774
def __init__(self, maxsize, iterable=None, bufmaxsize=1000):
    # type: (int, Iterable, int) -> None
    """Initialize the map, optionally seeding it from ``iterable``.

    Arguments:
        maxsize (int): Stored as ``self.maxsize``.  A falsy value makes
            ``put``/``extend`` skip their call to ``_evict()``.
        iterable (Iterable): Optional initial contents, handed to
            ``update()``.  Its values are expected to support ``len()``.
        bufmaxsize (int): Stored as ``self.bufmaxsize``.
    """
    super().__init__()
    self.maxsize = maxsize
    # Honour the caller-supplied value instead of a hard-coded 1000.
    self.bufmaxsize = bufmaxsize
    if iterable:
        self.update(iterable)
    # Sum over values(): items() yields (key, buffer) 2-tuples, whose
    # len() is always 2 regardless of how many items a buffer holds.
    self.total = sum(len(buf) for buf in self.values())
783
def put(self, key, item):
    # type: (Any, Any) -> None
    """Put ``item`` into the buffer returned for ``key``.

    The buffer comes from ``_get_or_create_buffer(key)``.  Afterwards the
    running ``total`` is incremented, ``key`` is moved to the end of the
    ordering, and ``_evict()`` is called when ``maxsize`` is truthy.
    """
    target = self._get_or_create_buffer(key)
    target.put(item)
    self.total += 1
    # The key was just used: move it to the end of the ordering.
    self.move_to_end(key)
    if self.maxsize:
        self._evict()
790
def extend(self, key, it):
    # type: (Any, Iterable) -> None
    """Extend the buffer returned for ``key`` with the items of ``it``.

    Arguments:
        key (Any): Key passed to ``_get_or_create_buffer``.
        it (Iterable): Items to add.  May be any iterable, including
            generators and other iterables without ``__len__``.

    ``total`` grows by the number of items in ``it`` and ``_evict()`` is
    called when ``maxsize`` is truthy.
    """
    if not hasattr(it, '__len__'):
        # len() is needed below, after the buffer has consumed ``it``.
        # Un-sized iterables (e.g. generators) would raise TypeError there
        # with the buffer already mutated, so materialise them first.
        it = list(it)
    self._get_or_create_buffer(key).extend(it)
    self.total += len(it)
    if self.maxsize:
        self._evict()
796
def take(self, key, *default):
    # type: (Any, *Any) -> Any
    """Take one item from the buffer stored under ``key``.

    On success ``total`` is decremented, ``key`` is moved to the end of
    the ordering, and the item is returned.

    If ``key`` is not in the map, or its buffer raises ``self.Empty``,
    the first extra positional argument is returned when one was given;
    otherwise ``self.Empty`` is raised.
    """
    taken = None
    unavailable = False

    try:
        buffer = self[key]
    except KeyError:
        unavailable = True

    if not unavailable:
        try:
            taken = buffer.take()
            self.total -= 1
        except self.Empty:
            unavailable = True
        else:
            # The key was just used: move it to the end of the ordering.
            self.move_to_end(key)

    if not unavailable:
        return taken
    if default:
        return default[0]
    # Raised outside the ``except`` blocks so no exception is chained.
    raise self.Empty()
818
819 def _get_or_create_buffer(self, key):
820 # type: (Any) -> Messagebuffer
821 try:
822 return self[key]

Callers 7

test_append_limited · Method · 0.90
test_append_unlimited · Method · 0.90
test_extend_limited · Method · 0.90
test_extend_unlimited · Method · 0.90
__init__ · Method · 0.90

Calls

no outgoing calls

Tested by 6

test_append_limited · Method · 0.72
test_append_unlimited · Method · 0.72
test_extend_limited · Method · 0.72
test_extend_unlimited · Method · 0.72