Ticket #1707: ParallelFor_rangemanager.patch

File ParallelFor_rangemanager.patch, 10.0 KB (added by Jorma Rebane, 9 years ago)

Implemented Parallel For on pthreads (scary), improves rangemanager performance on multicore platforms.

  • ps/Parallel.cpp

     
     1/* Copyright (C) 2013 Wildfire Games.
     2 * This file is part of 0 A.D.
     3 *
     4 * 0 A.D. is free software: you can redistribute it and/or modify
     5 * it under the terms of the GNU General Public License as published by
     6 * the Free Software Foundation, either version 2 of the License, or
     7 * (at your option) any later version.
     8 *
     9 * 0 A.D. is distributed in the hope that it will be useful,
     10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
     11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     12 * GNU General Public License for more details.
     13 *
     14 * You should have received a copy of the GNU General Public License
     15 * along with 0 A.D.  If not, see <http://www.gnu.org/licenses/>.
     16 */
     17#include "precompiled.h"
     18
     19#include "Parallel.h"
     20#include <lib/posix/posix_pthread.h>
     21#include <lib/sysdep/os_cpu.h>
     22
     23namespace ps
     24{
     25    #define MAX_WORKER_THREADS 8
     26
     27    struct worker
     28    {
     29        pthread_t thread;
     30        sem_t jobPending;
     31        sem_t workDone;
     32
     33        void (*job)(void* param);
     34        void* arg;
     35    };
     36    static worker   g_Workers[MAX_WORKER_THREADS];
     37    static bool     g_Initialized   = false;
     38    static int      g_NumWorkers    = 0; // default workers 0, incase we have a 1-core CPU
     39
     40
     41
     42    extern "C" static void* _worker(void* arg)
     43    {
     44        worker* w = (worker*)arg;
     45        while (true)
     46        {
     47            sem_wait(&w->jobPending);   // wait for job signal
     48            w->job(w->arg);             // launch the job
     49            sem_post(&w->workDone);     // post done signal
     50        }
     51
     52    }
     53    static void uninit_workers()
     54    {
     55        for (int i = 0; i < g_NumWorkers; ++i)
     56        {
     57            pthread_cancel(g_Workers[i].thread);
     58            sem_destroy(&g_Workers[i].jobPending);
     59            sem_destroy(&g_Workers[i].workDone);
     60        }
     61    }
     62    static int init_workers()
     63    {
     64        if (!g_Initialized)
     65        {
     66            int numCPUS = os_cpu_NumProcessors();
     67            if (numCPUS > 1)
     68            {
     69                if (numCPUS > MAX_WORKER_THREADS)
     70                    numCPUS = MAX_WORKER_THREADS;
     71
     72                g_NumWorkers = numCPUS;
     73                for (int i = 0; i < numCPUS; ++i)
     74                {
     75                    sem_init(&g_Workers[i].jobPending, 0, 0);
     76                    sem_init(&g_Workers[i].workDone, 0, 0);
     77                    pthread_create(&g_Workers[i].thread, NULL, _worker, &g_Workers[i]);
     78                }
     79                atexit(uninit_workers);
     80            }
     81            g_Initialized = true;
     82        }
     83        return 1;
     84    }
     85
     86
     87
     88    struct pfor
     89    {
     90        int startIndex;
     91        int endIndex;
     92        int step;
     93        pfor_functor* callback;
     94    };
     95    static void pfor_threadfunc(pfor* arg)
     96    {
     97        int i            = arg->startIndex;
     98        int end          = arg->endIndex;
     99        int step         = arg->step;
     100        pfor_functor* cb = arg->callback;
     101        for (; i < end; i += step)
     102            if ((*cb)(i) == false)
     103                break;
     104    }
     105    void parallel_for(int startIndex, int endIndex, pfor_functor& functor)
     106    {
     107        static int initialized = init_workers(); // init once
     108
     109        // special case: we don't have any workers (1-core CPU)
     110        if (g_NumWorkers == 0)
     111        {
     112            // just a normal for-loop then
     113            for (int i = startIndex; i < endIndex; ++i)
     114                if (functor(i) == false)
     115                    break;
     116            return;
     117        }
     118
     119        pfor params[MAX_WORKER_THREADS];
     120
     121        // launch threads
     122        for (int i = 0; i < g_NumWorkers; ++i)
     123        {
     124            params[i].startIndex = startIndex;
     125            params[i].endIndex = endIndex;
     126            params[i].step = g_NumWorkers;
     127            params[i].callback = &functor;
     128            g_Workers[i].job = (void(*)(void*))&pfor_threadfunc;
     129            g_Workers[i].arg = &params[i];
     130
     131            sem_post(&g_Workers[i].jobPending); // notify worker - new job pending
     132        }
     133
     134        // wait for them to finish
     135        for (int i = 0; i < g_NumWorkers; ++i)
     136        {
     137            sem_wait(&g_Workers[i].workDone); // wait for work to finish
     138        }
     139    }
     140
     141}
  • ps/Parallel.h

     
     1/* Copyright (C) 2013 Wildfire Games.
     2 * This file is part of 0 A.D.
     3 *
     4 * 0 A.D. is free software: you can redistribute it and/or modify
     5 * it under the terms of the GNU General Public License as published by
     6 * the Free Software Foundation, either version 2 of the License, or
     7 * (at your option) any later version.
     8 *
     9 * 0 A.D. is distributed in the hope that it will be useful,
     10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
     11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     12 * GNU General Public License for more details.
     13 *
     14 * You should have received a copy of the GNU General Public License
     15 * along with 0 A.D.  If not, see <http://www.gnu.org/licenses/>.
     16 */
     17
     18#ifndef INCLUDED_PARALLEL
     19#define INCLUDED_PARALLEL
     20
     21
     22namespace ps
     23{
     24
     25    struct pfor_functor
     26    {
     27        /**
     28         * Parallel - For callback.
     29         * @return TRUE to continue iteration, FALSE to break from the loop.
     30         */
     31        virtual bool operator()(int atIndex) = 0;
     32
     33        // make sure the correct destructors get used
     34        virtual ~pfor_functor() {}
     35    };
     36
     37    /**
     38     * A simple parallel FOR loop.
     39     * for (int i = 0; i < 10; i++) f(i);  ->  parallel_for(0, 10, f), where f is an object of a class derived from pfor_functor
     40     *
     41     * @param startIndex Starting index (inclusive)
     42     * @param endIndex Ending index (exclusive)
     43     * @param functor Functor to call on every index
     44     */
     45    void parallel_for(int startIndex, int endIndex, pfor_functor& functor);
     46}
     47#endif
     48 No newline at end of file
  • simulation2/components/CCmpRangeManager.cpp

     
    3939#include "ps/Profile.h"
    4040#include "renderer/Scene.h"
    4141#include "lib/ps_stl.h"
     42#include "ps/Parallel.h"
    4243
    4344
    4445#define DEBUG_RANGE_MANAGER_BOUNDS 0
     
    814815            m_DebugOverlayLines.clear();
    815816    }
    816817
    817     /**
    818      * Update all currently-enabled active queries.
    819      */
    820     void ExecuteActiveQueries()
     818
     819
     820
     821
     822    struct ExecuteParallelQuery : ps::pfor_functor
    821823    {
    822         PROFILE3("ExecuteActiveQueries");
    823 
    824824        // Store a queue of all messages before sending any, so we can assume
    825825        // no entities will move until we've finished checking all the ranges
    826826        std::vector<std::pair<entity_id_t, CMessageRangeUpdate> > messages;
    827         std::vector<entity_id_t> results;
    828         std::vector<entity_id_t> added;
    829         std::vector<entity_id_t> removed;
     827        std::vector<std::pair<tag_t, Query*> > queries;
     828        CCmpRangeManager* rangeMan;
     829        CMutex mutex;
    830830
    831         for (std::map<tag_t, Query>::iterator it = m_Queries.begin(); it != m_Queries.end(); ++it)
     831
     832        ExecuteParallelQuery(CCmpRangeManager* rangeMan) : rangeMan(rangeMan)
    832833        {
    833             Query& query = it->second;
     834        }
    834835
     836        virtual bool operator()(int atIndex)
     837        {
     838            tag_t tag = queries[atIndex].first;
     839            Query& query = *queries[atIndex].second;
     840
    835841            if (!query.enabled)
    836                 continue;
     842                return true; // continue
    837843
    838844            CmpPtr<ICmpPosition> cmpSourcePosition(query.source);           
    839845            if (!cmpSourcePosition || !cmpSourcePosition->IsInWorld())
    840                 continue;
     846                return true; // continue
    841847
    842             results.clear();
     848            std::vector<entity_id_t> added;
     849            std::vector<entity_id_t> removed;
     850            std::vector<entity_id_t> results;
    843851            results.reserve(query.lastMatch.size());
    844             PerformQuery(query, results);
     852           
     853            rangeMan->PerformQuery(query, results);
    845854
    846855            // Compute the changes vs the last match
    847             added.clear();
    848             removed.clear();
    849856            // Return the 'added' list sorted by distance from the entity
    850857            // (Don't bother sorting 'removed' because they might not even have positions or exist any more)
    851             std::set_difference(results.begin(), results.end(), query.lastMatch.begin(), query.lastMatch.end(),
    852                 std::back_inserter(added));
    853             std::set_difference(query.lastMatch.begin(), query.lastMatch.end(), results.begin(), results.end(),
    854                 std::back_inserter(removed));
     858            std::set_difference(results.begin(), results.end(),
     859                query.lastMatch.begin(), query.lastMatch.end(), std::back_inserter(added));
     860            std::set_difference(query.lastMatch.begin(), query.lastMatch.end(),
     861                results.begin(), results.end(), std::back_inserter(removed));
     862           
    855863            if (added.empty() && removed.empty())
    856                 continue;
     864                return true; // continue
    857865
    858             std::stable_sort(added.begin(), added.end(), EntityDistanceOrdering(m_EntityData, cmpSourcePosition->GetPosition2D()));
     866            std::stable_sort(added.begin(), added.end(),
     867                EntityDistanceOrdering(rangeMan->m_EntityData, cmpSourcePosition->GetPosition2D()));
    859868
    860             messages.resize(messages.size() + 1);
    861             std::pair<entity_id_t, CMessageRangeUpdate>& back = messages.back();
    862             back.first = query.source.GetId();
    863             back.second.tag = it->first;
    864             back.second.added.swap(added);
    865             back.second.removed.swap(removed);
    866             it->second.lastMatch.swap(results);
     869            {
     870                CScopeLock lock(mutex);
     871                messages.resize(messages.size() + 1);
     872                std::pair<entity_id_t, CMessageRangeUpdate>& back = messages.back();
     873                back.first = query.source.GetId();
     874                back.second.tag = tag;
     875                back.second.added.swap(added);
     876                back.second.removed.swap(removed);
     877           
     878            }
     879            query.lastMatch.swap(results);
     880
     881            return true; // continue
    867882        }
     883    };
    868884
     885    /**
     886     * Update all currently-enabled active queries.
     887     */
     888    void ExecuteActiveQueries()
     889    {
     890        PROFILE3("ExecuteActiveQueries");
     891
     892        int size = m_Queries.size();
     893        if (size == 0)
     894            return;
     895
     896        ExecuteParallelQuery pquery(this);
     897
     898        // prepare queries into a random-access vector, so we can use Parallel For
     899        pquery.queries.reserve(size);
     900        for (std::map<tag_t, Query>::iterator it = m_Queries.begin(); it != m_Queries.end(); ++it)
     901            pquery.queries.push_back(std::make_pair(it->first, &it->second));
     902
     903        // run the parallel for loop
     904        ps::parallel_for(0, size, pquery);
     905
    869906        CComponentManager& cmpMgr = GetSimContext().GetComponentManager();
    870         for (size_t i = 0; i < messages.size(); ++i)
    871             cmpMgr.PostMessage(messages[i].first, messages[i].second);
     907        for (size_t i = 0; i < pquery.messages.size(); ++i)
     908            cmpMgr.PostMessage(pquery.messages[i].first, pquery.messages[i].second);
    872909    }
    873910
    874911    /**