Ticket #514: osx_dir_watch.patch

File osx_dir_watch.patch, 14.2 KB (added by alan, 12 years ago)

patch translated from https://github.com/zhiqiangxu/fam/tree/master/sysdep/osx

  • source/lib/sysdep/os/osx/dir_watch.cpp

     
    1919 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
    2020 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    2121 */
     22#include "precompiled.h"
     23#include "ps/Filesystem.h"
     24#include <CoreFoundation/CoreFoundation.h>
     25#include <CoreServices/CoreServices.h>
     26#include <sys/ioctl.h>
     27#include <libgen.h>
    2228
    23 #include "precompiled.h"
    2429#include "lib/sysdep/dir_watch.h"
     30#include "lib/file/vfs/vfs_util.h"
    2531
    2632// stub implementations
     33static pthread_t g_event_loop_thread;
     34static pthread_mutex_t g_mutex_notifications = PTHREAD_MUTEX_INITIALIZER;
     35static pthread_mutex_t g_mutex_timestamps = PTHREAD_MUTEX_INITIALIZER;
     36static std::vector<DirWatchNotification> g_notifications;
     37static CFRunLoopRef loop = NULL;
     38static int initialized = 0;
     39static CFMutableArrayRef     g_paths;
     40static volatile bool done = false;
     41static int pipes[2];
    2742
    28 Status dir_watch_Add(const OsPath& UNUSED(path), PDirWatch& UNUSED(dirWatch))
     43struct dentry
    2944{
     45    struct timespec ts;
     46    off_t size;
     47    bool is_dir;
     48};
     49typedef std::map< OsPath, std::map<OsPath, struct dentry> > t_timestamp_map;
     50typedef std::map<OsPath, struct dentry> t_timestamp_inner_map;
     51typedef std::map< OsPath, std::map<OsPath, struct dentry> >::iterator t_timestamp_map_iterator;
     52typedef std::map<OsPath, struct dentry>::iterator t_timestamp_inner_map_iterator;
     53static t_timestamp_map g_timestamps;
     54static wchar_t        path_buff[PATH_MAX];
     55
     56static Status record_timestamps(const VfsPath& pathname, const FileInfo& fileInfo, uintptr_t cbData);
     57static Status remove_timestamps(const VfsPath& pathname, const FileInfo& UNUSED(fileInfo), const uintptr_t cbData);
     58static void fsevents_callback(FSEventStreamRef streamRef, void *clientCallBackInfo,
     59                  int numEvents,
     60                  const char *const eventPaths[],
     61                  const FSEventStreamEventFlags *eventFlags,
     62                  const uint64_t *eventIDs);
     63Status dir_watch_Delete(const OsPath&);
     64
     65struct DirWatch
     66{
     67    DirWatch()
     68    {
     69    }
     70    ~DirWatch()
     71    {
     72        dir_watch_Delete(path);
     73    }
     74    OsPath path;
     75};
     76
     77static CFFileDescriptorRef fdref = NULL;
     78static CFRunLoopSourceRef fd_rl_src = NULL;
     79
     80static void pipe_callback(CFFileDescriptorRef UNUSED(kq_cffd), CFOptionFlags UNUSED(callBackTypes), void* UNUSED(info))
     81{
     82    char c;
     83    while (read(pipes[0], &c, 1) > 0) ;
     84    CFRunLoopStop(loop);
     85    CFFileDescriptorEnableCallBacks(fdref, kCFFileDescriptorReadCallBack);
     86}
     87
     88static Status setup_run_loop_pipe_read_handler()
     89{
     90    fdref = CFFileDescriptorCreate(NULL, pipes[0], 1, pipe_callback, NULL);
     91    if (fdref == NULL)
     92    {
     93        return ERR::FAIL;
     94    }
     95
     96    fd_rl_src = CFFileDescriptorCreateRunLoopSource(NULL, fdref, (CFIndex)0);
     97    if (fd_rl_src == NULL)
     98    {
     99        CFFileDescriptorInvalidate(fdref);
     100        CFRelease(fdref);
     101        fdref = NULL;
     102        return ERR::FAIL;
     103    }
     104
     105    CFRunLoopAddSource(loop, fd_rl_src, kCFRunLoopDefaultMode);
     106    CFFileDescriptorEnableCallBacks(fdref, kCFFileDescriptorReadCallBack);
     107
     108    return INFO::OK;
     109}
     110
     111static void cleanup_run_loop_pipe_read_handler()
     112{
     113    CFRunLoopRemoveSource(loop, fd_rl_src, kCFRunLoopDefaultMode);
     114    CFFileDescriptorInvalidate(fdref);
     115    CFRelease(fd_rl_src);
     116    CFRelease(fdref);
     117
     118    fd_rl_src = NULL;
     119    fdref   = NULL;
     120}
     121
     122static void fam_deinit()
     123{
     124    done = true;
     125    cleanup_run_loop_pipe_read_handler();
     126    CFRunLoopStop(loop);
     127
     128    // Wait for the thread to finish
     129    pthread_join(g_event_loop_thread, NULL);
     130}
     131
     132static void* event_loop(void*)
     133{
     134    loop = CFRunLoopGetCurrent();
     135    FSEventStreamRef      stream_ref = NULL;
     136    FSEventStreamEventId sinceWhen = kFSEventStreamEventIdSinceNow;
     137    setup_run_loop_pipe_read_handler();
     138
     139    do {
     140        stream_ref = FSEventStreamCreate(kCFAllocatorDefault,
     141                                    (FSEventStreamCallback)&fsevents_callback,
     142                                    NULL,
     143                                    g_paths,
     144                                    sinceWhen,
     145                                    0.3,
     146                                    kFSEventStreamCreateFlagNone);
     147        FSEventStreamScheduleWithRunLoop(stream_ref, loop, kCFRunLoopDefaultMode);
     148        if (!FSEventStreamStart(stream_ref))
     149        {
     150            debug_warn(L"event_loop FSEventStreamStart failed!");
     151            return NULL;
     152        }
     153        CFRunLoopRun();
     154        FSEventStreamFlushSync(stream_ref);
     155        FSEventStreamStop(stream_ref);
     156        FSEventStreamUnscheduleFromRunLoop(stream_ref, loop, kCFRunLoopDefaultMode);
     157        FSEventStreamInvalidate(stream_ref);
     158        FSEventStreamRelease(stream_ref);
     159        sinceWhen = FSEventsGetCurrentEventId();
     160    }
     161    while (!done);
     162    return NULL;
     163}
     164
     165static inline void add_to_g_paths(const OsPath& path)
     166{
     167    pthread_mutex_lock(&g_mutex_timestamps);
     168    vfs::ForEachFile(g_VFS, path, record_timestamps, (uintptr_t)&g_timestamps, L"*", vfs::DIR_RECURSIVE);
     169    pthread_mutex_unlock(&g_mutex_timestamps);
     170
     171    CFStringEncoding encoding = (CFByteOrderLittleEndian == CFByteOrderGetCurrent()) ?
     172                                      kCFStringEncodingUTF32LE : kCFStringEncodingUTF32BE;
     173    CFStringRef macpath = CFStringCreateWithBytes(
     174        NULL,
     175        (const UInt8*)path.string().c_str(),
     176        wcslen(path.string().c_str()) * sizeof(wchar_t),
     177        encoding,
     178        false
     179    );
     180
     181    if (macpath == NULL)
     182    {
     183        debug_warn(L"add_to_g_paths CFStringCreateWithCString failed!");
     184        return;
     185    }
     186    CFArrayAppendValue(g_paths, macpath);
     187    CFArraySortValues(g_paths, CFRangeMake(0, CFArrayGetCount(g_paths)), (CFComparatorFunction)CFStringCompare, NULL);
     188    CFRelease(macpath);
     189}
     190
     191static Status compare_timestamps(t_timestamp_map& old_map, t_timestamp_map& new_map)
     192{
     193    #define time_equal(a, b) (a.tv_sec == b.tv_sec && a.tv_nsec == b.tv_nsec)
     194
     195    t_timestamp_map_iterator it_old;
     196    t_timestamp_map_iterator it_new;
     197    t_timestamp_inner_map_iterator it_old_inner;
     198    t_timestamp_inner_map_iterator it_new_inner;
     199    t_timestamp_inner_map inner_map;
     200    for (it_old = old_map.begin(); it_old != old_map.end(); it_old++)
     201    {
     202        inner_map = it_old->second;
     203        it_new = new_map.find(it_old->first);
     204        for (it_old_inner = inner_map.begin(); it_old_inner != inner_map.end(); it_old_inner++)
     205        {
     206            if ((it_new == new_map.end()) || ((it_new_inner = it_new->second.find(it_old_inner->first)) == it_new->second.end()))
     207            {
     208                // deleted
     209                DirWatchNotification dn(it_old_inner->first, DirWatchNotification::Deleted);
     210                pthread_mutex_lock(&g_mutex_notifications);
     211                g_notifications.push_back(dn);
     212                pthread_mutex_unlock(&g_mutex_notifications);
     213            }
     214            else if (!time_equal(it_new_inner->second.ts, it_old_inner->second.ts) || (it_new_inner->second.size != it_old_inner->second.size))
     215            {
     216                // updated
     217                DirWatchNotification dn(it_old_inner->first, DirWatchNotification::Changed);
     218                pthread_mutex_lock(&g_mutex_notifications);
     219                g_notifications.push_back(dn);
     220                pthread_mutex_unlock(&g_mutex_notifications);
     221            }
     222        }
     223    }
     224    for (it_new = new_map.begin(); it_new != new_map.end(); it_new++)
     225    {
     226        inner_map = it_new->second;
     227        it_old = old_map.find(it_new->first);
     228        for (it_new_inner = inner_map.begin(); it_new_inner != inner_map.end(); it_new_inner++)
     229        {
     230            if ((it_old == old_map.end()) || ((it_old_inner = it_old->second.find(it_new_inner->first)) == it_old->second.end()))
     231            {
     232                // new
     233                DirWatchNotification dn(it_new_inner->first, DirWatchNotification::Created);
     234                pthread_mutex_lock(&g_mutex_notifications);
     235                g_notifications.push_back(dn);
     236                pthread_mutex_unlock(&g_mutex_notifications);
     237            }
     238        }
     239    }
     240
     241    #undef time_equal
     242    return INFO::OK;
     243}
     244
     245static void fsevents_callback(FSEventStreamRef UNUSED(streamRef), void* UNUSED(clientCallBackInfo),
     246                  int numEvents,
     247                  const char *const eventPaths[],
     248                  const FSEventStreamEventFlags *eventFlags,
     249                  const uint64_t* UNUSED(eventIDs))
     250{
     251    for (int i=0; i < numEvents; i++)
     252    {
     253        size_t flags = 0;
     254        if (eventFlags[i] & kFSEventStreamEventFlagHistoryDone)
     255        {
     256            continue;
     257        }
     258        else if (eventFlags[i] & kFSEventStreamEventFlagRootChanged)
     259        {
     260            // do not handle this flag temporarily
     261            continue;
     262        }
     263        else if (eventFlags[i] & kFSEventStreamEventFlagMustScanSubDirs)
     264        {
     265            flags |= vfs::DIR_RECURSIVE;
     266        }
     267        t_timestamp_map new_fs;
     268        vfs::ForEachFile(g_VFS, eventPaths[i], record_timestamps, (uintptr_t)&new_fs, L"*", flags);
     269        t_timestamp_map_iterator it_new;
     270        t_timestamp_map_iterator it_g;
     271        t_timestamp_map old_fs;
     272        pthread_mutex_lock(&g_mutex_timestamps);
     273        for (it_new = new_fs.begin(); it_new != new_fs.end(); it_new++)
     274        {
     275            it_g = g_timestamps.find(it_new->first);
     276            if (it_g != g_timestamps.end())
     277            {
     278                old_fs.insert(std::pair<OsPath, t_timestamp_inner_map>(it_g->first, it_g->second));
     279            }
     280        }
     281        for (it_new = new_fs.begin(); it_new != new_fs.end(); it_new++)
     282        {
     283            g_timestamps[it_new->first] = it_new->second;
     284        }
     285        pthread_mutex_unlock(&g_mutex_timestamps);
     286        compare_timestamps(old_fs, new_fs);
     287    }
     288}
     289
     290static Status record_timestamps(const VfsPath& pathname, const FileInfo& UNUSED(fileInfo), const uintptr_t cbData)
     291{
     292    struct stat st;
     293    if (wstat(pathname.string().c_str(), &st) == 0)
     294    {
     295        t_timestamp_map* pmap = (t_timestamp_map*)cbData;
     296        wcscpy(path_buff, pathname.string().c_str());
     297        char* dir = dirname(path_buff);
     298        t_timestamp_map_iterator it = pmap->find(dir);
     299        struct dentry d = {ts:st.st_mtimespec, size:st.st_size, is_dir: S_ISDIR(st.st_mode)};
     300        if (it != pmap->end())
     301        {
     302            it->second.insert(std::pair<OsPath, struct dentry>(pathname, d));
     303        }
     304        else
     305        {
     306            t_timestamp_inner_map inner_map;
     307            inner_map.insert(std::pair<OsPath, struct dentry>(pathname, d));
     308            pmap->insert(std::pair<OsPath, t_timestamp_inner_map>(dir, inner_map));
     309        }
     310    }
     311    return INFO::OK;
     312}
     313
     314static Status remove_timestamps(const VfsPath& pathname, const FileInfo& UNUSED(fileInfo), const uintptr_t cbData)
     315{
     316    struct stat st;
     317    if (wstat(pathname.string().c_str(), &st) == 0)
     318    {
     319        t_timestamp_map* pmap = (t_timestamp_map*)cbData;
     320        wcscpy(path_buff, pathname.string().c_str());
     321        char* dir = dirname(path_buff);
     322        t_timestamp_map_iterator it = pmap->find(dir);
     323        if (it != pmap->end())
     324        {
     325            t_timestamp_inner_map_iterator it_inner = it->second.find((const char*)(pathname.string().c_str()));
     326            if (it_inner != it->second.end())
     327            {
     328                it->second.erase(it_inner);
     329            }
     330        }
     331    }
     332    return INFO::OK;
     333}
     334
     335Status dir_watch_Delete(const OsPath& path)
     336{
     337    CFStringEncoding encoding = (CFByteOrderLittleEndian == CFByteOrderGetCurrent()) ?
     338                                      kCFStringEncodingUTF32LE : kCFStringEncodingUTF32BE;
     339    CFStringRef macpath = CFStringCreateWithBytes(
     340        NULL,
     341        (const UInt8*)path.string().c_str(),
     342        wcslen(path.string().c_str()) * sizeof(wchar_t),
     343        encoding,
     344        false
     345    );
     346    if (macpath == NULL)
     347    {
     348        debug_warn(L"dir_watch_Delete CFStringCreateWithCString failed!");
     349        return ERR::FAIL;
     350    }
     351    CFIndex idx = CFArrayBSearchValues(g_paths, CFRangeMake(0, CFArrayGetCount(g_paths)), macpath, (CFComparatorFunction)CFStringCompare, NULL);
     352    CFRelease(macpath);
     353    CFArrayRemoveValueAtIndex(g_paths, idx);
     354    pthread_mutex_lock(&g_mutex_timestamps);
     355    vfs::ForEachFile(g_VFS, path, remove_timestamps, (uintptr_t)&g_timestamps, L"*", vfs::DIR_RECURSIVE);
     356    pthread_mutex_unlock(&g_mutex_timestamps);
     357    write(pipes[1], "1", 1);
     358    return INFO::OK;
     359}
     360
     361Status dir_watch_Add(const OsPath& path, PDirWatch& dirWatch)
     362{
     363    if (!initialized)
     364    {
     365        if (0 != pipe(pipes))
     366        {
     367            debug_warn(L"dir_watch_Add pipe failed!");
     368            return ERR::FAIL;
     369        }
     370        unsigned long nb = 1;
     371        if (-1 == ioctl(pipes[0], FIONBIO, &nb))
     372        {
     373            debug_warn(L"dir_watch_Add ioctl failed!");
     374            return ERR::FAIL;
     375        }
     376        g_paths = CFArrayCreateMutable(kCFAllocatorDefault, 0, &kCFTypeArrayCallBacks);
     377        if (g_paths == NULL)
     378        {
     379            debug_warn(L"dir_watch_Add CFArrayCreateMutable failed!");
     380            return ERR::FAIL;
     381        }
     382        add_to_g_paths(path);
     383
     384        if (pthread_create(&g_event_loop_thread, NULL, &event_loop, NULL))
     385        {
     386            debug_warn(L"dir_watch_Add pthread_create failed!");
     387            return ERR::FAIL;
     388        }
     389        atexit(fam_deinit);
     390        initialized = 1;
     391    }
     392    else
     393    {
     394        add_to_g_paths(path);
     395        write(pipes[1], "1", 1);
     396    }
     397
     398    PDirWatch tmpDirWatch(new DirWatch);
     399    dirWatch.swap(tmpDirWatch);
     400    dirWatch->path = path;
     401
    30402    return INFO::OK;
    31403}
    32404
    33 Status dir_watch_Poll(DirWatchNotifications& UNUSED(notifications))
     405Status dir_watch_Poll(DirWatchNotifications& notifications)
    34406{
     407    if (1 != initialized)
     408        return ERR::FAIL;
     409    std::vector<DirWatchNotification> polled_notifications;
     410
     411    pthread_mutex_lock(&g_mutex_notifications);
     412    g_notifications.swap(polled_notifications);
     413    pthread_mutex_unlock(&g_mutex_notifications);
     414
     415    for (size_t i = 0; i < polled_notifications.size(); ++i)
     416    {
     417        notifications.push_back(polled_notifications[i]);
     418    }
    35419    return INFO::OK;
    36420}
     421