Skip to content

Commit

Permalink
Rewrite tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
serhiy-storchaka committed Dec 4, 2023
1 parent 05c626f commit dac6e39
Showing 1 changed file with 45 additions and 91 deletions.
136 changes: 45 additions & 91 deletions Lib/test/test_shutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,108 +638,62 @@ def test_rmtree_on_junction(self):
finally:
shutil.rmtree(TESTFN, ignore_errors=True)

def test_rmtree_deleted_file_race_condition(self):
# bpo-37260
#
# Test that a file deleted after it is enumerated
# by scandir() but before unlink() is called
# doesn't generate any errors.
def _onexc(fn, path, exc):
if path == paths[0]:
os.chmod(paths[0], mode)
os.rmdir(paths[0])
os.unlink(paths[1])
pass
else:
raise

paths = [os.path.join(TESTFN, 'foo'),
os.path.join(TESTFN, 'bar')]
os.mkdir(TESTFN)
os.mkdir(paths[0])
write_file((TESTFN, 'bar'), 'bar')
mode = os.stat(paths[0]).st_mode
os.chmod(paths[0], 0)

try:
shutil.rmtree(TESTFN, onexc=_onexc)
except:
# test failed, so cleanup artifacts
try:
os.chmod(paths[0], mode)
except:
pass

shutil.rmtree(TESTFN)

def test_rmtree_deleted_dir_race_condition(self):
# bpo-37260
#
# Test that a directory deleted after it is enumerated
# by scandir() but before rmdr() is called
# doesn't generate any errors.
def _onexc(fn, path, exc):
if path == paths[0]:
os.chmod(paths[0], mode)
os.rmdir(paths[0])
os.rmdir(paths[1])
pass
else:
raise

paths = [os.path.join(TESTFN, 'foo'),
os.path.join(TESTFN, 'bar')]
os.mkdir(TESTFN)
os.mkdir(paths[0])
os.mkdir(paths[1])
mode = os.stat(paths[0]).st_mode
os.chmod(paths[0], 0)

try:
shutil.rmtree(TESTFN, onexc=_onexc)
except:
# test failed, so cleanup artifacts
try:
os.chmod(paths[0], mode)
except:
pass

shutil.rmtree(TESTFN)

@os_helper.skip_unless_symlink
def test_rmtree_deleted_symlink_race_condition(self):
@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
@os_helper.skip_if_dac_override
@os_helper.skip_unless_working_chmod
def test_rmtree_deleted_race_condition(self):
# bpo-37260
#
# Test that a symlink deleted after it is enumerated
# by scandir() but before unlink() is called
# doesn't generate any errors.
# Test that a file or a directory deleted after it is enumerated
# by scandir() but before unlink() or rmdr() is called doesn't
# generate any errors.
def _onexc(fn, path, exc):
if path == paths[0]:
os.chmod(paths[0], mode)
os.rmdir(paths[0])
os.unlink(paths[1])
pass
else:
assert fn in (os.rmdir, os.unlink)
if not isinstance(exc, PermissionError):
raise
# Make the parent and the children writeable.
for p, mode in zip(paths, old_modes):
os.chmod(p, mode)
# Remove other dirs except one.
keep = next(p for p in dirs if p != path)
for p in dirs:
if p != keep:
os.rmdir(p)
# Remove other files except one.
keep = next(p for p in files if p != path)
for p in files:
if p != keep:
os.unlink(p)

paths = [os.path.join(TESTFN, 'foo'),
os.path.join(TESTFN, 'bar')]
os.mkdir(TESTFN)
os.mkdir(paths[0])
os.symlink('foo', paths[1])
mode = os.stat(paths[0]).st_mode
os.chmod(paths[0], 0)
paths = [TESTFN] + [os.path.join(TESTFN, f'child{i}')
for i in range(6)]
dirs = paths[1::2]
files = paths[2::2]
for path in dirs:
os.mkdir(path)
for path in files:
write_file(path, '')

old_modes = [os.stat(path).st_mode for path in paths]

# Make the parent and the children non-writeable.
new_mode = stat.S_IREAD|stat.S_IEXEC
for path in reversed(paths):
os.chmod(path, new_mode)

try:
shutil.rmtree(TESTFN, onexc=_onexc)
except:
# test failed, so cleanup artifacts
try:
os.chmod(paths[0], mode)
except:
pass

# Test failed, so cleanup artifacts.
for path, mode in zip(paths, old_modes):
try:
os.chmod(path, mode)
except OSError:
pass
shutil.rmtree(TESTFN)
raise


class TestCopyTree(BaseTest, unittest.TestCase):
Expand Down

0 comments on commit dac6e39

Please sign in to comment.