Many tools can monitor changes on a filesystem, but I could not find one that is both portable and compatible with gevent. For instance, watchdog is portable across Windows, Linux and MacOS, but it uses threads and does not work together with gevent. On the other hand, gevent_inotifyx, as its name says, works well with gevent, but it relies on inotify and so is for Linux only.
Here is a solution that is based only on gevent and should be as portable as gevent itself.
EDIT (July 3rd 2015): there is little hope that it will ever work unless libev is updated on Windows, see the issue on GitHub for details.
First, we import the required packages and work around a bug in Windows:
import os, os.path, collections
import gevent, gevent.pool, gevent.queue, gevent.hub
import platform
if platform.system() == "Windows" :
    # see https://github.com/gevent/gevent/issues/459
    import socket
Then we define a structure for events that provides an event name, the path on which the event occurred, and a Boolean flag telling whether the path is a directory or a file (this is needed for a delete event, since there is no way to know what has been deleted):
event = collections.namedtuple("event", ["name", "path", "isdir"])
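As a quick illustration of how such a tuple behaves (the path below is made up for the example), the fields can be read by name or unpacked by position:

```python
import collections

event = collections.namedtuple("event", ["name", "path", "isdir"])

# Hypothetical path, used only for illustration.
evt = event("delete", "/tmp/project/notes.txt", False)

# Fields are available by name...
kind = "directory" if evt.isdir else "file"
summary = "%s %s: %s" % (evt.name, kind, evt.path)
print(summary)

# ...or by position, like any tuple.
name, path, isdir = evt
```

Since the object no longer exists when a delete event is received, the isdir field is the only way for a consumer to tell a deleted file from a deleted directory.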
Class DirWatcher handles all the changes in a tree starting at root. It feeds a queue q with the observed events and provides a method get to retrieve the events:
class DirWatcher (object) :
    def __init__ (self, root) :
        self.root = os.path.abspath(root)
        self.workers = gevent.pool.Group()
        self.watched = set()
        self.q = gevent.queue.Queue()
        self.get = self.q.get
        self.add(self.root, "crawl")
Using os.path.abspath is important because the libev documentation clearly specifies that:
The path should be absolute: If it is relative and your working directory changes, then the behaviour is undefined.
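To see why this matters, here is a small sketch using only the standard library (the relative name "data" and the temporary directory are assumptions made for the demonstration). The same relative name designates two different locations before and after a change of working directory, whereas the absolute path computed up front stays valid:

```python
import os
import tempfile

start = os.getcwd()
relative = "data"                      # hypothetical relative path
absolute = os.path.abspath(relative)   # resolved against the current directory

other = tempfile.mkdtemp()
os.chdir(other)
try:
    # The same relative name now designates a different location...
    moved = os.path.abspath(relative)
finally:
    os.chdir(start)
    os.rmdir(other)

# ...whereas the absolute path computed earlier is unaffected.
print(os.path.isabs(absolute), absolute != moved)
```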
The constructor also defines a group of worker greenlets, each of which will be in charge of one file or directory whose path is recorded in attribute watched (this will be useful later on). Finally, the constructor calls method add to add the root directory to its watched objects, which works as follows:
    def add (self, path, evtname="create") :
        if os.path.isdir(path) :
            for name in os.listdir(path) :
                self.add(os.path.join(path, name), evtname)
        self.workers.spawn(self.watch, path)
        self.q.put(event(evtname, path, os.path.isdir(path)))
If the added object is a directory, its children are added recursively. Then, a worker is spawned to watch the object and an event is generated to report the new object. Parameter evtname is the name of the event to trigger when an object is added to the DirWatcher. So, the add initiated from __init__ and the subsequent recursive crawling will generate crawl events. Later, object creations will generate create events.
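One consequence of this recursion is that children are reported before their parent, so the root comes last during the initial crawl. The sketch below reproduces only the traversal order, with no gevent and no watcher. The function crawl and the temporary tree are made up for the illustration, and the listing is sorted only to make the order stable (the method above does not sort):

```python
import os
import shutil
import tempfile

def crawl(path, evtname="crawl", out=None):
    """Mimic the traversal order of the add method, without any watcher."""
    out = [] if out is None else out
    if os.path.isdir(path):
        for name in sorted(os.listdir(path)):
            crawl(os.path.join(path, name), evtname, out)
    # The object itself is reported after its children.
    out.append((evtname, path, os.path.isdir(path)))
    return out

# Build a small throwaway tree: root/b.txt and root/sub/a.txt
root = tempfile.mkdtemp()
os.mkdir(os.path.join(root, "sub"))
open(os.path.join(root, "sub", "a.txt"), "w").close()
open(os.path.join(root, "b.txt"), "w").close()
try:
    events = crawl(root)
finally:
    shutil.rmtree(root)

order = [os.path.relpath(path, root) for _, path, _ in events]
print(order)
```

Here order lists b.txt, then sub/a.txt, then sub, and finally "." for the root itself.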
Finally, the worker is as follows:
    def watch (self, path) :
        hub = gevent.get_hub()
        watcher = hub.loop.stat(path, 1)
        self.watched.add(path)
        isdir = os.path.isdir(path)
        if isdir :
            old = set(os.listdir(path))
        while path in self.watched :
            try :
                with gevent.Timeout(2) :
                    hub.wait(watcher)
            except gevent.hub.LoopExit :
                break
            except gevent.Timeout :
                continue
            if os.path.isdir(path) :
                new = set(os.listdir(path))
                for name in new - old :
                    self.add(os.path.join(path, name))
                old = new
            elif os.path.exists(path) :
                self.q.put(event("update", path, isdir))
            else :
                break
        if isdir :
            for name in old :
                self.watched.discard(os.path.join(path, name))
        self.watched.discard(path)
        self.q.put(event("delete", path, isdir))
The first two lines invoke the appropriate gevent machinery to create a watcher that notifies us when the stat of path changes. Parameter 1 in hub.loop.stat(path, 1) specifies a refresh interval, in seconds, for the platforms where updates are monitored using polling (Windows for example). Then, we record in isdir whether path corresponds to a directory and, if so, we record in old the current content of the directory.
Then the main loop starts. Basically, while the object exists, the loop waits for a change notification from the watcher. But some changes are not notified: typically, if you delete a directory foo that contains a file foo/bar, no notification is sent about foo/bar being deleted. We handle this from the worker that watches foo, after the while loop, by removing child objects from self.watched, which allows the corresponding workers to exit their while loop. But if these workers are blocked on hub.wait(watcher), this does not help. This is why we enclose this instruction inside a timeout, so that when no notification comes within 2 seconds, we execute the continue and have a chance to exit the while loop. (Note that the timeout is set to 2 to ensure that the watcher, whose rate is 1, has a chance to detect a change.) Apart from the timeout, a possible exception is LoopExit, which occurs when the root directory itself is deleted, so we catch it to exit the while loop.
When the watcher unblocks hub.wait, we proceed by checking what happened, and there are three cases. (1) If the object is a directory, we get its new content and we add each new object that appeared in the directory. Note that it may be useful to generate here an update event for the directory itself (removed objects are handled in case 3 by their own worker). (2) If the object exists and is not a directory, it is a file and we just generate an update event for it. (3) Otherwise, the object has been deleted, so we exit the while loop.
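The three cases can be isolated in a small helper that is easy to try without gevent. This is only a sketch: the function classify and the values it returns are invented for the illustration and are not part of DirWatcher.

```python
import os
import shutil
import tempfile

def classify(path, old):
    """Tell which case applies to `path`; `old` is the last known listing."""
    if os.path.isdir(path):
        new = set(os.listdir(path))
        return "add", new - old      # case 1: only newly appeared names
    elif os.path.exists(path):
        return "update", None        # case 2: an existing file changed
    else:
        return "delete", None        # case 3: the object is gone

root = tempfile.mkdtemp()
try:
    old = set(os.listdir(root))      # empty directory at first
    target = os.path.join(root, "a.txt")
    open(target, "w").close()
    on_dir = classify(root, old)
    on_file = classify(target, set())
    os.remove(target)
    on_gone = classify(target, set())
finally:
    shutil.rmtree(root)

print(on_dir, on_file, on_gone)
```

The set difference new - old only yields the names that appeared, which is why removed entries never show up in case 1 and have to be detected by their own worker.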
After the while loop, we have already seen that, for a directory, we must remove its children. Then, we remove the object itself from self.watched and generate the event for this deletion.
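On the consumer side, all that is needed is to call get repeatedly and dispatch on the event name. The sketch below shows one possible shape for such a loop. To keep it runnable without gevent, a standard queue.Queue (Python 3) stands in for the DirWatcher queue, and the function drain as well as the paths are made up for the example. With the real class, one would pass the watcher's get method instead, which blocks the calling greenlet until an event is available.

```python
import collections
import queue

event = collections.namedtuple("event", ["name", "path", "isdir"])

def drain(get, count):
    """Call `get` `count` times and tally the received events by name."""
    tally = collections.Counter()
    for _ in range(count):
        evt = get()
        tally[evt.name] += 1
    return tally

# Stand-in for the DirWatcher queue, filled with hypothetical events.
q = queue.Queue()
for evt in [event("crawl", "/tmp/demo", True),
            event("create", "/tmp/demo/a.txt", False),
            event("update", "/tmp/demo/a.txt", False),
            event("update", "/tmp/demo/a.txt", False),
            event("delete", "/tmp/demo/a.txt", False)]:
    q.put(evt)

tally = drain(q.get, 5)
print(dict(tally))
```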