MCPcopy
hub / github.com/benoitc/gunicorn / _waitfor

Function _waitfor

gunicorn/util.py:175–206  ·  view source on GitHub ↗
(func, pathname, waitall=False)

Source from the content-addressed store, hash-verified

173
174if sys.platform.startswith("win"):
def _waitfor(func, pathname, waitall=False):
    """Call ``func(pathname)`` and wait until the deletion is visible.

    After ``func`` has run, the relevant directory is polled with
    ``os.listdir`` using an exponentially growing sleep (1 ms, doubling
    each round, ~1 second in total) until the removal shows up in the
    listing.

    Args:
        func: Callable invoked once as ``func(pathname)`` to perform the
            removal.
        pathname: Path handed to ``func``.  When ``waitall`` is true it is
            also the directory that is polled.
        waitall: If false (default), wait until the final component of
            ``pathname`` no longer appears in its parent directory.  If
            true, wait until the directory ``pathname`` lists no entries.

    Returns:
        None.  If the removal is still not visible once the backoff is
        exhausted, a ``RuntimeWarning`` is issued instead of raising.

    Raises:
        Whatever ``func`` or ``os.listdir`` raises; nothing is caught here.
    """
    # Perform the operation
    func(pathname)
    # Now setup the wait loop
    if waitall:
        dirname = pathname
        name = None  # not consulted when waiting for the whole directory
    else:
        dirname, name = os.path.split(pathname)
        dirname = dirname or '.'
    # Check for `pathname` to be removed from the filesystem.
    # The exponential backoff of the timeout amounts to a total
    # of ~1 second after which the deletion is probably an error
    # anyway.
    # Testing on a i7@4.3GHz shows that usually only 1 iteration is
    # required when contention occurs.
    timeout = 0.001
    while timeout < 1.0:
        # Note we are only testing for the existence of the file(s) in
        # the contents of the directory regardless of any security or
        # access rights.  If we have made it this far, we have sufficient
        # permissions to do that much using Python's equivalent of the
        # Windows API FindFirstFile.
        # Other Windows APIs can fail or give incorrect results when
        # dealing with files that are pending deletion.
        entries = os.listdir(dirname)
        # Compute "still pending" first and negate it as a whole.  Writing
        # `not entries if waitall else name in entries` binds `not` to the
        # first operand only, which inverts the single-file check.
        still_pending = bool(entries) if waitall else name in entries
        if not still_pending:
            return
        # Increase the timeout and try again
        time.sleep(timeout)
        timeout *= 2
    warnings.warn('tests may fail, delete still pending for ' + pathname,
                  RuntimeWarning, stacklevel=4)
207
def _unlink(filename):
    """Remove ``filename`` by handing ``os.unlink`` to ``_waitfor``.

    ``_waitfor`` makes the ``os.unlink(filename)`` call and then runs its
    wait loop on the containing directory.  Nothing is returned.
    """
    remover = os.unlink
    _waitfor(remover, filename)

Callers 1

_unlinkFunction · 0.85

Calls 1

funcFunction · 0.85

Tested by

no test coverage detected