我是 Python 的新手,在使用 Watchdog 执行某些代码时遇到问题。该代码应该在修改或创建文件时将文件复制到各自的文件夹中。它将对一个文件起作用,但如果有更多文件匹配,它将退出。由于某种原因,我也无法使用 Ctrl-C 停止程序。完整代码如下:
import os
import os.path
import shutil
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.events import PatternMatchingEventHandler
# Folder that is watched for new or changed files.
sourcepath='C:/Users/bhart/Downloads/'
# NOTE(review): the directory listing is taken once, when this line runs;
# files that appear later are not part of this list.
sourcefiles = os.listdir(sourcepath)
# One destination folder per file type (xls/xlsx, csv, txt, pdf).
destinationpath = 'C:/Users/bhart/Downloads/xls'
destinationpathcsv = 'C:/Users/bhart/Downloads/csv'
destinationpathtxt = 'C:/Users/bhart/Downloads/txt'
destinationpathpdf = 'C:/Users/bhart/Downloads/pdf'
# Path handed to Observer.schedule() in the __main__ section.
path = sourcepath
# Plain handler instance; its on_modified/on_created callbacks are assigned
# in the __main__ section.
event_handler = FileSystemEventHandler()
# Callback for "created" events: walks the pre-computed `sourcefiles` list and
# moves each entry into the folder that matches its extension.
# The `event` argument itself is not inspected.
def on_created(event):
    for file in sourcefiles:
        # NOTE(review): `file` is a bare name from os.listdir(), so this check
        # is resolved against the current working directory, not `sourcepath`.
        if os.path.exists(file):
            if file.endswith('.xls') or file.endswith('.xlsx'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpath,file))
            if file.endswith('.csv'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathcsv,file))
                print("CSV file moved.")
            if file.endswith('.txt'):
                print("TXT file moved")
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathtxt,file))
            if file.endswith('.pdf'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathpdf,file))
# Callback for "modified" events: same walk over `sourcefiles` as on_created,
# moving each entry into the folder that matches its extension.
# The `event` argument itself is not inspected.
def on_modified(event):
    for file in sourcefiles:
        # NOTE(review): `file` is a bare name from os.listdir(), so this check
        # is resolved against the current working directory, not `sourcepath`.
        if os.path.exists(file):
            if file.endswith('.xls') or file.endswith('.xlsx'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpath,file))
            if file.endswith('.csv'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathcsv,file))
            if file.endswith('.txt'):
                print("TXT file moved")
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathtxt,file))
            if file.endswith('.pdf'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathpdf,file))
if __name__ == "__main__":
    # First observer: reacts to "modified" events.
    event_handler.on_modified = on_modified
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, path, recursive=True)
    # NOTE(review): join() waits for the observer thread to finish, so the
    # statements below do not run while this first observer is still watching.
    observer.join()
    # Second observer: reacts to "created" events (same handler object).
    event_handler.on_created = on_created
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, path, recursive=True)
    observer.join()
    # NOTE(review): the try block wraps only the print call, so a Ctrl+C that
    # arrives during the join() calls above is not handled by this except.
    try:
        print("test")
    except KeyboardInterrupt:
        exit()
回答1
我不知道我是否解决了所有问题,但是:
`os.listdir()` 返回的只是文件名,不包含所在目录;因此即使只是用 `os.path.exists()` 检查文件是否存在,也必须先用 `os.path.join()` 拼出完整路径:
if os.path.exists( os.path.join(sourcepath, file) ):
`os.listdir()` 只在被调用的那一刻返回一次文件名列表;要获取新出现的文件,必须在每次处理事件时重新调用它,再用 `for` 循环遍历结果:
# Snippet: "created" callback that re-lists the folder on every event.
def on_created(event):
    # List the folder inside the callback so newly added files are included.
    sourcefilenames = os.listdir(sourcepath)
    for filename in sourcefilenames:
        # listdir() yields bare names; join with the folder to get a usable path.
        src = os.path.join(sourcepath, filename)
        if os.path.exists(src):
            # ... code ...
# Snippet: "modified" callback that re-lists the folder on every event.
def on_modified(event):
    # List the folder inside the callback so newly added files are included.
    sourcefilenames = os.listdir(sourcepath)
    for filename in sourcefilenames:
        # listdir() yields bare names; join with the folder to get a usable path.
        src = os.path.join(sourcepath, filename)
        if os.path.exists(src):
            # ... code ...
`.join()` 会阻塞代码,一直等到程序被关闭;因此原代码会先创建第一个 `Observer`,并等它结束之后才去创建第二个 `Observer`——但其实只用一个 `Observer` 就可以完成所有事情。
看来您在 `on_created` 和 `on_modified` 中使用了相同的代码,因此这两种情况可以共用同一个函数:
# Snippet: single callback intended to serve both "created" and "modified"
# events; it re-lists the folder on every call.
def move_it(event):
    sourcefilenames = os.listdir(sourcepath)
    for filename in sourcefilenames:
        # listdir() yields bare names; join with the folder to get a usable path.
        src = os.path.join(sourcepath, filename)
        if os.path.exists(src):
            # ... code ...
if __name__ == "__main__":
    # One handler, with the same function bound to both event callbacks.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it
    # A single observer is enough for both event types.
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, sourcepath, recursive=True)
    # Keep the main thread waiting on the observer thread.
    observer.join()
如果你想捕捉 Ctrl+C,就应该把所有代码放在 `try/except` 中(或者至少把 `join()` 放在 `try/except` 中)。
我不知道您在使用 Ctrl+C 时具体遇到了什么问题,但它在 Linux 上对我是有效的。
if __name__ == "__main__":
    # One handler serves both event types because the reaction is identical.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the main thread once per second,
        # so a pending Ctrl+C is raised here instead of being held back by an
        # untimed wait (reported to happen on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish and wait for it, so the program
        # exits cleanly however the loop ended.
        observer.stop()
        observer.join()
一个建议:
如果您使用字典,代码可以更简单、更通用
# Map each file extension to the folder that collects files of that type.
{
    ".xls": "C:/.../xls",
    ".xlsx": "C:/.../xls",
    # ...
}
这样您就可以用 `for` 循环检查所有扩展名,而且随时可以向字典添加新的扩展名,而无需修改函数代码。
import os
import shutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Folder that is watched for new or changed files.
sourcepath = 'C:/Users/bhart/Downloads/'

# Destination folder for each supported extension (lower-case, leading dot).
destinationpath = {
    '.xls': 'C:/Users/bhart/Downloads/xls',
    '.xlsx': 'C:/Users/bhart/Downloads/xls',
    '.csv': 'C:/Users/bhart/Downloads/csv',
    '.txt': 'C:/Users/bhart/Downloads/txt',
    '.pdf': 'C:/Users/bhart/Downloads/pdf',
}
def move_it(event):
    """Move every mapped file found directly in ``sourcepath``.

    The folder is listed again on each call so files that appeared since the
    last event are picked up. Each regular file whose lower-cased name ends
    with a key of ``destinationpath`` is moved into the mapped folder, which
    is created first if it does not exist yet.

    Args:
        event: The watchdog event that triggered the call. It is used only
            as a signal; its attributes are not read.
    """
    for filename in os.listdir(sourcepath):
        src = os.path.join(sourcepath, filename)
        # Only regular files are sorted; sub-folders are left where they are
        # even if their name happens to end with a mapped extension.
        if not os.path.isfile(src):
            continue
        lowered = filename.lower()
        for ext, dst in destinationpath.items():
            if lowered.endswith(ext):
                # shutil.move() fails if the target folder is missing.
                os.makedirs(dst, exist_ok=True)
                print('move:', filename, '->', dst)
                shutil.move(src, os.path.join(dst, filename))
                # The file is gone from src now; no other extension can apply.
                break
if __name__ == "__main__":
    # One handler serves both event types because the reaction is identical.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the main thread once per second,
        # so a pending Ctrl+C is raised here instead of being held back by an
        # untimed wait (reported to happen on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish and wait for it, so the program
        # exits cleanly however the loop ended.
        observer.stop()
        observer.join()
编辑:
`event` 对象提供 `event.src_path`、`event.event_type` 等属性。您可以用它们代替 `listdir()` 来获取文件路径。
import os
import shutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Folder that is watched for new or changed files.
sourcepath = 'C:/Users/bhart/Downloads/'

# Destination folder for each supported extension (lower-case, leading dot).
destinationpath = {
    '.xls': 'C:/Users/bhart/Downloads/xls',
    '.xlsx': 'C:/Users/bhart/Downloads/xls',
    '.csv': 'C:/Users/bhart/Downloads/csv',
    '.txt': 'C:/Users/bhart/Downloads/txt',
    '.pdf': 'C:/Users/bhart/Downloads/pdf',
}
def move_it(event):
    """Move the file named by a watchdog event into its extension's folder.

    The file's lower-cased name is compared against the keys of
    ``destinationpath``; on the first match the file is moved into the mapped
    folder, which is created first if it does not exist yet.

    Args:
        event: Object exposing ``is_directory`` and ``src_path``, as the
            watchdog file-system events do. Directory events are ignored.
    """
    if event.is_directory:
        return

    src = event.src_path
    folder, filename = os.path.split(src)
    lowered = filename.lower()

    for ext, dst in destinationpath.items():
        if not lowered.endswith(ext):
            continue
        # A recursive watch also reports files that already sit in their
        # destination folder (e.g. right after being moved there); moving
        # such a file onto itself is pointless and can raise.
        here = os.path.normcase(os.path.abspath(folder))
        target = os.path.normcase(os.path.abspath(dst))
        if here == target:
            return
        # shutil.move() fails if the target folder is missing.
        os.makedirs(dst, exist_ok=True)
        try:
            shutil.move(src, os.path.join(dst, filename))
        except FileNotFoundError:
            # An earlier event for the same path (e.g. "created" followed by
            # "modified") already moved the file; nothing left to do.
            return
        print('move:', filename, '->', dst)
        return
if __name__ == "__main__":
    # One handler serves both event types because the reaction is identical.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it
    # NOTE: renames arrive as "moved" events; handling them with move_it
    # would require looking at the event's dest_path as well.

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the main thread once per second,
        # so a pending Ctrl+C is raised here instead of being held back by an
        # untimed wait (reported to happen on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish and wait for it, so the program
        # exits cleanly however the loop ended.
        observer.stop()
        observer.join()