Ticket #1707: ParallelFor_rangemanager.patch
File ParallelFor_rangemanager.patch, 10.0 KB (added by , 11 years ago) |
---|
-
ps/Parallel.cpp
1 /* Copyright (C) 2013 Wildfire Games. 2 * This file is part of 0 A.D. 3 * 4 * 0 A.D. is free software: you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation, either version 2 of the License, or 7 * (at your option) any later version. 8 * 9 * 0 A.D. is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * GNU General Public License for more details. 13 * 14 * You should have received a copy of the GNU General Public License 15 * along with 0 A.D. If not, see <http://www.gnu.org/licenses/>. 16 */ 17 #include "precompiled.h" 18 19 #include "Parallel.h" 20 #include <lib/posix/posix_pthread.h> 21 #include <lib/sysdep/os_cpu.h> 22 23 namespace ps 24 { 25 #define MAX_WORKER_THREADS 8 26 27 struct worker 28 { 29 pthread_t thread; 30 sem_t jobPending; 31 sem_t workDone; 32 33 void (*job)(void* param); 34 void* arg; 35 }; 36 static worker g_Workers[MAX_WORKER_THREADS]; 37 static bool g_Initialized = false; 38 static int g_NumWorkers = 0; // default workers 0, incase we have a 1-core CPU 39 40 41 42 extern "C" static void* _worker(void* arg) 43 { 44 worker* w = (worker*)arg; 45 while (true) 46 { 47 sem_wait(&w->jobPending); // wait for job signal 48 w->job(w->arg); // launch the job 49 sem_post(&w->workDone); // post done signal 50 } 51 52 } 53 static void uninit_workers() 54 { 55 for (int i = 0; i < g_NumWorkers; ++i) 56 { 57 pthread_cancel(g_Workers[i].thread); 58 sem_destroy(&g_Workers[i].jobPending); 59 sem_destroy(&g_Workers[i].workDone); 60 } 61 } 62 static int init_workers() 63 { 64 if (!g_Initialized) 65 { 66 int numCPUS = os_cpu_NumProcessors(); 67 if (numCPUS > 1) 68 { 69 if (numCPUS > MAX_WORKER_THREADS) 70 numCPUS = MAX_WORKER_THREADS; 71 72 g_NumWorkers = numCPUS; 73 for (int i = 0; i < numCPUS; ++i) 74 { 75 sem_init(&g_Workers[i].jobPending, 
0, 0); 76 sem_init(&g_Workers[i].workDone, 0, 0); 77 pthread_create(&g_Workers[i].thread, NULL, _worker, &g_Workers[i]); 78 } 79 atexit(uninit_workers); 80 } 81 g_Initialized = true; 82 } 83 return 1; 84 } 85 86 87 88 struct pfor 89 { 90 int startIndex; 91 int endIndex; 92 int step; 93 pfor_functor* callback; 94 }; 95 static void pfor_threadfunc(pfor* arg) 96 { 97 int i = arg->startIndex; 98 int end = arg->endIndex; 99 int step = arg->step; 100 pfor_functor* cb = arg->callback; 101 for (; i < end; i += step) 102 if ((*cb)(i) == false) 103 break; 104 } 105 void parallel_for(int startIndex, int endIndex, pfor_functor& functor) 106 { 107 static int initialized = init_workers(); // init once 108 109 // special case: we don't have any workers (1-core CPU) 110 if (g_NumWorkers == 0) 111 { 112 // just a normal for-loop then 113 for (int i = startIndex; i < endIndex; ++i) 114 if (functor(i) == false) 115 break; 116 return; 117 } 118 119 pfor params[MAX_WORKER_THREADS]; 120 121 // launch threads 122 for (int i = 0; i < g_NumWorkers; ++i) 123 { 124 params[i].startIndex = startIndex; 125 params[i].endIndex = endIndex; 126 params[i].step = g_NumWorkers; 127 params[i].callback = &functor; 128 g_Workers[i].job = (void(*)(void*))&pfor_threadfunc; 129 g_Workers[i].arg = ¶ms[i]; 130 131 sem_post(&g_Workers[i].jobPending); // notify worker - new job pending 132 } 133 134 // wait for them to finish 135 for (int i = 0; i < g_NumWorkers; ++i) 136 { 137 sem_wait(&g_Workers[i].workDone); // wait for work to finish 138 } 139 } 140 141 } -
ps/Parallel.h
/* Copyright (C) 2013 Wildfire Games.
 * This file is part of 0 A.D.
 *
 * 0 A.D. is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 2 of the License, or
 * (at your option) any later version.
 *
 * 0 A.D. is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with 0 A.D.  If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef INCLUDED_PARALLEL
#define INCLUDED_PARALLEL


namespace ps
{

/**
 * Interface for the body of a parallel_for loop. Derive from it, implement
 * operator(), and pass an instance (an lvalue) to parallel_for.
 */
struct pfor_functor
{
	/**
	 * Parallel - For callback.
	 * May be invoked from several worker threads at the same time, so any
	 * state shared between invocations has to be synchronised by the
	 * implementation.
	 * @param atIndex The loop index to process.
	 * @return TRUE to continue iteration, FALSE to break from the loop.
	 *         When the loop is spread over several threads, FALSE only stops
	 *         the thread that received it; the other threads keep going.
	 */
	virtual bool operator()(int atIndex) = 0;

	// make sure the correct destructors get used
	virtual ~pfor_functor() {}
};

/**
 * A simple parallel FOR loop; blocks until all indices have been handled.
 *
 * The serial loop
 *     for (int i = 0; i < 10; i++) body(i);
 * becomes
 *     struct Body : ps::pfor_functor
 *     {
 *         virtual bool operator()(int i) { ...; return true; }
 *     };
 *     Body body;
 *     ps::parallel_for(0, 10, body);
 * (pfor_functor is abstract and is taken by non-const reference, so a named
 * object of a derived type is required - a temporary will not do.)
 *
 * @param startIndex Starting index (inclusive)
 * @param endIndex Ending index (exclusive)
 * @param functor Functor to call on every index
 */
void parallel_for(int startIndex, int endIndex, pfor_functor& functor);

}

#endif
simulation2/components/CCmpRangeManager.cpp
39 39 #include "ps/Profile.h" 40 40 #include "renderer/Scene.h" 41 41 #include "lib/ps_stl.h" 42 #include "ps/Parallel.h" 42 43 43 44 44 45 #define DEBUG_RANGE_MANAGER_BOUNDS 0 … … 814 815 m_DebugOverlayLines.clear(); 815 816 } 816 817 817 /** 818 * Update all currently-enabled active queries. 819 */ 820 void ExecuteActiveQueries() 818 819 820 821 822 struct ExecuteParallelQuery : ps::pfor_functor 821 823 { 822 PROFILE3("ExecuteActiveQueries");823 824 824 // Store a queue of all messages before sending any, so we can assume 825 825 // no entities will move until we've finished checking all the ranges 826 826 std::vector<std::pair<entity_id_t, CMessageRangeUpdate> > messages; 827 std::vector< entity_id_t> results;828 std::vector<entity_id_t> added;829 std::vector<entity_id_t> removed;827 std::vector<std::pair<tag_t, Query*> > queries; 828 CCmpRangeManager* rangeMan; 829 CMutex mutex; 830 830 831 for (std::map<tag_t, Query>::iterator it = m_Queries.begin(); it != m_Queries.end(); ++it) 831 832 ExecuteParallelQuery(CCmpRangeManager* rangeMan) : rangeMan(rangeMan) 832 833 { 833 Query& query = it->second;834 } 834 835 836 virtual bool operator()(int atIndex) 837 { 838 tag_t tag = queries[atIndex].first; 839 Query& query = *queries[atIndex].second; 840 835 841 if (!query.enabled) 836 continue;842 return true; // continue 837 843 838 844 CmpPtr<ICmpPosition> cmpSourcePosition(query.source); 839 845 if (!cmpSourcePosition || !cmpSourcePosition->IsInWorld()) 840 continue;846 return true; // continue 841 847 842 results.clear(); 848 std::vector<entity_id_t> added; 849 std::vector<entity_id_t> removed; 850 std::vector<entity_id_t> results; 843 851 results.reserve(query.lastMatch.size()); 844 PerformQuery(query, results); 852 853 rangeMan->PerformQuery(query, results); 845 854 846 855 // Compute the changes vs the last match 847 added.clear();848 removed.clear();849 856 // Return the 'added' list sorted by distance from the entity 850 857 // (Don't bother sorting 'removed' 
because they might not even have positions or exist any more) 851 std::set_difference(results.begin(), results.end(), query.lastMatch.begin(), query.lastMatch.end(), 852 std::back_inserter(added)); 853 std::set_difference(query.lastMatch.begin(), query.lastMatch.end(), results.begin(), results.end(), 854 std::back_inserter(removed)); 858 std::set_difference(results.begin(), results.end(), 859 query.lastMatch.begin(), query.lastMatch.end(), std::back_inserter(added)); 860 std::set_difference(query.lastMatch.begin(), query.lastMatch.end(), 861 results.begin(), results.end(), std::back_inserter(removed)); 862 855 863 if (added.empty() && removed.empty()) 856 continue;864 return true; // continue 857 865 858 std::stable_sort(added.begin(), added.end(), EntityDistanceOrdering(m_EntityData, cmpSourcePosition->GetPosition2D())); 866 std::stable_sort(added.begin(), added.end(), 867 EntityDistanceOrdering(rangeMan->m_EntityData, cmpSourcePosition->GetPosition2D())); 859 868 860 messages.resize(messages.size() + 1); 861 std::pair<entity_id_t, CMessageRangeUpdate>& back = messages.back(); 862 back.first = query.source.GetId(); 863 back.second.tag = it->first; 864 back.second.added.swap(added); 865 back.second.removed.swap(removed); 866 it->second.lastMatch.swap(results); 869 { 870 CScopeLock lock(mutex); 871 messages.resize(messages.size() + 1); 872 std::pair<entity_id_t, CMessageRangeUpdate>& back = messages.back(); 873 back.first = query.source.GetId(); 874 back.second.tag = tag; 875 back.second.added.swap(added); 876 back.second.removed.swap(removed); 877 878 } 879 query.lastMatch.swap(results); 880 881 return true; // continue 867 882 } 883 }; 868 884 885 /** 886 * Update all currently-enabled active queries. 
887 */ 888 void ExecuteActiveQueries() 889 { 890 PROFILE3("ExecuteActiveQueries"); 891 892 int size = m_Queries.size(); 893 if (size == 0) 894 return; 895 896 ExecuteParallelQuery pquery(this); 897 898 // prepare queries into a random-access vector, so we can use Parallel For 899 pquery.queries.reserve(size); 900 for (std::map<tag_t, Query>::iterator it = m_Queries.begin(); it != m_Queries.end(); ++it) 901 pquery.queries.push_back(std::make_pair(it->first, &it->second)); 902 903 // run the parallel for loop 904 ps::parallel_for(0, size, pquery); 905 869 906 CComponentManager& cmpMgr = GetSimContext().GetComponentManager(); 870 for (size_t i = 0; i < messages.size(); ++i)871 cmpMgr.PostMessage( messages[i].first,messages[i].second);907 for (size_t i = 0; i < pquery.messages.size(); ++i) 908 cmpMgr.PostMessage(pquery.messages[i].first, pquery.messages[i].second); 872 909 } 873 910 874 911 /**