Mehrere Consumer an BlockingQueue

Sp4rky

Cadet 4th Year
Registriert
März 2019
Beiträge
77
Hey,
ich bin gerade dabei mein System um Abfragen an die Twitch Api zu senden zu beschleunigen. Aktuell wird jede Anfrage in eine BlockingQueue gelegt und darauf gewartet das diese verarbeitet wird. Ein Consumer nimmt die dann aus der Queue und verarbeitet die.
Das ganze funktioniert, nur ist es leider doch sehr langsam. So bekomme ich maximal 1 Request pro Sekunde durch anstelle der dauerhaft möglichen 13 bzw kurzzeitig 26 pro Sekunde.
Deswegen dachte ich mir einfach mehr Consumer einzusetzen da ich die Requests zwar nicht beschleunigen kann, dafür mehrere parallel bearbeiten könnte.

Dabei kommen aber einige Probleme auf, wo ich nicht ganz weiß wie ich das lösen kann.
) Das erste wäre das Request Limit: Meine Idee: Wenn einer der n Consumer merkt das das Limit fast erreicht ist, sollen die Ausführung von allen Consumern pausiert werden. Einer der Consumer wird als 'master' nicht pausiert, wartet aber ~1 Sekunde, führt den nächsten Request aus und wenn das Limit wieder passt entsperrt alle anderen Consumer; wenn nicht dann wird wieder ~1 Sekunde gewartet usw.
Nur wie sperre und entsperre ich am besten verschiedene Prozesse (oder lasse sie an bestimmten stellen warten)?
) Sollte der Key welcher für die Requests genutzt wird ungültig sein, wird aktuell ein neuer erstellt. Wie mache ich das am besten, dass wenn mehreren Consumern auffällt, dass der Key ungültig ist, nicht mehrfach neue erstellt werden?

Aktuell sieht das ganze noch so aus:
Code:
public class TwitchWorker implements Runnable{

    // api
    private int ratelimitremaining = 800; // placeholder
    // shared. prop.
    private static BlockingQueue<TwitchRequest> requestqueue;
    private static TwitchKey twitchKey;


    public TwitchWorker(){
        if(requestqueue == null || twitchKey == null){
            // get queue
            requestqueue = new TwitchWrap().getTasks();
            // get key
            twitchKey = new TwitchKey();
        }
    }

    @Override
    public void run() {
        try{
            boolean force = false;
            // prepare
            while(true){

                if(ratelimitremaining > 15 || force){
                    force = false;
                    TwitchRequest twitchRequest = requestqueue.take();
                    process(twitchRequest);
                }else{
                    // Wait for 1 seconds (should refill ~13 requests)
                    TimeUnit.SECONDS.sleep(1);
                    force = true;
                }

            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private void process(TwitchRequest twitchRequest){
        try{
            if(!twitchKey.isvalid()){ // if the key is not valid - if this dont exit our application here, it should be valid or not
                twitchKey.update(); // try updating it - if this dont exit our application here, it could be renewed successfully
            }
            // get values from api
            URL url = new URL(twitchRequest.getRequest());
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            con.setRequestMethod("GET");
            con.setRequestProperty("Authorization", "Bearer "+twitchKey.getToken());

            ratelimitremaining = Integer.parseInt(con.getHeaderField("ratelimit-remaining"));

            BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
            String inputLine;
            StringBuilder content = new StringBuilder();
            while ((inputLine = in.readLine()) != null) {
                content.append(inputLine);
            }
            in.close();
            con.disconnect();
            // set json as result
            twitchRequest.setResult(new JSONObject(content.toString()));
            // set finished
            twitchRequest.setFinished();
        }catch (Exception e){
            twitchRequest.setResult(new JSONObject("{}"));
            twitchRequest.setFinished();
        }
    }
 
Ohne jetzt Ahnung von Java oder der Twitch-API zu haben, aber ich würde die Ausführung des Tasks einem child-Prozessen/threads übergeben und die Warteschlange als auch die Rückgabewerte vom Hauptprogramm (oder anderen Child-Threads) verarbeiten lassen. Das Hauptprogramm prüft dann beispielsweise sekündlich die Gültigkeit des API-Keys, das ratelimitremaining usw. Im Hauptprogramm kannst du dann wunderbar kontrollieren, wie dein ratelimit aussieht und entsprechend weniger anfragen senden oder eben mehr.
 
  • Gefällt mir
Reaktionen: Sp4rky
Das könnte tatsächlich einfacher umzusetzen sein, nur weiß ich nicht ob das auch gut laufen würde.

Von der API bekomme ich mit jedem Request mit wie das Limit aussieht, dass ohne Request abzufragen geht leider nicht. Ist kein großes Problem, melden sich halt die Threads zurück wenn was nicht ok ist, kommt da ja nicht auf die Reihenfolge an.
Das mit dem Key ist ein größeres Problem, der sollte vor jeder Anfrage geprüft werden damit diese nicht verschwindet. Wenn ich das an die Child Tasks auslagere könnten ja theoretisch mehrere sagen das der Key ungültig ist und würden alle einen neuen anfordern (und ich müsste alle irgendwie warten lassen bis ein neuer erstellt wurde).
Dann müsste zudem auch noch mitgezählt werden wie viele Childs gerade aktiv sind, also müssten alle beim beenden irgendwo wieder runter zählen, wobei ich meine das sowas bei mehreren Threads Probleme machen würde.
 
Atomicinteger

Requestlimit bestimmen allen workern ein limit geben Limit / Workeranzahl.

Wenn einer das Limit aufgebraucht hat guckt der wie hoch das limit gerade ist setzt den atomicinteger oder atomicboolean für alle neu und das Spiel geht von vorne los.
 
  • Gefällt mir
Reaktionen: Sp4rky
Wenn ich so ein boolean setzen würde müsste ich den ja irgendwie in nem Loop abfragen um zu kucken falls dieser aktiv ist oder nicht. Ich würde denken da gibt es eine schönere Lösung so im wait-notify style nur halt das es sich so deaktivieren lässt das es eben bei Bedarf nur blockiert. Aber das Verhalten lässt sich ja auch über nen if und nen bool noch setzen, wenn es da nichts besseres gibt.

// Edit
Bevor ich noch weiter rätsel wie es gehen könnte versuch ich das mal in klein zu bauen, ist vermutlich am besten zum kucken ob es funktioniert 😄
 
While schleife den atomicinteger abfragen ob er größer 1 ist wenn ja um eins reduzieren und request machen

Den api token packst du in einen Atomicreference als string jeder kann lesen und schreiben

Atomics sind genau dafür da. Man muss für simple Dinge keine komplexen Lösungen bauen. :D
 
Okay, ich hoffe mal das das so problemlos funktioniert:
Code:
private int minratelimitremaining = 50;

    private static BlockingQueue<TwitchRequest> requestqueue;
    private static TwitchKey twitchKey;
    private boolean ismaster;
    private String id;

    private static AtomicBoolean keylock= new AtomicBoolean(false);
    private static AtomicBoolean ratelimitlock = new AtomicBoolean(false);


    public TwitchWorker(boolean ismaster, String id){
        if(requestqueue == null || twitchKey == null){
            // get queue
            requestqueue = new TwitchWrap().getTasks();
            // get key
            twitchKey = new TwitchKey();
        }
        this.ismaster = ismaster;
        this.id = id;
    }

    @Override
    public void run() {
        while(true){

            try{

                TwitchRequest twitchRequest = requestqueue.take();

                checkKey();
                while((keylock.get() || ratelimitlock.get()) && !ismaster){TimeUnit.SECONDS.sleep(1);}
                process(twitchRequest);

                if(ismaster && ratelimitlock.get()){
                    TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500)+1000);
                }


            }catch (Exception e){
                e.printStackTrace();
            }

        }

    }

    private void checkKey(){
        if(!twitchKey.isvalid()){ // if the key is not valid - if this dont exit our application here, it should be valid or not
            lock(1);
            if(ismaster){
                twitchKey.update(); // try updating it - if this dont exit our application here, it could be renewed successfully
                unlock(1);
            }
        }
    }

    private void process(TwitchRequest twitchRequest){
        try{
            // get values from api
            URL url = new URL(twitchRequest.getRequest());
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            con.setRequestMethod("GET");
            con.setRequestProperty("Authorization", "Bearer "+twitchKey.getToken());

            if((Integer.parseInt(con.getHeaderField("ratelimit-remaining")) < minratelimitremaining) && !ratelimitlock.get()){
                lock(0);
            }
            if((Integer.parseInt(con.getHeaderField("ratelimit-remaining")) > minratelimitremaining*2) && ismaster && ratelimitlock.get()){
                unlock(0);
            }

            BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
            String inputLine;
            StringBuilder content = new StringBuilder();
            while ((inputLine = in.readLine()) != null) {
                content.append(inputLine);
            }
            in.close();
            con.disconnect();
            //parse json to new hashmap
            twitchRequest.setResult(new JSONObject(content.toString()));
            // set finished
            twitchRequest.setFinished();
        }catch (Exception e){
            twitchRequest.setResult(new JSONObject("{}"));
            twitchRequest.setFinished();
        }
    }


    public String nextestkeychange(){
        return new java.util.Date(((twitchKey.isvaliduntil()-86400)*1000))+"";
    }

    private void lock(int type){
        switch(type){
            case 0:
                ratelimitlock.set(true);
                break;
            case 1:
                keylock.set(true);
                break;
        }
    }

    private void unlock(int type){
        switch(type){
            case 0:
                ratelimitlock.set(false);
                break;
            case 1:
                keylock.set(false);
                break;
        }
    }
}

Das müsste es mir ermöglichen an einer Stelle zu warten, wenn der Key ungültig ist, und/oder das rate limit knapp wird. * hofft das es funktioniert *
Die Schleife zum warten mit einem sleep zu füllen finde ich persönlich nicht so schön, im schlechtesten Fall warte ich so 1 Sekunde länger als nötig; aber es ist immer noch besser als tausendfach pro Sekunde zu überprüfen ob es weiter geht.
Ich teste das mal aus und kuck wie es läuft :)
// edit
also das mit dem rate limit läuft schon mal ;)
ist es eigentlich besser die Threads davon als Deamons laufen zu lassen?
 
Zuletzt bearbeitet:
Threads immer als Deamon sonst kannst du die JVM nichtmehr beenden wenn die laufen.

Bitte guck dir dann mal an wie man ordentlichen CamelCase schreibt, dein Code ist schwer zu lesen, das ist mir letzte mal schon aufgefallen.

Threads kannst du problemlos schlafen lassen. 1 Sekunde ist aber in CPU Zeit sehr lang. Das kannste ruhig auf 200 ms setzen. Ansonsten immer Random Times einführen damit du verhinderst das alle gleichzeitig aufwachen wollen also random zwischen zwischen 190 und 210 ms beim Starten für jeden Thread berechnen.

Ansonsten um Threads zu sycronisieren kannst du Semaphoren benutzen.

Allg würde ich mal empfehlen das du dir anschaust was im Java concurrent package liegt. Alle deine Fragen wurden dort schon gelöst :)

Ansonsten TwitchKey muss in eine Atomicreference da das was du da baust nicht threadsafe ist. Also sofern TwitchKey nicht safe ist. Ansonsten wird das laufen.
 
  • Gefällt mir
Reaktionen: Sp4rky
Ich hätte jetzt auch an Semaphoren gedacht. Die kann man ja zB mit 13 initialisieren und der 14. Thread muss dann eben warten bis einer der ersten 13 Threads fertig ist und die Semaphore wieder einen hochzählt.
 
Wenn du eine semaphore einsetzen willst, Binde sie auf keinen Fall an deine Thread Zahl, sondern nimm eine TimedSemaphore die soviel output pro Sekunde zulässt wie es nötig ist...
Ergänzung ()

Ansonsten schau dir mal den Apache http Client an, dem kann man mitgeben wie viele requests er pro Sekunde abfeuern darf & vor allem wie er sich bei dem http Status Code 429 verhalten soll
 
Wie wäre es mit Delegation:
* ZeroMQ
** Nach actor pattern thread verwaltung
** Bietet viele patterns (messaging Kontext) um solche Probleme zu lösen
* ZeroMQ kann lokal oder verteilt arbeiten, also Richtung services kann man eine externe task queue nutzen (zb celery oder tasktiger)
** Schiebe Task samt Kontext in die queue
** Queue kümmer sich um Konkurrenz, retries bzw Fehlerbehandlung
** Hier sowie im zeromq Fall wichtiger Punkt timeouts pro task oder granularer message
 
Zurück
Oben