Hey,
ich bin gerade dabei mein System um Abfragen an die Twitch Api zu senden zu beschleunigen. Aktuell wird jede Anfrage in eine BlockingQueue gelegt und darauf gewartet das diese verarbeitet wird. Ein Consumer nimmt die dann aus der Queue und verarbeitet die.
Das ganze funktioniert, nur ist es leider doch sehr langsam. So bekomme ich maximal 1 Request pro Sekunde durch anstelle der dauerhaft möglichen 13 bzw kurzzeitig 26 pro Sekunde.
Deswegen dachte ich mir einfach mehr Consumer einzusetzen da ich die Requests zwar nicht beschleunigen kann, dafür mehrere parallel bearbeiten könnte.
Dabei kommen aber einige Probleme auf, wo ich nicht ganz weiß wie ich das lösen kann.
) Das erste wäre das Request Limit: Meine Idee: Wenn einer der n Consumer merkt das das Limit fast erreicht ist, sollen die Ausführung von allen Consumern pausiert werden. Einer der Consumer wird als 'master' nicht pausiert, wartet aber ~1 Sekunde, führt den nächsten Request aus und wenn das Limit wieder passt entsperrt alle anderen Consumer; wenn nicht dann wird wieder ~1 Sekunde gewartet usw.
Nur wie sperre und entsperre ich am besten verschiedene Prozesse (oder lasse sie an bestimmten stellen warten)?
) Sollte der Key welcher für die Requests genutzt wird ungültig sein, wird aktuell ein neuer erstellt. Wie mache ich das am besten, dass wenn mehreren Consumern auffällt, dass der Key ungültig ist, nicht mehrfach neue erstellt werden?
Aktuell sieht das ganze noch so aus:
ich bin gerade dabei mein System um Abfragen an die Twitch Api zu senden zu beschleunigen. Aktuell wird jede Anfrage in eine BlockingQueue gelegt und darauf gewartet das diese verarbeitet wird. Ein Consumer nimmt die dann aus der Queue und verarbeitet die.
Das ganze funktioniert, nur ist es leider doch sehr langsam. So bekomme ich maximal 1 Request pro Sekunde durch anstelle der dauerhaft möglichen 13 bzw kurzzeitig 26 pro Sekunde.
Deswegen dachte ich mir einfach mehr Consumer einzusetzen da ich die Requests zwar nicht beschleunigen kann, dafür mehrere parallel bearbeiten könnte.
Dabei kommen aber einige Probleme auf, wo ich nicht ganz weiß wie ich das lösen kann.
) Das erste wäre das Request Limit: Meine Idee: Wenn einer der n Consumer merkt das das Limit fast erreicht ist, sollen die Ausführung von allen Consumern pausiert werden. Einer der Consumer wird als 'master' nicht pausiert, wartet aber ~1 Sekunde, führt den nächsten Request aus und wenn das Limit wieder passt entsperrt alle anderen Consumer; wenn nicht dann wird wieder ~1 Sekunde gewartet usw.
Nur wie sperre und entsperre ich am besten verschiedene Prozesse (oder lasse sie an bestimmten stellen warten)?
) Sollte der Key welcher für die Requests genutzt wird ungültig sein, wird aktuell ein neuer erstellt. Wie mache ich das am besten, dass wenn mehreren Consumern auffällt, dass der Key ungültig ist, nicht mehrfach neue erstellt werden?
Aktuell sieht das ganze noch so aus:
Code:
public class TwitchWorker implements Runnable{
// api
private int ratelimitremaining = 800; // placeholder
// shared. prop.
private static BlockingQueue<TwitchRequest> requestqueue;
private static TwitchKey twitchKey;
public TwitchWorker(){
if(requestqueue == null || twitchKey == null){
// get queue
requestqueue = new TwitchWrap().getTasks();
// get key
twitchKey = new TwitchKey();
}
}
@Override
public void run() {
try{
boolean force = false;
// prepare
while(true){
if(ratelimitremaining > 15 || force){
force = false;
TwitchRequest twitchRequest = requestqueue.take();
process(twitchRequest);
}else{
// Wait for 1 seconds (should refill ~13 requests)
TimeUnit.SECONDS.sleep(1);
force = true;
}
}
}catch (Exception e){
e.printStackTrace();
}
}
private void process(TwitchRequest twitchRequest){
try{
if(!twitchKey.isvalid()){ // if the key is not valid - if this dont exit our application here, it should be valid or not
twitchKey.update(); // try updating it - if this dont exit our application here, it could be renewed successfully
}
// get values from api
URL url = new URL(twitchRequest.getRequest());
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("GET");
con.setRequestProperty("Authorization", "Bearer "+twitchKey.getToken());
ratelimitremaining = Integer.parseInt(con.getHeaderField("ratelimit-remaining"));
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuilder content = new StringBuilder();
while ((inputLine = in.readLine()) != null) {
content.append(inputLine);
}
in.close();
con.disconnect();
// set json as result
twitchRequest.setResult(new JSONObject(content.toString()));
// set finished
twitchRequest.setFinished();
}catch (Exception e){
twitchRequest.setResult(new JSONObject("{}"));
twitchRequest.setFinished();
}
}