Rust Channel zwischen Actix Websocket und Service

jb_alvarado

Lt. Junior Grade
Registriert
Sep. 2015
Beiträge
492
Hallo Allerseits,
ich habe schon auf der Diskussionsseite von Actix-web gefragt, aber da bekomme ich keine Antwort. Vielleicht habe ich hier mehr Glück.

Ich programmiere ein Backend mit Rust und Actix. Dabei möchte ich die Möglichkeit haben per API Aufgaben auszulösen, die zum Teil länger dauern können. Diese Aufgaben sollen im Hintergrund laufen und per Websocket soll der Status vom Server zum Client geschickt werden.

Allerdings habe ich gerade das Problem, dass ich für diesen Status einen Channel (tokio broadcast) zwischen dem API Service und dem Websocket erstellen möchte, dieser aber keine Nachrichten empfängt*. Wenn ich in der Main Funktion einen Thread erstelle und von dort aus Nachrichten versende, kommen sie durch.

*Das komische ist, dass in ganz seltenen Fällen Nachrichten vom Service geschickt werden und auch ankommen. Nach welchem Schema das funktioniert ist mir nicht ersichtlich.

Ich habe hier mal einen Beispielcode erstellt:

Rust:
use std::{thread, time::Duration};
use actix::prelude::*;
use actix_web::{
    get, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
};
use actix_web_actors::ws;
use tokio::sync::broadcast;
struct WebSocket {
    receiver: web::Data<broadcast::Receiver<String>>,
    spawn_handle: Option<SpawnHandle>,
}
impl WebSocket {
    fn new(receiver: web::Data<broadcast::Receiver<String>>) -> Self {
        Self {
            receiver,
            spawn_handle: None,
        }
    }
}
impl Actor for WebSocket {
    type Context = ws::WebsocketContext<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        println!("-- actor");
        let mut receiver = self.receiver.resubscribe();
        self.spawn_handle = Some(ctx.add_stream(async_stream::stream! {
            while let Ok(message) = receiver.recv().await {
                println!("-- msg: {message}");
                yield message.to_string();
            }
        }));
    }
}
impl StreamHandler<String> for WebSocket {
    fn handle(&mut self, msg: String, ctx: &mut Self::Context) {
        ctx.text(msg);
    }
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, _ctx: &mut Self::Context) {
        println!("Received message: {msg:?}")
    }
}
async fn status_ws(
    req: HttpRequest,
    stream: web::Payload,
    recv: web::Data<broadcast::Receiver<String>>,
) -> Result<HttpResponse, Error> {
    println!("-- client connect");
    ws::start(WebSocket::new(recv), &req, stream)
}
#[get("/hello/")]
async fn say_hello(sender: web::Data<broadcast::Sender<String>>) -> impl Responder {
    let task = "from service 00".to_string();
    sender
        .send(task.clone())
        .expect("Failed to write to channel");
    println!("say hello");
    web::Json(task)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(move || {
        let (sender, receiver) = broadcast::channel::<String>(20);
        let sender2 = sender.clone();
        let _handle = thread::spawn(move || {
            for i in 1..22 {
                thread::sleep(Duration::from_millis(600));
                sender2
                    .send(format!("from loop {i}"))
                    .expect("Failed to write to channel");
            }
        });
        App::new()
            .app_data(web::Data::new(sender))
            .app_data(web::Data::new(receiver))
            .wrap(middleware::Logger::default())
            .service(web::resource("/ws/status").route(web::get().to(status_ws)))
            .service(web::scope("/api").service(say_hello))
    })
    .bind(("127.0.0.1", 8077))?
    .run()
    .await
}

Habt ihr eine Idee, was hier falsch läuft?
Ergänzung ()

Hm, warum findet man oft kurz nach dem man ins Forum schreibt selbst die Antwort...

Habe den Channel im falschen Kontext erstellt. Wenn er direkt am Anfang in der Main Funktion erstellt wird, geht es.
 
Zuletzt bearbeitet:
  • Gefällt mir
Reaktionen: Stern1710 und ZuseZ3
Hi,
würde es dir etwas ausmachen, den aktualisierten Code bzw die Änderung zu posten? Falls nochmal jemand darüberstolpert und die Antwort sucht :) .
 
Kein Problem:

Rust:
use std::{thread, time::Duration};

use actix::prelude::*;
use actix_web::{
    get, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
};
use actix_web_actors::ws;
use tokio::sync::broadcast;

use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
struct Task {
    name: String,
    message: String,
    current: usize,
}

impl Task {
    pub fn new(name: &str, message: &str, current: usize) -> Self {
        Self {
            name: name.to_string(),
            message: message.to_string(),
            current,
        }
    }
}

struct WebSocket {
    receiver: broadcast::Receiver<Task>,
    spawn_handle: Option<SpawnHandle>,
}

impl WebSocket {
    fn new(receiver: web::Data<broadcast::Receiver<Task>>) -> Self {
        let res = &*receiver.into_inner();
        Self {
            receiver: res.resubscribe(),
            spawn_handle: None,
        }
    }
}

impl Actor for WebSocket {
    type Context = ws::WebsocketContext<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        let mut receiver = self.receiver.resubscribe();
        self.spawn_handle = Some(ctx.add_stream(async_stream::stream! {
            while let Ok(message) = receiver.recv().await {
                yield  message;
            }
        }));
    }
}

impl StreamHandler<Task> for WebSocket {
    fn handle(&mut self, msg: Task, ctx: &mut Self::Context) {
        let v = serde_json::to_string(&msg).unwrap();
        ctx.text(v);
    }
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, _ctx: &mut Self::Context) {
        // received messages comes here
        println!("Received message: {msg:?}")
    }
}

async fn status_ws(
    req: HttpRequest,
    stream: web::Payload,
    recv: web::Data<broadcast::Receiver<Task>>,
) -> Result<HttpResponse, Error> {
    ws::start(WebSocket::new(recv), &req, stream)
}

#[get("/hello/")]
async fn say_hello(sender: web::Data<broadcast::Sender<Task>>) -> impl Responder {
    actix_rt::spawn(async move {
        for i in 1..10 {
            let task = Task::new("echo", "long running task", i);
            sender.send(task.clone()).expect("Failed to write to channel");

            thread::sleep(Duration::from_secs(1));
        }
    });

    "Ok"
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let (sender, receiver) = broadcast::channel::<Task>(100);
    let send_data = web::Data::new(sender.clone());
    let recv_data = web::Data::new(receiver);

    HttpServer::new(move || {
        App::new()
            .app_data(send_data.clone())
            .app_data(recv_data.clone())
            .wrap(middleware::Logger::default())
            .service(web::resource("/ws/status").route(web::get().to(status_ws)))
            .service(web::scope("/api").service(say_hello))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

Habe noch actix_rt::spawn eingefügt, mit dem die Hintergrundaufgabe gestartet wird.
 
Zuletzt bearbeitet:
Zurück
Oben