Datenbanksystem für mind. 100k Inserts / Updates pro Sekunde

Falc410

Vice Admiral
Registriert
Juni 2006
Beiträge
6.902
Wahrscheinlich das falsche Forum aber auch mit Google, Stackoverflow etc komme ich gerade nicht viel weiter.

Ich habe große (5-10GB) CSV Dateien die ich einlesen und in einer Datenbank ablegen möchte. Hierfür habe ich mir mal ein Python Script geschrieben als Proof-of-Concept mit Django. Ich muss ungefähr 60.000 Zeilen pro Sekunde verarbeiten können.
Bei jeder Zeile passiert zuerst ein Query ob das Object schon in der Datenbank ist, wenn nicht, dann wird es angelegt (Insert), andernfalls wird es geladen, die Werte werden erhöht und das Objekt mit einem Update wieder zurück. Pro Zeile befinden sich 2 Objekte, d.h. ich habe insgesamt 4 Datenbankoperationen pro Zeile - somit wäre ich sogar eher bei 200k Inserts / Updates pro Sekunde.

Nun habe ich aber keine Ahnung mit was ich das am Schluss umsetzen soll. Insgesamt rechne ich später mit max 250.000.000 Objekten in der Datenbank.

Zählt das schon als Big Data? :) Wahrscheinlich nicht. Wie dem auch sei, ich hätte gerne etwas was ich über mein Python Script (multi-threaded) befüllen kann. Mit MySQL hab ja schon Probleme mit Locking etc. Hatte auch an Sachen wie InfluxDB gedacht. Jemand eine Idee?
 
Als Teilansatz:
Sammel neu anzulegende erstmal im Speicher in einem Array und schreib die paketweise per Bulk Insert weg, die meisten DBMS unterstützen irgendwas in der Art. Das ist deutlich schneller als einzelne Insert Statements.

Ob es auch Bulk Updates gibt, keine Ahnung.

Google mal Dein DBMS + Bulk Insert oder fast Insert.

Edit: Bei Oracle gibt es zB verschiedenste Ansätze für Massen-Updates:
http://www.orafaq.com/node/2450
https://asktom.oracle.com/pls/asktom/f?p=100:11:0:🇳🇴:P11_QUESTION_ID:6407993912330
 
Zuletzt bearbeitet:
Genau die Idee hatte ich auch gerade. Nachdem die Objekte wiederholt vorkommen werde ich erstmal ein Dictionary bzw. HashMap bauen und hab dann deutlich weniger inserts.
 
Falc410 schrieb:
Bei jeder Zeile passiert zuerst ein Query ob das Object schon in der Datenbank ist, wenn nicht, dann wird es angelegt (Insert), andernfalls wird es geladen, die Werte werden erhöht und das Objekt mit einem Update wieder zurück.
Das ginge auch einfacher mit INSERT … ON DUPLICATE KEY UPDATE. Dann wird das alles was du machen willst direkt im DBMS gehandelt und die Performance Optimierung läuft dann auch direkt dort.
Braucht natürlich einen Index auf der Tabelle aber das sollte ja das kleinste Problem sein.
Siehe auch hier: https://stackoverflow.com/questions/1361340/how-to-insert-if-not-exists-in-mysql

Falls du PostgreSQL verwendest ist die Syntax ein wenig anders:
https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql

Wenn du eine ganze Zeile mit neuen Werten ersetzen willst kannst du auch REPLACE verwenden. Wenn die Zeile noch nicht existiert, wird einfach intern ein INSERT ausgeführt.
/edit zu REPLACE
Dazu auch mal das hier durchlesen:
https://stackoverflow.com/questions/548541/insert-ignore-vs-insert-on-duplicate-key-update

Insbesondere diesen Punkt:
If you use REPLACE, MySQL actually does a DELETE followed by an INSERT internally, which has some unexpected side effects:

  • A new auto-increment ID is allocated.
  • Dependent rows with foreign keys may be deleted (if you use cascading foreign keys) or else prevent the REPLACE.
  • Triggers that fire on DELETE are executed unnecessarily.
  • Side effects are propagated to replication slaves too.

Je nachdem, ob das in deinem Fall eben relevant ist - wir kennen ja die Datensätze nicht.
 
Zuletzt bearbeitet:
  • Gefällt mir
Reaktionen: BeBur, mambokurt und Falc410
Falc410 schrieb:
Wahrscheinlich das falsche Forum aber auch mit Google, Stackoverflow etc komme ich gerade nicht viel weiter.

Ich habe große (5-10GB) CSV Dateien die ich einlesen und in einer Datenbank ablegen möchte. Hierfür habe ich mir mal ein Python Script geschrieben als Proof-of-Concept mit Django. Ich muss ungefähr 60.000 Zeilen pro Sekunde verarbeiten können.
Bei jeder Zeile passiert zuerst ein Query ob das Object schon in der Datenbank ist, wenn nicht, dann wird es angelegt (Insert), andernfalls wird es geladen, die Werte werden erhöht und das Objekt mit einem Update wieder zurück. Pro Zeile befinden sich 2 Objekte, d.h. ich habe insgesamt 4 Datenbankoperationen pro Zeile - somit wäre ich sogar eher bei 200k Inserts / Updates pro Sekunde.

Nun habe ich aber keine Ahnung mit was ich das am Schluss umsetzen soll. Insgesamt rechne ich später mit max 250.000.000 Objekten in der Datenbank.

Zählt das schon als Big Data? :) Wahrscheinlich nicht. Wie dem auch sei, ich hätte gerne etwas was ich über mein Python Script (multi-threaded) befüllen kann. Mit MySQL hab ja schon Probleme mit Locking etc. Hatte auch an Sachen wie InfluxDB gedacht. Jemand eine Idee?

Was dir das Genick bricht wird eher das Auslesen jedesmal sein, während du da nachfragst 'gibts das schon wenn ja gib mal' dreht Python ja Däumchen. Von daher +1 für Mihawk90, on duplicate key update ist da schon das Richtige (je nach deinem DBMS halt mit anderer Syntax).

Was dir sonst noch das Genick brechen könnte ist die Latenz im Netzwerk und die Einstellungen bei der Datenbank direkt(Cache, Logging und solche Scherze), das sieht man aber erst wenn es soweit ist. Je nachdem wie posh das werden soll kannst du evtl einfach das Script direkt auf dem Datenbankserver laufen lassen, da fällt auf alle Fälle schonmal die Latenz unter den Tisch (aber das kommt halt aufs Projekt an ob das akzeptiert wird oder nicht).

Deine Beschreibung an sich ist a weng schwammig, von daher lässt sich da schlecht mehr zu sagen.
Ich würde jetzt zum Beispiel grob überlegen: -meine Datei hat 10 GB, soundsoviele Einträge, -> wie groß ist ein Eintrag? Wie viel MB/s muss ich wegschreiben um 200.000k Einträge wegzuschreiben? Das wäre ja dann so ziemlich der Best Case, den Durchsatz musst du quasi MINIMUM erreichen, und wenn du da auf 200MB/s kommst wird dein Netzwerk nicht reichen und wenn dein DBMS auf ner ollen SATA Platte läuft wirds da schon allein vom Schreiben her krachen. Weisst schon wie ich meine, einfach mal Pi mal Daumen gucken ob deine Hardware das überhaupt mitmacht eh du da groß Energie reinsteckst.
 
Frage ist ob das ganze betrieblich und wie häufig genutzt wird, dann würde sich z.B. Auch eine InMemory DB anbieten. Das ist schon ein großer Sprung zu den traditionellen. In manchen DBMS kann man auch einstellen das eine ganze Tabelle im RAM vorliegt. Wie mein Vorschreiber schon schrieb sind auch die technischen Gegebenheiten zu beachten.
 
Ja, ich würde es gerne als InMemoryDB betreiben. Das kann alles auf dem selben Host ausgeführt werden.

Hab jetzt das von Drexel umgesetzt und baue mir erst eine Hashmap auf und siehe da: Es sind schon deutlich weniger Objekte und jetzt kann ich vielleicht auch einfacher mit Bulk Updates arbeiten.

Außerdem habe ich die CSV Dateien bearbeitet und unnötige Columns weggeworfen (da war viel Overhead drin) und hab auch hier die Dateigröße um den Faktor 10 geschrumpft! Sind jetzt nur noch 1GB groß. Das muss dann aber alles in eine Datenstruktur kommen.

Die Datenstruktur ist eigentlich ganz simpel: IP Adressen und jede IP hat eine Liste von Ports (und diese wiederrum Werte für ein- und ausgehende Bytes).
Jetzt habe ich eine Hashmap mit nested Dictionaries für IPs (Key1) und Ports (Key2).

Wenn die IP noch nicht in der Datenbank existiert, kann ich alles auf einmal anlegen, wenn die aber schon drin ist, dann muss ich entweder neue Ports hinzufügen oder die Bytes addieren zu bestehenden Ports.

Wie genau ich das Update / Insert dann noch besser in ein Bulk packen kann, muss ich mal schauen. Sollte aber die Anzahl schon deutlich reduzieren.
 
Du kannst alle Adressen auslesen und als Basis für deine Map nutzen. alles was neu angelegt wird, wird separat gespeichert. Manche Datenbanken haben auch einen Modify Befehl (Insert/Update in einem, wurde oben aber auch schon angesprochen).

aber ob das jetzt eine gute Lösung ist ... dazu müsste man mehr über die generelle Verarbeitung, Struktur und Datenmenge wissen.

eine Möglichkeit wäre auch die Datei sortiert einzulesen und Blockweise zu verarbeiten. Oder sogar vom Quellsystem direkt ins Zielsystem schreiben zu lassen.
 
Falls es sich hier um Log Daten handelt, dann werfe ich mal noch Logstash im Raum. Aber nNur falls es doch mal irgendwann grosser wird das Projekt. Vor allem wenn man auch weitere Verarbeitungen machen will, dann bietet sich ein ELK Stack an.
 
Das sind Verbindungsdaten in Echtzeit. Deswegen wollte ich das zuerst Zeilenweise einlesen in Echtzeit. Aber da ich die Verbindungen vorher noch zusammenbauen muss, geht das nicht. Deswegen 5 Minuten Chunks die aber eben auch in 5 Minuten verarbeitet sein müssen, dann kommt der nächste Chunk.
Vorher alles auslesen wäre theoretisch möglich. Ich rechne so mit max 500.000 Einträgen.

Logstash hatte ich auch überlegt, die Frage ist ob das Performanz genug ist. Ich weiss das man die Nodes skalieren kann aber das hilft eher bei komplexen Abfragen und nicht bei Inserts oder?
 
Wieso gibst du Datenbanken als Lösung vor?
Ich denke, dass http://www.cplusplus.com/reference/unordered_map/unordered_map/ aus performance-Sicht nicht zu schlagen sein werden.

Außerdem: Bequeme performane erreicht man ja auch über parallelität. Und damit meine ich nicht Threads oder Prozesse sondern zur Not sogar PCs. Also zB VMs in der Cloud. Wenn alle 5 Minunten 100 VMs jeweils nur 1% der 5min Chunks verarbeiten müssen wird das alles erheblich einfacher.
 
Als In-Memory Datenbank kann ich dir Redis empfehlen. Je nach Komplexität der Operationen erreicht man da auf 0815 Rechnern eine halbe Million op/Sek. Da ist auch ein kleines Benchmark Tool dabei, mit dem du die Performance der einzelnen Datenstrukturen, die Redis unterstützt, testen kannst.
 
Danke, genau an die hatte ich auch gedacht. Muss ich mich die nächste Woche nämlich eh mit beschäftigen da ich Redis als Message Broker für Celery in einem anderen Projekt brauche.

Und das ganz zu verteilen ist nicht so leicht. Wenn ich die Verbindungsdaten in Echtzeit bekommen sollte, dann kann ich die schon mal nicht aufsplitten und ich benötige später auch eine komplette Ansicht der Daten. Das ganze zu synchronisieren geht sicher, aber ohne Erfahrung und als 1 Mann Projekt wird das wahrscheinlich zu schwierig.
 
Nochmal ein kleines Update. Habe jetzt Multi-Threading implementiert und einen kleinen Testdatensatz von 144.100 Zeilen auf gerade einmal 8381 reduziert (wobei die dann eine Liste von Ports enthalten, d.h die tatsächlichen Updates sind schon größer - trotzdem beachtlich). Aber nun geht mir an meinem Laptop der RAM aus. Python Garbage Collection ist wohl Garbage :)

Habe es geschafft, dass das bearbeiten von einem 500 MB großen CSV mir 20 GB RAM belegt hat. Also da muss noch einiges optimiert werden. Ich werfe das jetzt mal einen großen Server. Dort habe ich schon festgestellt, dass ich zwar viele Threads haben aber dann in I/O Probleme renne. Daher werde ich dort wohl das File zuerst komplett in den RAM laden müssen.

Trotzdem denke ich, das die Datenbank dann nicht mehr das Problem darstellt wenn ich die Daten vorher schon "komprimiere". Ist halt dann nicht in Echtzeit sondern in 5 Minuten Chunks (ggf. reduziere ich das auf 1 Minute) aber gut genug für mich erstmal.

Danke für alle Tipps.
 
Evtl. kannst Du Daten ja vorher in 1000er Blöcken oder so wegschreiben, das sollte von der Performance schon ein Dimensionssprung ggü. Einzel Inserts sein. Und dann natürlich auch die Referenzen auf Objekte etc. löschen, damit der Garbage Collector auch tätig werden kann. Hab noch nie was mit Python gemacht, kann mir aber nicht vorstellen, dass der so lange braucht um Speicher freizugeben. Normalerweise läuft ein GC mehrmals die Sekunde und kann auch manuell getriggert werden...
 
Zuletzt bearbeitet:
  • Gefällt mir
Reaktionen: BeBur
Falc410 schrieb:
Habe es geschafft, dass das bearbeiten von einem 500 MB großen CSV mir 20 GB RAM belegt hat.

Wenn du genug Threads machst und jeder hat die 500Mb Daten + Overhead für Datenstruktur im Ram ist das jetzt auch nicht so schwer ;) Wenns nicht läuft paste mal Code, vllt kann wer gut genug Python oder das Problem ist einfach genug dass ich es auch raffe :D Ansonsten wirf halt Hardware aufs Problem aber 20 Gb um 500 Mb zu bearbeiten ist auf jeden Fall Symptom eines schlimmeren Problems ;)
 
aber 20 Gb um 500 Mb zu bearbeiten ist auf jeden Fall Symptom eines schlimmeren Problems
oder einer schlimmen Lösung :p

Da du wahrscheinlich in der Nähe der aktuellen Lösung (=Python) bleiben willst: Probier doch mal dictionaries statt Datenbanken: https://www.w3schools.com/python/python_dictionaries.asp
Wenn der Key nen int ist, gehts bestimmt auch sehr schnell. Aber Python ist eigtl nie ideal, wenns um Performance geht und man den rechenaufwendigen Anteil im Python Code umsetzt, statt über ne Lib indirekt in C/C++

Alternativ also evtl. das hier:
https://plyvel.readthedocs.io/en/latest/
https://en.wikipedia.org/wiki/LevelDB

Oder das hier:
https://lmdb.readthedocs.io/en/release/
https://en.wikipedia.org/wiki/Lightning_Memory-Mapped_Database
 
Zuletzt bearbeitet:
kuddlmuddl schrieb:
oder einer schlimmen Lösung :p

Da du wahrscheinlich in der Nähe der aktuellen Lösung (=Python) bleiben willst: Probier doch mal dictionaries statt Datenbanken: https://www.w3schools.com/python/python_dictionaries.asp

Ich habe noch gar nicht angefangen irgendwelche Inserts zu machen. Im Moment bin ich noch an dem Schritt, dass ich alles in Dictionaries sammel. Aber selbst das dauert zu lange.

Ich weiß, dass Python langsamer ist als C aber ich nehm am Anfang natürlich erstmal das, was ich am besten kenne. Wie gesagt, ursprünglich war geplant das in Echtzeit auszuführen. Verbindungsdaten kommen rein und sollen in Echtzeit ausgewertet und in eine Datenbank geschrieben werden. Mindestens 60.000 Zeilen pro Sekunde kommen rein.

Aber ich kann auch die 5 Minuten Chunks nehmen. Ein Tool zeichnet kontinuierlich auf und rotiert die Datei alle 5 Minuten, d.h. ich habe 5 Minuten Zeit die Datei einzulesen, auszuwerten und weg zu schreiben bevor die nächste kommt.

Ich habe jetzt mal 2 verschiedene Funktionen geschrieben um herauszufinden wie gross der Geschwindigkeitsunterschied ist. In der oberen Funktion baue ich nur ein Dict und in der zweiten stattdessen eine Liste die ich dann im Moment nur per Single Thread rausschreibe. Bei einem Testdatensatz auf meinem Laptop ist die obere Funktion einen Tick schneller, am richtigen Server ist die mit der Liste etwas schneller, aber beides noch deutlich zu langsam - insgesamt dauert es 700 Sekunden!

Und eigentlich mache ich nicht anderes als die CSV Datei in den Speicher zu laden und in jeder Zeile die IP anzuschauen ob die in einem bestimmten Netz liegt und wenn ja, wegschreiben.

Ich habe das jetzt auf einem System mit 32 Cores und 96 GB RAM getestet. RAM Verbrauch geht auf über 30 GB bei einer knapp 500MB CSV Datei. Das einlesen und starten der Threads dauert schon ziemlich lange. Wenn ich die Threads aber mal reduziere, z.B. Single oder 4, dann dauert das ewigkeiten.

Aber ich verstehe es trotzdem nicht - Ja, C ist sicher schneller als Python aber jeder nimmt doch Python für Big Data her und liest Millionen von Datensätzen ein. Muss doch möglich sein. Den IP Lookup mache ich mit netaddr und IPSet - das baut wahrscheinlich auch nur ein grosses Dict aber auf meinem Laptop ist das zumindest kein Problem.

Anscheinend macht die SSD bei mir im Laptop einiges aus. Ein 7MB Testdatensatz verarbeite ich in 6.5 Sekunden mit 4 Threads, am Server dauert das schon 8.3 Sekunden - a) weil er langsamer die Datei einliest und b) weil das erstellen von 32 Threads einen Overhead generiert. Aber das sollte bei 500MB Daten kein Thema mehr sein.

Code:
def outgoingPorts(row):
    if row['sa'] in local_net:
        if row['sa'] in src_list:
            src_list[row['sa']].append(int(row['sp']))
        else:
            src_list[row['sa']] = [row['sp']]

def iplist(row):
    if row['sa'] in local_net:
        ip_list.append(row['sa'])

if __name__ == '__main__':
    manager = Manager()
    src_list = manager.dict()
    ip_list = manager.list()
    start = time.time()
    pool = Pool(multiprocessing.cpu_count())
    with open('/root/uni-directional.csv') as f:
        csv_reader = csv.DictReader(f)
        #pool.map(outgoingPorts, csv_reader)  # generate ip / port list in memory
        pool.map(iplist, csv_reader)  # only write ip's to file

    with open('ip_list.txt', 'w') as file_handler:
        for item in ip_list:
            file_handler.write("{}\n".format(item))
    print('It took {0:0.1f} seconds'.format(time.time() - start))

Auf jeden Fall hätte ich jetzt die theoretisch 18 Millionen inserts / updates auf 33512 reduziert. Nur dauert die Vorauswahl halt schon viel zu lange. Theoretisch könnte ich das alles mit Bash Scripten machen wenn ich ne Funktion hätte die prüft ob eine IP in einem bestimmten Netzbereich liegt.
 
Zuletzt bearbeitet:
Zurück
Oben