C++ Threads warten lassen begrenzt durch atomic<int>

T_55

Lieutenant
Registriert
Feb. 2013
Beiträge
638
Hallo,

ich möchte nur eine bestimmte Anzahl an Threads zulassen an einer Stelle weiterzumachen, sodass die anderen solange warten sollen.
Habe das mit atomics und sleep umgesetzt, überlege aber ob das so ok ist. Ich vermute die erste Variante ist nicht ganz ok so, in der zweiten sollte es sicherer sein per lock_guard.

Variante 1: vermutlich nicht gut denn es könnte vielleicht zwischen der if-Abfrage und dem hochzählen der atomic ein anderer Thread in der Zeit durchrutschen...?
C++:
// global:
std::atomic<int> AktuelleAnzahlAktiverThreads = {0};
std::atomic<int> MaximaleAnzahlAktiverThreads = {1};

C++:
// in Thread:
while(1)
{
   if(AktuelleAnzahlAktiverThreads < MaximaleAnzahlAktiverThreads)
   {
      AktuelleAnzahlAktiverThreads++;
      break;
   }
   std::this_thread::sleep_for(std::chrono::milliseconds(50)); // CPU lover
}

// thread tut etwas...

// wenn fertig:
AktuelleAnzahlAktiverThreads--;


Variante 2: sicherer per lock_guard
C++:
// global:
std::atomic<int> AktuelleAnzahlAktiverThreads = {0};
std::atomic<int> MaximaleAnzahlAktiverThreads = {1};
std::mutex mtx;

C++:
// in Thread:
while(1)
{
   { // lock-scope
      std::lock_guard<std::mutex> lock(mtx);
      if(AktuelleAnzahlAktiverThreads < MaximaleAnzahlAktiverThreads)
      {
         AktuelleAnzahlAktiverThreads++;
         break;
      }
   } // unlock
   std::this_thread::sleep_for(std::chrono::milliseconds(50)); // CPU lover
}

// thread tut etwas...

// wenn fertig:
{ // lock-scope
   std::lock_guard<std::mutex> lock(mtx);
   AktuelleAnzahlAktiverThreads--;
} // unlock

Ist Variante 2 ok so mit lock_guard um die Dinge oder ist das zu trashig? Verbesserungsvorschläge immer gerne :)

Gruß
 
Schade, dass C++ keinen Semaphore-Typen hat, entspräche im Prinzip deiner zweiten Lösung + conditional variable.
 
@T_55 Deine beiden Lösungen haben mehrere Probleme. Manche davon hast du selber schon erkannt. Prinzipiell, wenn du sleep verwenden musst um CPU Auslastung zu verringern hast du praktisch immer etwas falsch gemacht. Sleep solltest du wirklich nur verwenden, wenn du einen zeitlichen Abstand zwischen zwei Operationen brauchst. Außerdem, wenn du busy waiting in deinem Code hast, hast du auch praktisch immer etwas falsch gemacht. Also wenn du in einer Endlosschleife wartest, bis irgend eine externe condition wahr wird.
Du hast ganz richtig erkannt, dass möglicherweise ein Thread durchrutschen könnte und hast das dann mit einem Mutex verhindert. Funktioniert, aber wenn du schon einen Mutex hast, brauchst du keine atomics mehr. Dir garantiert der Mutex ja schon mutual exclusion. Du hast also den overhead der atomics noch drinnen, obwohl du niemals eine race condition haben wirst.
Aber selbst deine zweite Lösung ohne atomics wäre nicht ideal, eben wegen dem sleep und dem busy waiting.

@new Account() Kannst du mir zeigen wie du compare_exchange hier einsetzen würdest? Ich komme damit auf keine saubere Lösung.

@F.b C++11 bietet alles was man braucht um dieses Problem sauber zu lösen. Stimmt, dass C++ keine Semaphore hat, aber dafür alles um sich selbst welche zu bauen. Und std::condition_variable gibt es ja seit C++11.

Hier meine Lösung für dieses Problem, ohne sleep (außer für simulierte Arbeit) und ohne busy waiting:
C++:
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class WorkManager {
  public:
    WorkManager() : activeThreads(0) {}
    ~WorkManager()
    {
        // Make sure all threads have finished
        for (auto &worker : threads)
            worker.join();
    }

    void scheduleWork(std::size_t numWorker)
    {
        for (std::size_t i = 0; i < numWorker; ++i)
            // Use emplace to avoid having to move construct the thread
            threads.emplace_back(&WorkManager::workerThread, this);
    }

  private:
    void workerThread()
    {
        // Synchronize access to activeThreads and block until condition becomes true
        {
            std::unique_lock<std::mutex> guard(mtx);
            cv.wait(guard, [this]() { return activeThreads < MAX_ACTIVE_THREADS; });
            ++activeThreads;
            std::cout << activeThreads << "/" << MAX_ACTIVE_THREADS << " active threads" << std::endl;
        }

        // Do some work
        std::this_thread::sleep_for(std::chrono::seconds(5));

        // Synchronize access to activeThreads
        std::unique_lock<std::mutex> guard(mtx);
        --activeThreads;
        cv.notify_one(); // Notify one thread that may be waiting for its turn
    }

    static constexpr auto MAX_ACTIVE_THREADS = 4;

    std::condition_variable cv;
    std::mutex mtx;
    std::size_t activeThreads;
    std::vector<std::thread> threads;
};

int main()
{
    WorkManager workMan;
    workMan.scheduleWork(8);

    return 0;
}

Gruß
BlackMark
 
  • Gefällt mir
Reaktionen: T_55
BlackMark schrieb:
@new Account() Kannst du mir zeigen wie du compare_exchange hier einsetzen würdest? Ich komme damit auf keine saubere Lösung.
Für diesen Part:
T_55 schrieb:
if(AktuelleAnzahlAktiverThreads < MaximaleAnzahlAktiverThreads) { AktuelleAnzahlAktiverThreads++; break; }
Die aktuelle Anzahl auslesen. Dann vergleichen und, falls true unter der Annahme, dass die Anzahl noch stimmt, inkrementieren (mit dem compare_exchange). So lange probieren bis es klappt (Kollisionen treten nur auf, wenn die Threads exakt gleichzeitig fertig werden).

Vorteil: keine locks, d.h. keine Synchronisierung

Ich hoffe ich habe keinen Denkfehler drin ;)
 
@new Account() Ich hab mal versucht das so zu implementieren, wie ich das verstanden habe:
C++:
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<int> g_activeThreads(0);
static constexpr auto MAX_THREADS = 32;

void worker()
{
    while (true) {
        int activeThreads = g_activeThreads.load();
        if (activeThreads < MAX_THREADS) {
            while (!g_activeThreads.compare_exchange_strong(activeThreads, activeThreads + 1))
                ;

            break;
        }
    }

    int activeThreads = g_activeThreads.load();
    if (activeThreads > MAX_THREADS) {
        std::cout << "ERROR: " << activeThreads << "/" << MAX_THREADS << " active threads!" << std::endl;
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    --g_activeThreads;
}

int main()
{
    std::vector<std::thread> threads;

    std::cout << "Active threads before: " << g_activeThreads << std::endl;

    for (int i = 0; i < 128; ++i) {
        threads.emplace_back(&worker);
    }

    for (std::thread &workThread : threads) {
        workThread.join();
    }

    std::cout << "Active threads after: " << g_activeThreads << std::endl;

    return 0;
}

Das funktioniert bei mir am System nicht. Es schaffen einige Threads durch, die eigentlich nicht laufen sollten. Also es laufen bis zu 37 von 32 Threads, was nicht sein sollte. Dafür ist der Counter am Ende immer 0, also es passiert kein read-modify-write Fehler.

Falls du das anders gemeint hast oder ich einen Fehler gemacht habe, besser es bitte aus.

Gruß
BlackMark
 
@BlackMark ja, man kann es sich selber bauen, ich hatte allerdings das Beispiel C# vor Augen, wo es (nur mit .NET?) Monitore und gleich zwei Semaphore-Implementationen gibt. Nicht falsch verstehen, ich verdien' mit C++ derzeit meine Brötchen, aber nicht alles sollte selber implementiert werden müssen, besonders wenn es solche Standardkomponenten sind.
 
@F.b Das ist jetzt eben der Unterschied in der Philosophie zwischen C++ und anderen Sprachen. Bis es ein Feature in die C++ Standard Library schafft muss es viele Hürden bestehen. Semaphore haben es scheinbar bis jetzt noch nicht geschafft, aber es hat auch bis C++17 gedauert, bis wir endlich std::filesystem bekommen haben, was praktisch alle anderen Sprachen schon immer hatten. Aber die Hoffnung nicht aufgeben, vielleicht kommen Semaphore in C++20: Efficient concurrent waiting for C++20

Gruß
BlackMark
 
Code:
            while (!g_activeThreads.compare_exchange_strong(activeThreads, activeThreads + 1))
                ;

            break;
Hier eigentlich ein if(erfolgreich)break;
Sonst versuchst du immer die gleiche Zahl zu schreiben.

Trotzdem komisch, dass da mehr Threads durchlaufen. Dürfte eigentlich nicht sein.
 
new Account() schrieb:
Die aktuelle Anzahl auslesen. Dann vergleichen und, falls true unter der Annahme, dass die Anzahl noch stimmt, inkrementieren (mit dem compare_exchange). So lange probieren bis es klappt (Kollisionen treten nur auf, wenn die Threads exakt gleichzeitig fertig werden).
Ok, ich habe den Teil so verstanden, dass man das compare_exchange alleine so lange probieren soll, bis es funktioniert hat.

C++:
while (true) {
    int activeThreads = g_activeThreads.load();
    if (activeThreads < MAX_THREADS) {
        if (g_activeThreads.compare_exchange_strong(activeThreads, activeThreads + 1))
            break;
    }
}
So funktioniert es jetzt. Es fallen keine Threads mehr durch, die nicht durch sollen. Macht auch viel mehr Sinn. So wie es vorher war bestand das Potential auf ein Deadlock, wenn die while Schleife auf eine bestimmte anzahl aktiver Threads wartet, die nicht mehr auftreten wird, weil die Threads schon beim weniger werden sind.

Trotzdem, das ist immer noch busy waiting für alle Threads die über dem Limit liegen. Hat also genau die gleichen Probleme wie die Lösungen vom TE. Busy waiting ist keine saubere Lösung.

Gruß
BlackMark
 
Bei der Variante per condition_variable ist mir aufgefallen, dass sogar die Reihenfolge eingehalten wird. Wer am längsten blockierend wartet kommt bei notify_one() als erstes dran. Gefällt mir richtig gut. Gibts da intern eine queue?

Und nur der Vollständigkeit halber das Prädikat bei wait x<y sollte gegen "spurious wakeup" und "lost wakeup" schützen, richtig?
 
T_55 schrieb:
Wer am längsten blockierend wartet kommt bei notify_one() als erstes dran.
https://en.cppreference.com/w/cpp/thread/condition_variable/notify_one schrieb:
If any threads are waiting on *this, calling notify_one unblocks one of the waiting threads.
Laut Standard muss es nicht der am längsten wartende Thread sein. Möglicherweise verwendet deine Standard Library Implementierung eine Queue, aber auf dieses Verhalten darfst du dich nicht verlassen, der Standard schreibt nichts in die Richtung vor.

T_55 schrieb:
Und nur der Vollständigkeit halber das Prädikat bei wait x<y sollte gegen "spurious wakeup" und "lost wakeup" schützen, richtig?
Das predicate brauchst du aus zwei Gründen. Zum einen ist das deine Bedingung, dass nicht mehr als die maximale Anzahl an Threads laufen dürfen. Das checkt wait für dich, dann musst du es nicht selber machen. Zusätzlich schützt es vor spurious wakeups, denn wait returned erst wenn das predicate true ist. Ohne predicate könnte es wegen einem spurious wakeup returnen, ohne ein notify bekommen zu haben. Lost wakeups gibt es keine, bzw. die wären dann deine Schuld. Der Standard sagt, dass es spurious wakeups geben darf, aber lost wakeups darf es nicht geben.
Steht aber alles ganz genau in der Doku: std::condition_variable::wait

Gruß
BlackMark
 
  • Gefällt mir
Reaktionen: T_55
Zurück
Oben