Python STOMP Client nutzt while true - Bad Design?

Falc410

Vice Admiral
Registriert
Juni 2006
Beiträge
6.930
Ich muss einen STOMP Client implementieren der sich bei einer Queue an einem ActiveMQ JMS subscribed. Das ganze muss in Python sein und da gibt es nicht viel Auswahl. Also habe ich mir stompest angeschaut. Da gibt es einen async und einen sync Client. Den async Client möchte ich nicht unbedingt benutzen da ich dann jede Menge zusätzlicher Libraries (e.g. Twisted framework) benötige.

Der sync Client sieht aber so aus:
Code:
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.sync import Stomp

CONFIG = StompConfig('tcp://localhost:61613')
QUEUE = '/queue/test'

if __name__ == '__main__':
    client = Stomp(CONFIG)
    client.connect()
    client.subscribe(QUEUE, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL})
    while True:
        frame = client.receiveFrame()
        print 'Got %s' % frame.info()
        client.ack(frame)
    client.disconnect()
Quelle: http://nikipore.github.io/stompest/sync.html

Ich habe nun zwei Probleme mit dem Code:

1. Wie macht ein STOMP Client das normalerweise? Bleibt er durchgehend verbunden oder verbindet er sich immer wieder neu zum Server? Falls die Verbindung offen bleibt, wie macht man das ohne ein While true?
2. Der Code wird sowieso in einem eigenen Thread laufen. Soll ich das while true dann einfach so hinnehmen oder blockiert es am Schluss doch den Server? Würde hier ein sleep() einbauen etwas bringen?
 
STOMP selbst kenne ich nicht, aber man könnte den Client z.b. so in einen Thread packen, der sich auch stoppen lässt. Das while True in deinem Beispiel würde blocken, ebenso client.receiveFrame() laut Doku wenn kein Frame zu empfangen ist. D.h. selbst wenn man stop() aufruft, würde der Thread sich erst nach dem nächsten Frame beenden. Kommt dieser nicht, so würde die Schleife dort hängen bleiben. Daher noch mit canRead() nachschauen, ob was zu empfangen ist und dann erst lesen.

Code:
#!/usr/bin/python

import threading

from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.sync import Stomp

class Client(object):

  def __init__(self):

    CONFIG = StompConfig('tcp://localhost:61613')
    QUEUE = '/queue/test'

    self.run = True

    self.client = Stomp(CONFIG)
    self.client.connect()
    self.client.subscribe(QUEUE, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL})

    thread = threading.Thread(target=self.start, args=())
    thread.daemon = True
    thread.start()       

  def start(self):
    while self.run:
      if self.client.canRead(timeout=1):
        frame = self.client.receiveFrame()
        print 'Got %s' % frame.info()
        self.client.ack(frame)

    self.client.disconnect()
    
  def stop(self):
    self.run = False
     

if __name__ == '__main__':
  c = Client()

  # do something

  c.stop()
 
Zuletzt bearbeitet:
Danke für die Antwort. In der Zwischenzeit habe ich mir auch den Source näher angeschaut und ebenso festgestellt dass auch die receiveFrame() blocking ist und eben eine while true Schleife benutzt. Also gehüpft wie gesprungen. Du hast jetzt noch eine Art sleep eingebaut mit dem timeout=1 aber ob das viel bringt?

Ich habe sowieso den Code mit Hilfe von zDaemon als Daemon laufen lassen und kann es dann auch stoppen. Leider habe ich keine Möglichkeit die disconnect() aufzurufen, aber ich denke das wird der Server schon verkraften. Habe den Process ein wenig überwacht und obwohl er while true macht, sagt mir 'top' dass der Process schläft (sleeping). Ich denke dann lasse ich das so. Das async Framework benutzt vielleicht kein busy waiting aber dafür müsste ich eben einiges zusätzliches an Software installieren.
 
Falc410 schrieb:
Danke für die Antwort. In der Zwischenzeit habe ich mir auch den Source näher angeschaut und ebenso festgestellt dass auch die receiveFrame() blocking ist und eben eine while true Schleife benutzt. Also gehüpft wie gesprungen. Du hast jetzt noch eine Art sleep eingebaut mit dem timeout=1 aber ob das viel bringt?

Habe keine Ahnung von Python, aber wenn ich den Code oben richtig interpretiere, dann bringt sein Vorgehen schon einen klaren Vorteil. Wenn keine Daten ankommen, dann kann er 1 mal pro Sekunde prüfen, ob run überhaupt noch true ist. Das heißt, er hängt gar nie beim blockierenden receive, wenn dieses keine Frames hat. Dafür blockiert jetzt eben wahrscheinlich canRead, aber pro Schleifendurchlauf eben immer nur eine Sekunde.

Man könnte bemängeln, dass die run-Variable hier vermutlich nicht threadsafe ist, aber solange es nur einen externen Aufrufer gibt, wird es in diesem Fall egal sein.
 
canRead(timeout=1) blockt für max. 1 sekunde. falls es nichts zu lesen gibt, ist die nächste runde in der while schleife dran, die man anhand self.run abbrechen könnte. damit würde der thread für den fall ohne daten spätestens nach einer sekunde gestoppt werden.

das while true im ersten beispiel fällt nicht weiter auf, weil receiveFrame() ja solange wartet bis ein frame empfangen werden kann.

https://stomp.github.io/stomp-specification-1.2.html#DISCONNECT
A client can disconnect from the server at anytime by closing the socket [...]

Wenn dein ansatz funktioniert kannst du die verbindung wohl auch einfach abbrechen :)
 
Ich hab den Source Code nicht auswendig im Kopf da ich schon zu Hause bin aber die canRead(timeout=1) blockiert 1 Sekunde, bzw. wartet bis zu einer Sekunde in einer while Schleife ob eine Nachricht (Frame) ankommt. Per Default wird timeout=0 verwendet, sprich es blockiert solange bis ein Frame kommt. Also das macht nicht unbedingt einen Unterschied.

Aber du hast Recht - der Thread kann gestoppt werden und dann würde disconnect() ausgeführt. Wie gesagt, ich muss leider zDaemon benutzen. Hat den Vorteil das man keinen daemonize Code in die einzelnen Klassen stecken muss, aber eben den Nachteil das wenn ich 'scriptName stop' aufrufe, der Prozess gekillt wird und ich keine Möglichkeit habe vorher noch Code auszuführen. Ist in dem Projekt leider (im Moment noch) so.
 
Falc410 schrieb:
Also das macht nicht unbedingt einen Unterschied.

Bei IO-Operationen wird immer irgend etwas blockieren, wenn du nicht den async-Weg gehst. Geht nicht anders. Aber wenn du canReceive mit Parameter != 0 benutzt, dann ist der gewünschte Unterschied doch vorhanden?

Um nochmal auf deine Fragen zurückzukommen:

1. Falls die Verbindung offen bleibt, wie macht man das ohne ein While true?

Die von 0x8100 vorgeschlagene Lösung beantwortet diese Frage doch perfekt. Die Verbindung bleibt offen, ein while-true hast du aber nicht. Bei blocking-IO APIs ist das noch die eleganteste Möglichkeit.

(Ausnahme: der eingehende Datenstrom ist von vornherein in seiner Struktur bekannt und endlich. Dann könntest du nach X Bytes/Frames/Nachrichten die Schleife von Innen heraus beenden, oder auf eine Poison Pill im Datenstrom warten, o.ä.).

2. Der Code wird sowieso in einem eigenen Thread laufen. Soll ich das while true dann einfach so hinnehmen oder blockiert es am Schluss doch den Server?

Wenn es nur ein einziger Thread ist, der das tut, dann ist es egal. Denn der frisst während dem Warten keine erwähnenswerten Ressourcen. Die Frage ist nur, wann die Connection aufgeräumt wird, die du beim Zwangsabbruch des Skripts zurücklässt. Wenn dein Skript aber nicht sekündlich neu aufgerufen wird, dann würde ich das für vernachlässigbar halten. Es ist jedoch relativ wahrscheinlich, dass der Endpunkt auf der anderen Seite der Leitung das ganze dann als Verbindungsfehler ansieht und nicht als korrekt beendete Verbindung, das sollte klar sein.
 
Zuletzt bearbeitet:
Die Verbindung soll mehr oder weniger unendlich lange sein, d.h. ich brauche kein Abbruchkriterium, außer ich stoppe die Software.
Mir ist klar das ohne ein disconnect es für den Server (die Gegenstelle) nach einem Verbindungsabbruch aussieht, das scheint aber in diesem Fall nicht weiter zu stören. Zumindest beim testen habe ich nichts schlimmes feststellen können.

Solange der Thread kaum Ressourcen verbraucht werde ich es aber so lassen. Das twisted Framework möchte ich nicht extra installieren.
Mit zdaemon kann ich jetzt 'python client.py' in einem extra Thread ausführen und es funktioniert super. Habe auch ein wenig mit Re-connects herumprobiert (falls die Verbindung zum Server abbricht, bzw. der Server nicht erreichbar ist).

Da die Doku nicht so gut ist und die Beispiele nicht direkt funktioniert haben will ich hier mal ein Beispiel für die Nachwelt festhalten, falls irgendjemand in die Verlegenheit kommt stompest einzusetzen:
Code:
#!/usr/bin/env python

from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.sync import Stomp
from stompest.error import StompConnectionError
from stompest.error import StompConnectTimeout

if __name__ == '__main__':
    uri = 'failover:(tcp://127.0.0.1:61613)?startupMaxReconnectAttempts=1,maxReconnectAttempts=50' 
    # 50 attempts around 20 minutes of re-trying due to exponential increase in wait time between attempts

    CONFIG = StompConfig(uri)
    QUEUE = '/queue/generalTopic'

    client = Stomp(CONFIG)
    client.connect()
    client.subscribe(QUEUE, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL})

    while True:
        try:
            frame = client.receiveFrame()
            print 'Got %s' % frame.info()
            client.ack(frame)
        except StompConnectionError:
            print "connection error - trying to reconnect"
            try:
                client.connect()
            except StompConnectTimeout:
                print ("MAX RETRY REACHED")
                exit(1)
            else:
                print "connection re-established! GOOD"
 
Zurück
Oben