MCPcopy
hub / github.com/celery/celery / test_mget

Method test_mget

t/unit/backends/test_arangodb.py:108–138  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

106 )
107
108 def test_mget(self):
109 self.backend._connection = MagicMock(spec=["__getitem__"])
110
111 result = list(self.backend.mget(None))
112 expected_result = []
113 assert result == expected_result
114 self.backend.db.AQLQuery.assert_not_called()
115
116 Query = MagicMock(spec=pyArango.query.Query)
117 query = Query()
118 query.nextBatch = MagicMock(side_effect=StopIteration())
119 self.backend.db.AQLQuery = Mock(return_value=query)
120
121 keys = [sentinel.task_id_0, sentinel.task_id_1]
122 result = list(self.backend.mget(keys))
123 expected_result = []
124 assert result == expected_result
125 self.backend.db.AQLQuery.assert_called_once_with(
126 "FOR k IN @keys RETURN DOCUMENT(@@collection, k).task",
127 rawResults=True,
128 bindVars={
129 "@collection": self.backend.collection,
130 "keys": keys,
131 },
132 )
133
134 values = [sentinel.value_0, sentinel.value_1]
135 query.__iter__.return_value = iter([sentinel.value_0, sentinel.value_1])
136 result = list(self.backend.mget(keys))
137 expected_result = values
138 assert result == expected_result
139
140 def test_delete(self):
141 self.backend._connection = MagicMock(spec=["__getitem__"])

Callers

nothing calls this directly

Calls 1

mgetMethod · 0.45

Tested by

no test coverage detected