MCPcopy
hub / github.com/redis/redis-py / waitForIndex

Method waitForIndex

tests/test_search.py:82–119  ·  view source on GitHub ↗
(env, idx, timeout=None)

Source from the content-addressed store, hash-verified

80class SearchTestsBase:
81 @staticmethod
82 def waitForIndex(env, idx, timeout=None):
83 delay = 0.1
84 while True:
85 try:
86 res = env.execute_command("FT.INFO", idx)
87 # ``execute_command`` bypasses the search module's
88 # callbacks, so the response is the raw wire shape.
89 # With ``decode_responses=False`` the structural keys
90 # arrive as bytes; accept both forms.
91 try:
92 i = res.index("indexing")
93 except ValueError:
94 i = res.index(b"indexing")
95 if int(res[i + 1]) == 0:
96 break
97 except ValueError:
98 break
99 except AttributeError:
100 # RESP3 dict response. Keys may be ``str`` or ``bytes``
101 # depending on ``decode_responses``.
102 indexing = res.get("indexing")
103 if indexing is None:
104 indexing = res.get(b"indexing")
105 try:
106 if int(indexing) == 0:
107 break
108 except (TypeError, ValueError):
109 break
110 except ResponseError:
111 # index doesn't exist yet
112 # continue to sleep and try again
113 pass
114
115 time.sleep(delay)
116 if timeout is not None:
117 timeout -= delay
118 if timeout <= 0:
119 break
120
121 @staticmethod
122 def getClient(client):

Callers 15

test_clientMethod · 0.45
test_stopwordsMethod · 0.45
test_filtersMethod · 0.45
test_drop_indexMethod · 0.45
test_no_indexMethod · 0.45
test_summarizeMethod · 0.45
test_spell_checkMethod · 0.45
test_casesensitiveMethod · 0.45
test_withsuffixtrieMethod · 0.45
test_geoshapeMethod · 0.45

Calls 3

execute_commandMethod · 0.45
getMethod · 0.45
sleepMethod · 0.45

Tested by

no test coverage detected