diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 4cc18039ad678c..b3d5a26e48ca80 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -638,108 +638,62 @@ def test_rmtree_on_junction(self): finally: shutil.rmtree(TESTFN, ignore_errors=True) - def test_rmtree_deleted_file_race_condition(self): - # bpo-37260 - # - # Test that a file deleted after it is enumerated - # by scandir() but before unlink() is called - # doesn't generate any errors. - def _onexc(fn, path, exc): - if path == paths[0]: - os.chmod(paths[0], mode) - os.rmdir(paths[0]) - os.unlink(paths[1]) - pass - else: - raise - - paths = [os.path.join(TESTFN, 'foo'), - os.path.join(TESTFN, 'bar')] - os.mkdir(TESTFN) - os.mkdir(paths[0]) - write_file((TESTFN, 'bar'), 'bar') - mode = os.stat(paths[0]).st_mode - os.chmod(paths[0], 0) - - try: - shutil.rmtree(TESTFN, onexc=_onexc) - except: - # test failed, so cleanup artifacts - try: - os.chmod(paths[0], mode) - except: - pass - - shutil.rmtree(TESTFN) - - def test_rmtree_deleted_dir_race_condition(self): - # bpo-37260 - # - # Test that a directory deleted after it is enumerated - # by scandir() but before rmdr() is called - # doesn't generate any errors. - def _onexc(fn, path, exc): - if path == paths[0]: - os.chmod(paths[0], mode) - os.rmdir(paths[0]) - os.rmdir(paths[1]) - pass - else: - raise - - paths = [os.path.join(TESTFN, 'foo'), - os.path.join(TESTFN, 'bar')] - os.mkdir(TESTFN) - os.mkdir(paths[0]) - os.mkdir(paths[1]) - mode = os.stat(paths[0]).st_mode - os.chmod(paths[0], 0) - - try: - shutil.rmtree(TESTFN, onexc=_onexc) - except: - # test failed, so cleanup artifacts - try: - os.chmod(paths[0], mode) - except: - pass - - shutil.rmtree(TESTFN) - - @os_helper.skip_unless_symlink - def test_rmtree_deleted_symlink_race_condition(self): + @unittest.skipIf(sys.platform[:6] == 'cygwin', + "This test can't be run on Cygwin (issue #1071513).") + @os_helper.skip_if_dac_override + @os_helper.skip_unless_working_chmod + def test_rmtree_deleted_race_condition(self): # bpo-37260 # - # Test that a symlink deleted after it is enumerated - # by scandir() but before unlink() is called - # doesn't generate any errors. + # Test that a file or a directory deleted after it is enumerated + # by scandir() but before unlink() or rmdr() is called doesn't + # generate any errors. def _onexc(fn, path, exc): - if path == paths[0]: - os.chmod(paths[0], mode) - os.rmdir(paths[0]) - os.unlink(paths[1]) - pass - else: + assert fn in (os.rmdir, os.unlink) + if not isinstance(exc, PermissionError): raise + # Make the parent and the children writeable. + for p, mode in zip(paths, old_modes): + os.chmod(p, mode) + # Remove other dirs except one. + keep = next(p for p in dirs if p != path) + for p in dirs: + if p != keep: + os.rmdir(p) + # Remove other files except one. + keep = next(p for p in files if p != path) + for p in files: + if p != keep: + os.unlink(p) - paths = [os.path.join(TESTFN, 'foo'), - os.path.join(TESTFN, 'bar')] os.mkdir(TESTFN) - os.mkdir(paths[0]) - os.symlink('foo', paths[1]) - mode = os.stat(paths[0]).st_mode - os.chmod(paths[0], 0) + paths = [TESTFN] + [os.path.join(TESTFN, f'child{i}') + for i in range(6)] + dirs = paths[1::2] + files = paths[2::2] + for path in dirs: + os.mkdir(path) + for path in files: + write_file(path, '') + + old_modes = [os.stat(path).st_mode for path in paths] + + # Make the parent and the children non-writeable. + new_mode = stat.S_IREAD|stat.S_IEXEC + for path in reversed(paths): + os.chmod(path, new_mode) try: shutil.rmtree(TESTFN, onexc=_onexc) except: - # test failed, so cleanup artifacts - try: - os.chmod(paths[0], mode) - except: - pass - + # Test failed, so cleanup artifacts. + for path, mode in zip(paths, old_modes): + try: + os.chmod(path, mode) + except OSError: + pass shutil.rmtree(TESTFN) + raise class TestCopyTree(BaseTest, unittest.TestCase):