From 23bea0e412d80b0215af834b223003b1bcdda20e Mon Sep 17 00:00:00 2001 From: Maurizio Drocco Date: Fri, 22 Jun 2018 09:33:39 -0700 Subject: [PATCH 1/2] [pnnl/gmt#14] Reverting for fresh start Revert "[pnnl/gmt#13] moved crtqueue code outside gmt/" This reverts commit 9efbe0fd5e582e107c1097d1ac352e0650492988. --- .../{ => gmt}/crt_queue/HazardPointers.hpp | 34 +++++++ include/{ => gmt}/crt_queue/crtqueue.hpp | 92 +++++++++++++++++-- include/gmt/queue.h | 47 ---------- src/CMakeLists.txt | 2 +- src/{queue.cpp => crt_queue/crtqueue.cpp} | 6 +- 5 files changed, 125 insertions(+), 56 deletions(-) rename include/{ => gmt}/crt_queue/HazardPointers.hpp (74%) rename include/{ => gmt}/crt_queue/crtqueue.hpp (77%) rename src/{queue.cpp => crt_queue/crtqueue.cpp} (96%) diff --git a/include/crt_queue/HazardPointers.hpp b/include/gmt/crt_queue/HazardPointers.hpp similarity index 74% rename from include/crt_queue/HazardPointers.hpp rename to include/gmt/crt_queue/HazardPointers.hpp index cfbc16c..f94a78b 100644 --- a/include/crt_queue/HazardPointers.hpp +++ b/include/gmt/crt_queue/HazardPointers.hpp @@ -1,3 +1,37 @@ +/* + * Global Memory and Threading (GMT) + * + * Copyright © 2018, Battelle Memorial Institute + * All rights reserved. + * + * Battelle Memorial Institute (hereinafter Battelle) hereby grants permission to + * any person or entity lawfully obtaining a copy of this software and associated + * documentation files (hereinafter “the Software”) to redistribute and use the + * Software in source and binary forms, with or without modification. Such + * person or entity may use, copy, modify, merge, publish, distribute, + * sublicense, and/or sell copies of the Software, and may permit others to do + * so, subject to the following conditions: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name `Battelle Memorial Institute` or `Battelle` may be used in + * any form whatsoever without the express written consent of `Battelle`. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL `BATTELLE` OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + /****************************************************************************** * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia * All rights reserved. diff --git a/include/crt_queue/crtqueue.hpp b/include/gmt/crt_queue/crtqueue.hpp similarity index 77% rename from include/crt_queue/crtqueue.hpp rename to include/gmt/crt_queue/crtqueue.hpp index e084229..efeb7d8 100644 --- a/include/crt_queue/crtqueue.hpp +++ b/include/gmt/crt_queue/crtqueue.hpp @@ -1,3 +1,37 @@ +/* + * Global Memory and Threading (GMT) + * + * Copyright © 2018, Battelle Memorial Institute + * All rights reserved. + * + * Battelle Memorial Institute (hereinafter Battelle) hereby grants permission to + * any person or entity lawfully obtaining a copy of this software and associated + * documentation files (hereinafter “the Software”) to redistribute and use the + * Software in source and binary forms, with or without modification. Such + * person or entity may use, copy, modify, merge, publish, distribute, + * sublicense, and/or sell copies of the Software, and may permit others to do + * so, subject to the following conditions: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name `Battelle Memorial Institute` or `Battelle` may be used in + * any form whatsoever without the express written consent of `Battelle`. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL `BATTELLE` OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + /****************************************************************************** * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia * All rights reserved. @@ -27,13 +61,12 @@ ****************************************************************************** */ -#ifndef __CRTQUEUE_HPP__ -#define __CRTQUEUE_HPP__ +#ifndef __CRTQUEUE_H__ +#define __CRTQUEUE_H__ -#include -#include +#include -#include "crt_queue/HazardPointers.hpp" +#include "gmt/crt_queue/HazardPointers.hpp" /** @@ -261,4 +294,51 @@ class CRTurnQueue { }; -#endif //__CRTQUEUE_HPP__ +/* + * GMT-friendly wrapper + */ +extern thread_local int qmpmc_tid; +extern std::atomic qmpmc_tcnt; + +typedef CRTurnQueue qmpmc_t_; +typedef qmpmc_t_ *qmpmc_t; + +static void qmpmc_assign_tid() { + qmpmc_tid = qmpmc_tcnt++; +} + +static inline void qmpmc_push(qmpmc_t *q, void *item) { + (*q)->enqueue(item, qmpmc_tid); +} + +static inline int qmpmc_pop(qmpmc_t *q, void **dst) { + void *p{nullptr}; + if(!(p = (*q)->dequeue(qmpmc_tid))) + return 0; + *dst = p; + return 1; +} + +static inline void qmpmc_push_n(qmpmc_t *q, void **item, uint32_t n) +{ + for(uint32_t i = 0; i < n; ++i) + qmpmc_push(q, item[i]); +} + +static inline int qmpmc_pop_n(qmpmc_t *q, void **item, uint32_t n) { + uint32_t i; + for(i = 0; i < n; ++i) + if(!qmpmc_pop(q, item + i)) + break; + return i; +} + +static void qmpmc_init(qmpmc_t *q, uint32_t) { + *q = new qmpmc_t_(); +} + +static void qmpmc_destroy(qmpmc_t *q) { + (*q)->~CRTurnQueue(); +} + +#endif //__CRTQUEUE_H__ diff --git a/include/gmt/queue.h b/include/gmt/queue.h index aa5d28f..2f30ae0 100644 --- a/include/gmt/queue.h +++ b/include/gmt/queue.h @@ -315,54 +315,7 @@ INLINE int qmpmc_pop_n(qmpmc_t * q, void **item, uint32_t n) INLINE void qmpmc_assign_tid() { } #else -/* - * GMT-friendly wrapper of the CRT queue - */ #include "crt_queue/crtqueue.hpp" - -extern thread_local int qmpmc_tid; -extern std::atomic qmpmc_tcnt; - -typedef CRTurnQueue qmpmc_t_; -typedef qmpmc_t_ *qmpmc_t; - -INLINE void qmpmc_assign_tid() { - qmpmc_tid = qmpmc_tcnt++; -} - -INLINE void qmpmc_push(qmpmc_t *q, void *item) { - (*q)->enqueue(item, qmpmc_tid); -} - -INLINE int qmpmc_pop(qmpmc_t *q, void **dst) { - void *p{nullptr}; - if(!(p = (*q)->dequeue(qmpmc_tid))) - return 0; - *dst = p; - return 1; -} - -INLINE void qmpmc_push_n(qmpmc_t *q, void **item, uint32_t n) -{ - for(uint32_t i = 0; i < n; ++i) - qmpmc_push(q, item[i]); -} - -INLINE int qmpmc_pop_n(qmpmc_t *q, void **item, uint32_t n) { - uint32_t i; - for(i = 0; i < n; ++i) - if(!qmpmc_pop(q, item + i)) - break; - return i; -} - -INLINE void qmpmc_init(qmpmc_t *q, uint32_t) { - *q = new qmpmc_t_(); -} - -INLINE void qmpmc_destroy(qmpmc_t *q) { - (*q)->~CRTurnQueue(); -} #endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1a6b0eb..8c9ea34 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,10 +2,10 @@ set(sources aggregation.c debug.c gmt_malloc.c gmt_put_get.c main.c network.c uthread.c comm_server.c gmt_execute.c gmt_misc.c gmt_ucontext.c memory.c profiling.c utils.c config.c gmt_for.c helper.c mtask.c timing.c worker.c - queue.cpp ) if(GMT_VSIZE_MPMC) +set(sources ${sources} crt_queue/crtqueue.cpp) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DVSIZE_MPMC") endif() diff --git a/src/queue.cpp b/src/crt_queue/crtqueue.cpp similarity index 96% rename from src/queue.cpp rename to src/crt_queue/crtqueue.cpp index 5ec1e95..8c291e5 100644 --- a/src/queue.cpp +++ b/src/crt_queue/crtqueue.cpp @@ -31,9 +31,11 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifdef VSIZE_MPMC + +#include #include +#include +#include thread_local int qmpmc_tid{-1}; std::atomic qmpmc_tcnt{0}; -#endif From 948cf98a05fc21819c48aab069968f4550303d85 Mon Sep 17 00:00:00 2001 From: Maurizio Drocco Date: Fri, 22 Jun 2018 09:33:52 -0700 Subject: [PATCH 2/2] [pnnl/gmt#14] Reverting for fresh start Revert "[pnnl/gmt#12] dynamic queues" This reverts commit 3166c015075cd15cf91de4c6ffe78b5dff4cc8b4. --- CMakeLists.txt | 3 - include/gmt/crt_queue/HazardPointers.hpp | 188 ------------- include/gmt/crt_queue/crtqueue.hpp | 344 ----------------------- include/gmt/queue.h | 11 +- src/CMakeLists.txt | 6 - src/comm_server.c | 3 - src/crt_queue/crtqueue.cpp | 41 --- src/helper.c | 3 - src/mtask.c | 3 - src/worker.c | 3 - 10 files changed, 1 insertion(+), 604 deletions(-) delete mode 100644 include/gmt/crt_queue/HazardPointers.hpp delete mode 100644 include/gmt/crt_queue/crtqueue.hpp delete mode 100644 src/crt_queue/crtqueue.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 210454e..4341845 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -69,9 +69,6 @@ include_directories(${GMT_MAIN_INCLUDE_DIR} ${GMT_INCLUDE_DIR}) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wabi -Wextra") -# Set dynamic queues -set(GMT_VSIZE_MPMC true) - configure_file(${GMT_MAIN_INCLUDE_DIR}/gmt/config.h.in ${GMT_INCLUDE_DIR}/gmt/config.h) install(FILES ${GMT_INCLUDE_DIR}/gmt/config.h DESTINATION include/gmt) diff --git a/include/gmt/crt_queue/HazardPointers.hpp b/include/gmt/crt_queue/HazardPointers.hpp deleted file mode 100644 index f94a78b..0000000 --- a/include/gmt/crt_queue/HazardPointers.hpp +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Global Memory and Threading (GMT) - * - * Copyright © 2018, Battelle Memorial Institute - * All rights reserved. - * - * Battelle Memorial Institute (hereinafter Battelle) hereby grants permission to - * any person or entity lawfully obtaining a copy of this software and associated - * documentation files (hereinafter “the Software”) to redistribute and use the - * Software in source and binary forms, with or without modification. Such - * person or entity may use, copy, modify, merge, publish, distribute, - * sublicense, and/or sell copies of the Software, and may permit others to do - * so, subject to the following conditions: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the name `Battelle Memorial Institute` or `Battelle` may be used in - * any form whatsoever without the express written consent of `Battelle`. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL `BATTELLE` OR CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/****************************************************************************** - * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Concurrency Freaks nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ****************************************************************************** - */ - -#ifndef _HAZARD_POINTERS_HPP_ -#define _HAZARD_POINTERS_HPP_ - -#include -#include -#include - - -template -class HazardPointers { - -private: - static const int HP_MAX_THREADS = 128; - static const int HP_MAX_HPS = 11; // This is named 'K' in the HP paper - static const int CLPAD = 128/sizeof(std::atomic); - static const int HP_THRESHOLD_R = 0; // This is named 'R' in the HP paper - static const int MAX_RETIRED = HP_MAX_THREADS*HP_MAX_HPS; // Maximum number of retired objects per thread - - const int maxHPs; - const int maxThreads; - - std::atomic hp[HP_MAX_THREADS*CLPAD][HP_MAX_HPS]; - // It's not nice that we have a lot of empty vectors, but we need padding to avoid false sharing - std::vector retiredList[HP_MAX_THREADS*CLPAD]; - -public: - HazardPointers(int maxHPs=HP_MAX_HPS, int maxThreads=HP_MAX_THREADS) : maxHPs{maxHPs}, maxThreads{maxThreads} { - for (int ithread = 0; ithread < HP_MAX_THREADS; ithread++) { - for (int ihp = 0; ihp < HP_MAX_HPS; ihp++) { - hp[ithread*CLPAD][ihp].store(nullptr, std::memory_order_relaxed); - } - } - } - - ~HazardPointers() { - for (int ithread = 0; ithread < HP_MAX_THREADS; ithread++) { - // Clear the current retired nodes - for (unsigned iret = 0; iret < retiredList[ithread*CLPAD].size(); iret++) { - delete retiredList[ithread*CLPAD][iret]; - } - } - } - - - /** - * Progress Condition: wait-free bounded (by maxHPs) - */ - void clear(const int tid) { - for (int ihp = 0; ihp < maxHPs; ihp++) { - hp[tid*CLPAD][ihp].store(nullptr, std::memory_order_release); - } - } - - - /** - * Progress Condition: wait-free population oblivious - */ - void clearOne(int ihp, const int tid) { - hp[tid*CLPAD][ihp].store(nullptr, std::memory_order_release); - } - - - /** - * Progress Condition: lock-free - */ - T* protect(int index, const std::atomic& atom, const int tid) { - T* n = nullptr; - T* ret; - while ((ret = atom.load()) != n) { - hp[tid*CLPAD][index].store(ret); - n = ret; - } - return ret; - } - - - /** - * This returns the same value that is passed as ptr, which is sometimes useful - * Progress Condition: wait-free population oblivious - */ - T* protectPtr(int index, T* ptr, const int tid) { - hp[tid*CLPAD][index].store(ptr); - return ptr; - } - - - - /** - * This returns the same value that is passed as ptr, which is sometimes useful - * Progress Condition: wait-free population oblivious - */ - T* protectRelease(int index, T* ptr, const int tid) { - hp[tid*CLPAD][index].store(ptr, std::memory_order_release); - return ptr; - } - - - /** - * Progress Condition: wait-free bounded (by the number of threads squared) - */ - void retire(T* ptr, const int tid) { - retiredList[tid*CLPAD].push_back(ptr); - if (retiredList[tid*CLPAD].size() < HP_THRESHOLD_R) return; - for (unsigned iret = 0; iret < retiredList[tid*CLPAD].size();) { - auto obj = retiredList[tid*CLPAD][iret]; - bool canDelete = true; - for (int tid = 0; tid < maxThreads && canDelete; tid++) { - for (int ihp = maxHPs-1; ihp >= 0; ihp--) { - if (hp[tid*CLPAD][ihp].load() == obj) { - canDelete = false; - break; - } - } - } - if (canDelete) { - retiredList[tid*CLPAD].erase(retiredList[tid*CLPAD].begin() + iret); - delete obj; - continue; - } - iret++; - } - } -}; - -#endif /* _HAZARD_POINTERS_HPP_ */ diff --git a/include/gmt/crt_queue/crtqueue.hpp b/include/gmt/crt_queue/crtqueue.hpp deleted file mode 100644 index efeb7d8..0000000 --- a/include/gmt/crt_queue/crtqueue.hpp +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Global Memory and Threading (GMT) - * - * Copyright © 2018, Battelle Memorial Institute - * All rights reserved. - * - * Battelle Memorial Institute (hereinafter Battelle) hereby grants permission to - * any person or entity lawfully obtaining a copy of this software and associated - * documentation files (hereinafter “the Software”) to redistribute and use the - * Software in source and binary forms, with or without modification. Such - * person or entity may use, copy, modify, merge, publish, distribute, - * sublicense, and/or sell copies of the Software, and may permit others to do - * so, subject to the following conditions: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the name `Battelle Memorial Institute` or `Battelle` may be used in - * any form whatsoever without the express written consent of `Battelle`. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL `BATTELLE` OR CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/****************************************************************************** - * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Concurrency Freaks nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ****************************************************************************** - */ - -#ifndef __CRTQUEUE_H__ -#define __CRTQUEUE_H__ - -#include - -#include "gmt/crt_queue/HazardPointers.hpp" - - -/** - *

CR Turn Queue

- * - * A concurrent wait-free queue that is Multi-Producer-Multi-Consumer and does - * its own wait-free memory reclamation. - * Based on the paper "A Wait-Free Queue with Wait-Free Memory Reclamation" - * https://github.com/pramalhe/ConcurrencyFreaks/tree/master/papers/crturnqueue-2016.pdf - * - *

- * Enqueue algorithm: CR Turn enqueue - * Dequeue algorithm: CR Turn dequeue - * Consistency: Linearizable - * enqueue() progress: wait-free bounded O(N_threads) - * dequeue() progress: wait-free bounded O(N_threads) - * Memory Reclamation: Hazard Pointers (wait-free) - * - *

- * The paper on Hazard Pointers is named "Hazard Pointers: Safe Memory - * Reclamation for Lock-Free objects" and it is available here: - * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf - * - * @author Andreia Correia - * @author Pedro Ramalhete - */ -template -class CRTurnQueue { - -private: - struct Node { - T* item; - const int enqTid; - std::atomic deqTid; - std::atomic next; - - Node(T* item, int tid) : item{item}, enqTid{tid}, deqTid{IDX_NONE}, next{nullptr} { } - - bool casDeqTid(int cmp, int val) { - return deqTid.compare_exchange_strong(cmp, val); - } - }; - - static const int IDX_NONE = -1; - static const int MAX_THREADS = 128; - const int maxThreads; - - // Pointers to head and tail of the list - std::atomic head alignas(128); - std::atomic tail alignas(128); - // Enqueue requests - std::atomic enqueuers[MAX_THREADS] alignas(128); - // Dequeue requests - std::atomic deqself[MAX_THREADS] alignas(128); - std::atomic deqhelp[MAX_THREADS] alignas(128); - - - HazardPointers hp {3, maxThreads}; // We need three hazard pointers - const int kHpTail = 0; - const int kHpHead = 0; - const int kHpNext = 1; - const int kHpDeq = 2; - - Node* sentinelNode = new Node(nullptr, 0); - - - /** - * Called only from dequeue() - * - * Search for the next request to dequeue and assign it to lnext.deqTid - * It is only a request to dequeue if deqself[i] equals deqhelp[i]. - */ - int searchNext(Node* lhead, Node* lnext) { - const int turn = lhead->deqTid.load(); - for (int idx=turn+1; idx < turn+maxThreads+1; idx++) { - const int idDeq = idx%maxThreads; - if (deqself[idDeq].load() != deqhelp[idDeq].load()) continue; - if (lnext->deqTid.load() == IDX_NONE) lnext->casDeqTid(IDX_NONE, idDeq); - break; - } - return lnext->deqTid.load(); - } - - - /** - * Called only from dequeue() - * - * If the ldeqTid is not our own, we must use an HP to protect against - * deqhelp[ldeqTid] being retired-deleted-newed-reenqueued. - */ - void casDeqAndHead(Node* lhead, Node* lnext, const int tid) { - const int ldeqTid = lnext->deqTid.load(); - if (ldeqTid == tid) { - deqhelp[ldeqTid].store(lnext, std::memory_order_release); - } else { - Node* ldeqhelp = hp.protectPtr(kHpDeq, deqhelp[ldeqTid].load(), tid); - if (ldeqhelp != lnext && lhead == head.load()) { - deqhelp[ldeqTid].compare_exchange_strong(ldeqhelp, lnext); // Assign next to request - } - } - head.compare_exchange_strong(lhead, lnext); - } - - - /** - * Called only from dequeue() - * - * Giveup procedure, for when there are no nodes left to dequeue - */ - void giveUp(Node* myReq, const int tid) { - Node* lhead = head.load(); - if (deqhelp[tid].load() != myReq || lhead == tail.load()) return; - hp.protectPtr(kHpHead, lhead, tid); - if (lhead != head.load()) return; - Node* lnext = hp.protectPtr(kHpNext, lhead->next.load(), tid); - if (lhead != head.load()) return; - if (searchNext(lhead, lnext) == IDX_NONE) lnext->casDeqTid(IDX_NONE, tid); - casDeqAndHead(lhead, lnext, tid); - } - -public: - CRTurnQueue(int maxThreads=MAX_THREADS) : maxThreads(maxThreads) { - head.store(sentinelNode, std::memory_order_relaxed); - tail.store(sentinelNode, std::memory_order_relaxed); - for (int i = 0; i < maxThreads; i++) { - enqueuers[i].store(nullptr, std::memory_order_relaxed); - // deqself[i] != deqhelp[i] means that isRequest=false - deqself[i].store(new Node(nullptr, 0), std::memory_order_relaxed); - deqhelp[i].store(new Node(nullptr, 0), std::memory_order_relaxed); - } - } - - - ~CRTurnQueue() { - delete sentinelNode; - while (dequeue(0) != nullptr); // Drain the queue - for (int i=0; i < maxThreads; i++) delete deqself[i].load(); - for (int i=0; i < maxThreads; i++) delete deqhelp[i].load(); - } - - - std::string className() { return "CRTurnQueue"; } - - - /** - * Steps when uncontended: - * 1. Add node to enqueuers[] - * 2. Insert node in tail.next using a CAS - * 3. Advance tail to tail.next - * 4. Remove node from enqueuers[] - * - * @param tid The tid must be a UNIQUE index for each thread, in the range 0 to maxThreads-1 - */ - void enqueue(T* item, const int tid) { - if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); - Node* myNode = new Node(item,tid); - enqueuers[tid].store(myNode); - for (int i = 0; i < maxThreads; i++) { - if (enqueuers[tid].load() == nullptr) { - hp.clear(tid); - return; // Some thread did all the steps - } - Node* ltail = hp.protectPtr(kHpTail, tail.load(), tid); - if (ltail != tail.load()) continue; // If the tail advanced maxThreads times, then my node has been enqueued - if (enqueuers[ltail->enqTid].load() == ltail) { // Help a thread do step 4 - Node* tmp = ltail; - enqueuers[ltail->enqTid].compare_exchange_strong(tmp, nullptr); - } - for (int j = 1; j < maxThreads+1; j++) { // Help a thread do step 2 - Node* nodeToHelp = enqueuers[(j + ltail->enqTid) % maxThreads].load(); - if (nodeToHelp == nullptr) continue; - Node* nodenull = nullptr; - ltail->next.compare_exchange_strong(nodenull, nodeToHelp); - break; - } - Node* lnext = ltail->next.load(); - if (lnext != nullptr) tail.compare_exchange_strong(ltail, lnext); // Help a thread do step 3 - } - enqueuers[tid].store(nullptr, std::memory_order_release); // Do step 4, just in case it's not done - hp.clear(tid); - } - - - /** - * Steps when uncontended: - * 1. Publish request to dequeue in dequeuers[tid]; - * 2. CAS node->deqTid from IDX_START to tid; - * 3. Set dequeuers[tid] to the newly owned node; - * 4. Advance the head with casHead(); - * - * We must protect either head or tail with HP before doing the check for - * empty queue, otherwise we may get into retired-deleted-newed-reenqueued. - * - * @param tid: The tid must be a UNIQUE index for each thread, in the range 0 to maxThreads-1 - */ - T* dequeue(const int tid) { - Node* prReq = deqself[tid].load(); // Previous request - Node* myReq = deqhelp[tid].load(); - deqself[tid].store(myReq); // Step 1 - for (int i=0; i < maxThreads; i++) { - if (deqhelp[tid].load() != myReq) break; // No need for HP - Node* lhead = hp.protectPtr(kHpHead, head.load(), tid); - if (lhead != head.load()) continue; - if (lhead == tail.load()) { // Give up - deqself[tid].store(prReq); // Rollback request to dequeue - giveUp(myReq, tid); - if (deqhelp[tid].load() != myReq) { - deqself[tid].store(myReq, std::memory_order_relaxed); - break; - } - hp.clear(tid); - return nullptr; - } - Node* lnext = hp.protectPtr(kHpNext, lhead->next.load(), tid); - if (lhead != head.load()) continue; - if (searchNext(lhead, lnext) != IDX_NONE) casDeqAndHead(lhead, lnext, tid); - } - Node* myNode = deqhelp[tid].load(); - Node* lhead = hp.protectPtr(kHpHead, head.load(), tid); // Do step 4 if needed - if (lhead == head.load() && myNode == lhead->next.load()) head.compare_exchange_strong(lhead, myNode); - hp.clear(tid); - hp.retire(prReq, tid); - return myNode->item; - } - -}; - -/* - * GMT-friendly wrapper - */ -extern thread_local int qmpmc_tid; -extern std::atomic qmpmc_tcnt; - -typedef CRTurnQueue qmpmc_t_; -typedef qmpmc_t_ *qmpmc_t; - -static void qmpmc_assign_tid() { - qmpmc_tid = qmpmc_tcnt++; -} - -static inline void qmpmc_push(qmpmc_t *q, void *item) { - (*q)->enqueue(item, qmpmc_tid); -} - -static inline int qmpmc_pop(qmpmc_t *q, void **dst) { - void *p{nullptr}; - if(!(p = (*q)->dequeue(qmpmc_tid))) - return 0; - *dst = p; - return 1; -} - -static inline void qmpmc_push_n(qmpmc_t *q, void **item, uint32_t n) -{ - for(uint32_t i = 0; i < n; ++i) - qmpmc_push(q, item[i]); -} - -static inline int qmpmc_pop_n(qmpmc_t *q, void **item, uint32_t n) { - uint32_t i; - for(i = 0; i < n; ++i) - if(!qmpmc_pop(q, item + i)) - break; - return i; -} - -static void qmpmc_init(qmpmc_t *q, uint32_t) { - *q = new qmpmc_t_(); -} - -static void qmpmc_destroy(qmpmc_t *q) { - (*q)->~CRTurnQueue(); -} - -#endif //__CRTQUEUE_H__ diff --git a/include/gmt/queue.h b/include/gmt/queue.h index 2f30ae0..6736036 100644 --- a/include/gmt/queue.h +++ b/include/gmt/queue.h @@ -221,7 +221,7 @@ INLINE int64_t NAME##_guess_size (NAME##_t *q ) {\ return ( delta >= (int64_t)0 ) ? delta : (int64_t) (NEXT_POW2(SIZE) + delta);\ } -#ifndef VSIZE_MPMC + typedef struct qmpmc_t { int64_t writer_ticket __align(CACHE_LINE); int64_t volatile reader_ticket __align(CACHE_LINE); @@ -309,15 +309,6 @@ INLINE int qmpmc_pop_n(qmpmc_t * q, void **item, uint32_t n) return i; } -/* - * dummy function for API compatibility with CRT queue - */ -INLINE void qmpmc_assign_tid() { -} -#else -#include "crt_queue/crtqueue.hpp" -#endif - /*******************************************************************/ /* single producer single consumer */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8c9ea34..c57c9ff 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -3,12 +3,6 @@ set(sources comm_server.c gmt_execute.c gmt_misc.c gmt_ucontext.c memory.c profiling.c utils.c config.c gmt_for.c helper.c mtask.c timing.c worker.c ) - -if(GMT_VSIZE_MPMC) -set(sources ${sources} crt_queue/crtqueue.cpp) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DVSIZE_MPMC") -endif() - set_source_files_properties(${sources} PROPERTIES LANGUAGE CXX ) find_package(Threads REQUIRED) diff --git a/src/comm_server.c b/src/comm_server.c index 86fa9a3..d10aeeb 100644 --- a/src/comm_server.c +++ b/src/comm_server.c @@ -177,9 +177,6 @@ INLINE void comm_server_test_recv() void *comm_server_loop(void *arg) { - /* initialize thread index for the MPMC queue */ - qmpmc_assign_tid(); - _unused(arg); if (config.thread_pinning) { pin_thread(config.num_cores - 1); diff --git a/src/crt_queue/crtqueue.cpp b/src/crt_queue/crtqueue.cpp deleted file mode 100644 index 8c291e5..0000000 --- a/src/crt_queue/crtqueue.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Global Memory and Threading (GMT) - * - * Copyright © 2018, Battelle Memorial Institute - * All rights reserved. - * - * Battelle Memorial Institute (hereinafter Battelle) hereby grants permission to - * any person or entity lawfully obtaining a copy of this software and associated - * documentation files (hereinafter “the Software”) to redistribute and use the - * Software in source and binary forms, with or without modification. Such - * person or entity may use, copy, modify, merge, publish, distribute, - * sublicense, and/or sell copies of the Software, and may permit others to do - * so, subject to the following conditions: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the name `Battelle Memorial Institute` or `Battelle` may be used in - * any form whatsoever without the express written consent of `Battelle`. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL `BATTELLE` OR CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#include -#include -#include -#include - -thread_local int qmpmc_tid{-1}; -std::atomic qmpmc_tcnt{0}; diff --git a/src/helper.c b/src/helper.c index 5b0cf96..e129359 100644 --- a/src/helper.c +++ b/src/helper.c @@ -90,9 +90,6 @@ INLINE uint64_t helper_check_aggreg_timeout(uint32_t hid, uint64_t old_tick) void *helper_loop(void *arg) { - /* initialize thread index for the MPMC queue */ - qmpmc_assign_tid(); - uint32_t hid = (uint64_t) arg; if (config.thread_pinning) { diff --git a/src/mtask.c b/src/mtask.c index 0976444..f09777b 100644 --- a/src/mtask.c +++ b/src/mtask.c @@ -66,9 +66,6 @@ void mtm_init() i++) handleid_queue_push(&mtm.handleid_pool, i); - /* initialize thread index for the MPMC queue */ - qmpmc_assign_tid(); - uint32_t cnt = 0; for (i = 0; i < pool_size; i++) { mtm.mtasks[i].args = NULL; diff --git a/src/worker.c b/src/worker.c index 420d851..d431033 100644 --- a/src/worker.c +++ b/src/worker.c @@ -144,9 +144,6 @@ void worker_sigsegv_handler(int signum, siginfo_t * info, void *data) void *worker_loop(void *args) { - /* initialize thread index for the MPMC queue */ - qmpmc_assign_tid(); - uint32_t thread_id = get_thread_id(); uint32_t wid = (uint32_t) ((uint64_t) args); _assert(wid == thread_id);