From dc86b3209e12f0a978976ad266386db3f1fa2637 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 5 Jan 2026 06:04:32 +0100 Subject: [PATCH 01/21] add(event): events to serialize and deserialize --- src/core/events.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/core/events.py b/src/core/events.py index e69de29..a8e6658 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from typing import Dict, Sequence, List, Mapping, Any +import json + + +@dataclass +class ModuleEvent: + """ + Inter-Module communication event + Module subscribe to a topic and link a callback. + The payload must correspond to a mapping of params of the callback. + """ + + topic: str + payload: Mapping[str, Any] + + @classmethod + def from_dict(cls, raw: Dict): + return cls(topic=raw["topic"], payload=raw["payload"]) + + def serialize(self) -> Sequence: + return [self.topic.encode(), json.dumps(self.payload).encode()] + + @classmethod + def deserialize(cls, raw: List[bytes]): + topic, payload = raw + return cls(topic=topic.decode(), payload=json.loads(payload.decode())) From 44680ff47e341b1a24719fe68f445f5ad805983e Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 5 Jan 2026 06:18:22 +0100 Subject: [PATCH 02/21] evol(module): now use ModuleEvent --- src/core/module.py | 52 ++++++++++++++++++++-------------------------- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/src/core/module.py b/src/core/module.py index 4be1246..6e0d29e 100644 --- a/src/core/module.py +++ b/src/core/module.py @@ -1,12 +1,12 @@ -import json import threading from abc import ABC, abstractmethod from multiprocessing.synchronize import Event -from typing import Callable, Dict, final +from typing import Callable, Dict, final, Any, Mapping import zmq from src.tools.logger import logging +from src.core.events import ModuleEvent class Module(ABC): @@ -51,19 +51,21 @@ def subscribe(self, topic: str, callback: Callable) -> None: @final def publish( - self, topic: str, msg: object, content_type: str = "str" - ) -> None: # TODO content type enum - if content_type == "json": - payload = json.dumps(msg).encode() - elif content_type == "bytes": - payload = msg - elif content_type == "str": - payload = msg.encode() - else: - raise ValueError(f"Unsupported content_type: {content_type}") - - self.pub_socket.send_multipart([topic.encode(), content_type.encode(), payload]) - self.logger.info(f"Publish: {topic} {content_type}") + self, + topic: str, + **kwargs: Mapping[str, Any], + ) -> None: + """ + Will publish a ModuleEvent to other modules. + + :param topic: the topic of the event + :type topic: str + :param kwargs: kwargs must be named as the receiving module's callbacks + :type kwargs: Mapping[str, Any] + """ + event = ModuleEvent(topic=topic, payload=kwargs) + self.logger.info(f"Publish: {topic} {kwargs.keys()}") + self.pub_socket.send_multipart(event.serialize()) @final def _start_polling(self) -> None: @@ -80,21 +82,11 @@ def _poll_loop(self) -> None: events = dict(poller.poll(100)) for _, sub in self.subs.items(): if sub in events: - topic, content_type, payload = sub.recv_multipart() - topic_str = topic.decode() - content_type_str = content_type.decode() - self.logger.info(f"Receive: {topic_str} {content_type_str}") - if content_type_str == "json": - kwargs = json.loads(payload.decode()) - self.callbacks[topic_str]( - **kwargs - ) # TODO better and cleaner way ? - elif content_type_str == "bytes": - data = payload - self.callbacks[topic_str](data) - elif content_type_str == "str": - data = payload.decode() - self.callbacks[topic_str](data) + data = sub.recv_multipart() + event = ModuleEvent.deserialize(data) + + self.logger.info(f"Receive: {event.topic} {event.payload.keys()}") + self.callbacks[event.topic](**event.payload) @final def start_module( From 4859fd03f67fa239384f720fc62ebecd1ea2bd41 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 5 Jan 2026 06:57:35 +0100 Subject: [PATCH 03/21] evol(modules): use of ModuleEvent --- src/core/agent.py | 2 +- src/core/zmq/event_proxy.py | 12 +++++++++--- src/modules/rag/mode_controller.py | 4 ++-- src/modules/rag/rag.py | 5 +++-- src/modules/speech_to_text/record_speech.py | 6 +++--- src/modules/speech_to_text/speech_to_text.py | 2 +- src/modules/textIO/input.py | 4 ++-- src/modules/textIO/output.py | 2 +- 8 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/core/agent.py b/src/core/agent.py index df2c182..cb7f5e7 100644 --- a/src/core/agent.py +++ b/src/core/agent.py @@ -274,4 +274,4 @@ def run(self) -> None: while True: data = input() - self.down_proxy.publish("std.in", data) + self.down_proxy.publish("std.in", data=data) diff --git a/src/core/zmq/event_proxy.py b/src/core/zmq/event_proxy.py index 9447de8..66d2b4c 100644 --- a/src/core/zmq/event_proxy.py +++ b/src/core/zmq/event_proxy.py @@ -1,9 +1,10 @@ from dataclasses import dataclass -from typing import Optional +from typing import Optional, Mapping, Any import zmq from src.tools.logger import logging, setup_logger +from src.core.events import ModuleEvent @dataclass @@ -53,6 +54,11 @@ def stop(self) -> None: self.xsub.close(linger=0) self.xpub.close(linger=0) - def publish(self, topic: str, msg: str) -> None: - self.xpub.send_multipart([topic.encode(), "str".encode(), msg.encode()]) + def publish( + self, + topic: str, + **kwargs: Mapping[str, Any], + ) -> None: + event = ModuleEvent(topic=topic, payload=kwargs) + self.xpub.send_multipart(event.serialize()) self.logger.info(f"Publish: {topic} str") diff --git a/src/modules/rag/mode_controller.py b/src/modules/rag/mode_controller.py index 9ed0606..e4b6c09 100644 --- a/src/modules/rag/mode_controller.py +++ b/src/modules/rag/mode_controller.py @@ -25,12 +25,12 @@ def processTextInput(self, text: str): elif "switch rag" in text.lower(): self.switchMode(Modes.RAG) elif "bye bye" in text.lower(): - self.publish("exit", "") # TODO handle (manager being a module) usefull ? + self.publish("exit") # TODO handle (manager being a module) usefull ? elif text.strip() == "": return else: topic = f"{str(self.mode.name).lower()}.in" - self.publish(topic, text) + self.publish(topic, text=text) def set_subscriptions(self): self.subscribe("text.in", self.processTextInput) diff --git a/src/modules/rag/rag.py b/src/modules/rag/rag.py index f2fc736..b083abd 100644 --- a/src/modules/rag/rag.py +++ b/src/modules/rag/rag.py @@ -62,7 +62,8 @@ def ragLoad(self, folderPath: str, fileType: str) -> None: self.documents += self.textSplitter.split_documents(fileLoader.load()) self.vectorstore.add_documents(self.documents) - def ragQuestion(self, question: str) -> None: + def ragQuestion(self, text: str) -> None: + question = text self.logger.debug("question:", question) history = "\n".join( [ @@ -79,7 +80,7 @@ def ragQuestion(self, question: str) -> None: self.conversation_log["conversation"].append( {"question": question.split(helpingContext)[1:], "answer": answer} ) - self.publish("llm.response", answer) + self.publish("llm.response", text=answer) def saveConversation(self, filename: str = "conversation_log.json"): with open(filename, "w") as f: diff --git a/src/modules/speech_to_text/record_speech.py b/src/modules/speech_to_text/record_speech.py index a361ac3..3d87edc 100644 --- a/src/modules/speech_to_text/record_speech.py +++ b/src/modules/speech_to_text/record_speech.py @@ -77,14 +77,14 @@ def record_audio(self, starting_chunk, stop_event: Event = None) -> None: if buffer == []: break speech = np.concatenate(buffer, axis=0) - self.publish("speech.in", speech.tobytes(), "bytes") + self.publish("speech.in", buffer=speech.tobytes()) break else: silence_start = None def set_subscriptions(self) -> None: - self.subscribe("speech.in.pause", self.pause()) - self.subscribe("speech.in.resume", self.pause(False)) + self.subscribe("speech.in.pause", lambda: self.pause()) + self.subscribe("speech.in.resume", lambda: self.pause(False)) def run_module(self, stop_event: Event = None) -> None: if not self.THRESHOLD: diff --git a/src/modules/speech_to_text/speech_to_text.py b/src/modules/speech_to_text/speech_to_text.py index a086a2c..6c087b0 100644 --- a/src/modules/speech_to_text/speech_to_text.py +++ b/src/modules/speech_to_text/speech_to_text.py @@ -44,7 +44,7 @@ def process_audio(self, buffer: bytes) -> None: if not result["text"] or result["text"] == "": return - self.publish("text.in", result["text"]) + self.publish("text.in", text=result["text"]) def set_subscriptions(self) -> None: self.subscribe("speech.in", self.process_audio) diff --git a/src/modules/textIO/input.py b/src/modules/textIO/input.py index 6440238..58ee567 100644 --- a/src/modules/textIO/input.py +++ b/src/modules/textIO/input.py @@ -4,13 +4,13 @@ class TextInput(Module): def set_subscriptions(self): self.subscribe("std.in", self.stdin_to_text) - self.subscribe("std.out", lambda _: print(">> ", end="", flush=True)) + self.subscribe("std.out", lambda: print(">> ", end="", flush=True)) def stdin_to_text(self, data): print(">> ", end="", flush=True) if data == "": return - self.publish("text.in", data) + self.publish("text.in", text=data) def run_module(self, stop_event=None): print(">> ", end="", flush=True) diff --git a/src/modules/textIO/output.py b/src/modules/textIO/output.py index c68ca76..c461e7a 100644 --- a/src/modules/textIO/output.py +++ b/src/modules/textIO/output.py @@ -7,4 +7,4 @@ def set_subscriptions(self) -> None: def print_response(self, text: str) -> None: print(f"\r<< {text}") - self.publish("std.out", "") + self.publish("std.out") From 4b90c4a89aa17b643a6e3265639ba1dcb3b317d6 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Wed, 14 Jan 2026 11:32:13 +0100 Subject: [PATCH 04/21] wip(command): add CommandEvent (is not received yet) --- src/core/agent.py | 31 +++++--- src/core/events.py | 39 +++++++++- src/core/module.py | 4 +- src/core/shell.py | 10 +-- src/core/zmq/control_channel.py | 121 +++++++++++++++++--------------- src/core/zmq/event_proxy.py | 8 +-- src/launch_agent.py | 2 +- 7 files changed, 135 insertions(+), 80 deletions(-) diff --git a/src/core/agent.py b/src/core/agent.py index cb7f5e7..32d5f51 100644 --- a/src/core/agent.py +++ b/src/core/agent.py @@ -5,11 +5,13 @@ from multiprocessing.synchronize import Event from typing import Any, Dict, Mapping +from src.core.events import Command, CommandEvent + from src.modules.factory import ModuleFactory from src.tools.logger import logging, setup_logger from .huri import HuriConfig -from .zmq.control_channel import Command, Dealer +from .zmq.control_channel import Dealer from .zmq.event_proxy import EventProxy from .zmq.log_channel import LogPusher @@ -117,14 +119,24 @@ def __init__(self, config: AgentConfig) -> None: f"Agent {self.dealer.identity}", log_queue=self.log_pusher.log_queue ) - def _command_handler(self, command: Command) -> bool: + def _command_handler(self, command: CommandEvent) -> bool: match command.cmd: - case "START": - return self.start_module(*command.args) - case "STOP": - return self.stop_module(*command.args) - case "STATUS": + case Command.START: + for name in self.modules: + self.start_module(name) + pass + case Command.STOP: + for name in list(self.processes.keys()): + self.stop_module(name) + case Command.START_MODULE: + return self.start_module(**command.payload) + case Command.STOP_MODULE: + return self.stop_module(**command.payload) + case Command.STATUS: return self.status() + case Command.EXIT: + self.exit() + pass case _: return False # todo log @@ -202,7 +214,7 @@ def stop_module(self, name) -> None: del self.stop_events[name] self.log_pusher.level_filter.del_level(name) - def stop_all(self) -> None: + def exit(self) -> None: for name in list(self.processes.keys()): self.stop_module(name) @@ -269,9 +281,6 @@ def run(self) -> None: self.logger.error(e) return - for name in self.modules: - self.start_module(name) - while True: data = input() self.down_proxy.publish("std.in", data=data) diff --git a/src/core/events.py b/src/core/events.py index a8e6658..38051b6 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -1,6 +1,7 @@ -from dataclasses import dataclass -from typing import Dict, Sequence, List, Mapping, Any import json +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, List, Mapping, Sequence @dataclass @@ -25,3 +26,37 @@ def serialize(self) -> Sequence: def deserialize(cls, raw: List[bytes]): topic, payload = raw return cls(topic=topic.decode(), payload=json.loads(payload.decode())) + + +class Command(Enum): + REGISTER = "REGISTER" + START = "START" + STOP = "STOP" + START_MODULE = "START_MODULE" + STOP_MODULE = "STOP_MODULE" + STATUS = "STATUS" + EXIT = "EXIT" + + +@dataclass +class CommandEvent: + cmd: Command + payload: Mapping[str, Any] + + @classmethod + def from_dict(cls, raw: Dict): + return cls(cmd=Command(raw["topic"]), payload=raw["payload"]) + + def serialize(self) -> Sequence: + print(self.cmd.value) + + return [ + self.cmd.value.encode(), + json.dumps(self.payload).encode(), + ] + + @classmethod + def deserialize(cls, raw: List[bytes]): + print(raw) + cmd, payload = raw + return cls(cmd=Command(cmd.decode()), payload=json.loads(payload.decode())) diff --git a/src/core/module.py b/src/core/module.py index 6e0d29e..db8e08e 100644 --- a/src/core/module.py +++ b/src/core/module.py @@ -1,12 +1,12 @@ import threading from abc import ABC, abstractmethod from multiprocessing.synchronize import Event -from typing import Callable, Dict, final, Any, Mapping +from typing import Any, Callable, Dict, Mapping, final import zmq -from src.tools.logger import logging from src.core.events import ModuleEvent +from src.tools.logger import logging class Module(ABC): diff --git a/src/core/shell.py b/src/core/shell.py index 6473357..5d01985 100644 --- a/src/core/shell.py +++ b/src/core/shell.py @@ -1,7 +1,7 @@ import cmd from src.core.huri import HuRI -from src.core.zmq.control_channel import Command +from src.core.events import CommandEvent, Command class RobotShell(cmd.Cmd): @@ -14,18 +14,18 @@ def __init__(self, huri: HuRI) -> None: def do_status(self, arg) -> None: "Display modules and router status." - self.huri.router.send_commands(Command("STATUS", [])) + self.huri.router.send_commands(Command.STATUS) def do_start(self, arg) -> None: "Start a module." - self.huri.router.send_commands(Command("START", [arg.strip()])) + self.huri.router.send_commands(Command.START, arg) def do_stop(self, arg) -> None: "Stop a module." - self.huri.router.send_commands(Command("STOP", [arg.strip()])) + self.huri.router.send_commands(Command.STOP, arg) def do_exit(self, arg) -> None: "Exit HuRi." - self.huri.router.send_commands(Command("EXIT", [])) + self.huri.router.send_commands(Command.EXIT) print("Bye !") return True diff --git a/src/core/zmq/control_channel.py b/src/core/zmq/control_channel.py index 89ef839..b1a110d 100644 --- a/src/core/zmq/control_channel.py +++ b/src/core/zmq/control_channel.py @@ -1,39 +1,39 @@ import json import uuid from dataclasses import asdict, dataclass -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Mapping import zmq +from src.core.events import Command, CommandEvent from src.tools.logger import logging, setup_logger +# @dataclass +# class Command: +# cmd: str # "STOP", "START", "STATUS", ... +# args: List[Any] # JSON-serializable arguments -@dataclass -class Command: - cmd: str # "STOP", "START", "STATUS", ... - args: List[Any] # JSON-serializable arguments +# def to_bytes(self) -> bytes: +# return json.dumps(asdict(self)).encode("utf-8") - def to_bytes(self) -> bytes: - return json.dumps(asdict(self)).encode("utf-8") +# @staticmethod +# def from_bytes(data: bytes) -> "Command": +# obj = json.loads(data.decode("utf-8")) +# return Command(**obj) - @staticmethod - def from_bytes(data: bytes) -> "Command": - obj = json.loads(data.decode("utf-8")) - return Command(**obj) +# @dataclass +# class Result: +# success: bool +# result: List[Any] -@dataclass -class Result: - success: bool - result: List[Any] +# def to_bytes(self) -> bytes: +# return json.dumps(asdict(self)).encode("utf-8") - def to_bytes(self) -> bytes: - return json.dumps(asdict(self)).encode("utf-8") - - @staticmethod - def from_bytes(data: bytes) -> "Command": - obj = json.loads(data.decode("utf-8")) - return Result(**obj) +# @staticmethod +# def from_bytes(data: bytes) -> "Command": +# obj = json.loads(data.decode("utf-8")) +# return Result(**obj) class Router: @@ -43,9 +43,8 @@ def __init__( port: int, logger: Optional[logging.Logger] = setup_logger("Router"), ): - self.ctx = zmq.Context.instance() - self.router = self.ctx.socket(zmq.ROUTER) + self.router: zmq.Socket[bytes] = self.ctx.socket(zmq.ROUTER) self.hostname = hostname self.port = port @@ -53,26 +52,26 @@ def __init__( self.dealers: Dict[bytes, bool] = {} + def register_dealer(self, auth: str, name: str, config: Dict[str, Any]) -> None: + if auth != "oui": + return + + self.dealers[name] = config + self.logger.info(f"Dealer registered: {name}") + def start(self): self.router.bind(f"tcp://{self.hostname}:{self.port}") self.logger.info("Router started") try: while True: - identity, *frames = self.router.recv_multipart() + identity, *data = self.router.recv_multipart() + self.logger.warning(data) + command = CommandEvent.deserialize(data) - if not frames: - continue + if command.cmd == Command.REGISTER: + self.register_dealer(**command.payload) - command = frames[0] - - if command == b"REGISTER": - self.dealers[identity] = True - self.logger.info(f"Dealer registered: {identity}") - - elif command == b"RESULT": - payload = frames[1] if len(frames) > 1 else b"" - self.logger.info(f"Result from {identity}: {payload.decode()}") except Exception as e: self.logger.exception(e) pass @@ -82,15 +81,19 @@ def start(self): def stop(self) -> None: self.router.close() - def send_command(self, dealer_id: bytes, command: Command) -> None: - if dealer_id not in self.dealers: + def send_command( + self, dealer_name: str, command: Command, **kwargs: Mapping[str, Any] + ) -> None: + if dealer_name not in self.dealers: raise ValueError("Dealer not registered") - self.router.send_multipart([dealer_id, b"COMMAND", command.to_bytes()]) + event = CommandEvent(cmd=command, payload=kwargs) + self.router.send_multipart(event.serialize()) - def send_commands(self, command: Command) -> None: - for dealer_id, _ in self.dealers.items(): - self.send_command(dealer_id, command) + def send_commands(self, command: Command, **kwargs: Mapping[str, Any]) -> None: + for dealer_name, _ in self.dealers.items(): + self.logger.info(f"Sending Command to: {dealer_name}") + self.send_command(dealer_name, command, **kwargs) class Dealer: @@ -103,41 +106,49 @@ def __init__( identity: Optional[str] = None, ): self.ctx = zmq.Context.instance() - self.dealer = self.ctx.socket(zmq.DEALER) + self.dealer: zmq.Socket[bytes] = self.ctx.socket(zmq.DEALER) self.hostname = hostname self.port = port self.executor = executor - self.identity = (identity or str(uuid.uuid4())).encode() # TODO agent name + self.identity = identity or str(uuid.uuid4()) # TODO agent name self.logger = logger or logging.getLogger(f"Dealer {self.identity}") def start(self): self.dealer.connect(f"tcp://{self.hostname}:{self.port}") - self.dealer.setsockopt(zmq.IDENTITY, self.identity) + self.dealer.setsockopt(zmq.IDENTITY, self.identity.encode()) self.logger.info(f"Dealer started: {self.identity}") try: - self.dealer.send(b"REGISTER") + register = CommandEvent( + cmd=Command.REGISTER, + payload={ + "auth": "oui", + "name": self.identity, + "config": {"none": None}, + }, + ) + self.dealer.send_multipart(register.serialize()) while True: - frames = self.dealer.recv_multipart() - - command = frames[0] + # self.dealer. + self.logger.info("received nothing still") + data = self.dealer.recv_multipart() + self.logger.info("received") + command = CommandEvent.deserialize(data) - if command == b"COMMAND": - self.logger.info("received command") - payload = frames[1] if len(frames) > 1 else b"" - result = self.execute(payload) + self.logger.info("received command") + result = self.execute(command) - self.dealer.send_multipart([b"RESULT", result]) + # self.dealer.send_multipart([b"RESULT", result]) except Exception as e: self.logger.exception(e) finally: self.dealer.close() - def execute(self, command: Command) -> bytes: + def execute(self, command: CommandEvent) -> bytes: """ Execute command sent by Router """ diff --git a/src/core/zmq/event_proxy.py b/src/core/zmq/event_proxy.py index 66d2b4c..ba8ea9d 100644 --- a/src/core/zmq/event_proxy.py +++ b/src/core/zmq/event_proxy.py @@ -1,10 +1,10 @@ from dataclasses import dataclass -from typing import Optional, Mapping, Any +from typing import Any, Mapping, Optional import zmq -from src.tools.logger import logging, setup_logger from src.core.events import ModuleEvent +from src.tools.logger import logging, setup_logger @dataclass @@ -24,8 +24,8 @@ def __init__( ): self.ctx = zmq.Context.instance() - self.xpub = self.ctx.socket(zmq.XPUB) - self.xsub = self.ctx.socket(zmq.XSUB) + self.xpub: zmq.Socket[bytes] = self.ctx.socket(zmq.XPUB) + self.xsub: zmq.Socket[bytes] = self.ctx.socket(zmq.XSUB) self.hostname = hostname self.connect_hostname = connect_hostname diff --git a/src/launch_agent.py b/src/launch_agent.py index efc63c4..1d512ee 100644 --- a/src/launch_agent.py +++ b/src/launch_agent.py @@ -34,7 +34,7 @@ def main() -> None: try: agent.run() except KeyboardInterrupt: - agent.stop_all() + agent.exit() except Exception as e: logging.getLogger(__name__).error(e) From 8438ee903c5fa78837a975a933b1ea792d63f255 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Thu, 15 Jan 2026 10:42:28 +0100 Subject: [PATCH 05/21] evol(control_channel): dealer register with auth(wip) then start --- src/core/events.py | 2 +- src/core/zmq/control_channel.py | 212 +++++++++++++++++++------------- 2 files changed, 127 insertions(+), 87 deletions(-) diff --git a/src/core/events.py b/src/core/events.py index 38051b6..a510e9d 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -30,6 +30,7 @@ def deserialize(cls, raw: List[bytes]): class Command(Enum): REGISTER = "REGISTER" + AUTH_OK = "AUTH_OK" START = "START" STOP = "STOP" START_MODULE = "START_MODULE" @@ -57,6 +58,5 @@ def serialize(self) -> Sequence: @classmethod def deserialize(cls, raw: List[bytes]): - print(raw) cmd, payload = raw return cls(cmd=Command(cmd.decode()), payload=json.loads(payload.decode())) diff --git a/src/core/zmq/control_channel.py b/src/core/zmq/control_channel.py index b1a110d..5e078f1 100644 --- a/src/core/zmq/control_channel.py +++ b/src/core/zmq/control_channel.py @@ -1,40 +1,14 @@ import json +import threading import uuid from dataclasses import asdict, dataclass -from typing import Any, Callable, Dict, List, Optional, Mapping +from typing import Any, Callable, Dict, List, Mapping, Optional import zmq from src.core.events import Command, CommandEvent from src.tools.logger import logging, setup_logger -# @dataclass -# class Command: -# cmd: str # "STOP", "START", "STATUS", ... -# args: List[Any] # JSON-serializable arguments - -# def to_bytes(self) -> bytes: -# return json.dumps(asdict(self)).encode("utf-8") - -# @staticmethod -# def from_bytes(data: bytes) -> "Command": -# obj = json.loads(data.decode("utf-8")) -# return Command(**obj) - - -# @dataclass -# class Result: -# success: bool -# result: List[Any] - -# def to_bytes(self) -> bytes: -# return json.dumps(asdict(self)).encode("utf-8") - -# @staticmethod -# def from_bytes(data: bytes) -> "Command": -# obj = json.loads(data.decode("utf-8")) -# return Result(**obj) - class Router: def __init__( @@ -48,60 +22,104 @@ def __init__( self.hostname = hostname self.port = port - self.logger = logger or logging.getLogger(__name__) + self._stop_event = None + self._poll_thread = None + self._started = False self.dealers: Dict[bytes, bool] = {} - def register_dealer(self, auth: str, name: str, config: Dict[str, Any]) -> None: + self.logger = logger or logging.getLogger(__name__) + + def _register_dealer( + self, identity: bytes, auth: str, name: str, config: Dict[str, Any] + ) -> None: if auth != "oui": return - self.dealers[name] = config - self.logger.info(f"Dealer registered: {name}") + self.dealers[identity] = config + self.logger.info(f"Dealer registered: {identity}") - def start(self): - self.router.bind(f"tcp://{self.hostname}:{self.port}") - self.logger.info("Router started") + self.send_command(identity, Command.AUTH_OK) + self.send_command(identity, Command.START) + + def _poll(self) -> None: # todo poller + """ + Start a blocking poller loop. + call self.stop() to stop. + """ - try: - while True: + while not self._stop_event.is_set(): + try: identity, *data = self.router.recv_multipart() - self.logger.warning(data) command = CommandEvent.deserialize(data) if command.cmd == Command.REGISTER: - self.register_dealer(**command.payload) + self._register_dealer(identity, **command.payload) + else: + raise Exception(f"Dealer {identity} sent {command.cmd}") + except zmq.Again: + continue + except Exception as e: + self.logger.warning(e, exc_info=True) + + def start(self) -> None: + """ + Bind to endpoint. + Launch a poll loop thread. + """ + if self._started is True: + raise Exception("already started") + + self.router.bind(f"tcp://{self.hostname}:{self.port}") + self.router.setsockopt(zmq.RCVTIMEO, 1000) + self.logger.info("started") - except Exception as e: - self.logger.exception(e) - pass - finally: - self.router.close() + self._stop_event = threading.Event() + self._poll_thread = threading.Thread(target=self._poll) + self._poll_thread.start() + + self._started = True def stop(self) -> None: - self.router.close() + if self._started is False: + raise Exception("not started") + + self._stop_event.set() + self._poll_thread.join(2.0) + + self.router.close(linger=0) + self._stop_event = None + self._poll_thread = None + + self._started = False def send_command( - self, dealer_name: str, command: Command, **kwargs: Mapping[str, Any] + self, dealer_identity: str, command: Command, **kwargs: Mapping[str, Any] ) -> None: - if dealer_name not in self.dealers: - raise ValueError("Dealer not registered") + if self._started is False: + raise Exception("not started") + + if dealer_identity not in self.dealers: + raise ValueError(f"Dealer {dealer_identity} not registered") event = CommandEvent(cmd=command, payload=kwargs) - self.router.send_multipart(event.serialize()) + self.logger.info(f"Sending Command {command} to: {dealer_identity}") + self.router.send_multipart([dealer_identity] + event.serialize()) def send_commands(self, command: Command, **kwargs: Mapping[str, Any]) -> None: - for dealer_name, _ in self.dealers.items(): - self.logger.info(f"Sending Command to: {dealer_name}") - self.send_command(dealer_name, command, **kwargs) + if self._started is False: + raise Exception("not started") + for dealer_identity, _ in self.dealers.items(): + self.send_command(dealer_identity, command, **kwargs) -class Dealer: + +class Dealer: # todo heartbeat def __init__( self, hostname: str, port: int, - executor: Callable[[Command], bool], + handler: Callable[[Command], bool], logger: Optional[logging.Logger] = None, identity: Optional[str] = None, ): @@ -111,52 +129,74 @@ def __init__( self.hostname = hostname self.port = port - self.executor = executor + self.handler = handler self.identity = identity or str(uuid.uuid4()) # TODO agent name + self._stop_event = None + self._poll_thread = None + self._started = False + self.logger = logger or logging.getLogger(f"Dealer {self.identity}") - def start(self): - self.dealer.connect(f"tcp://{self.hostname}:{self.port}") - self.dealer.setsockopt(zmq.IDENTITY, self.identity.encode()) - self.logger.info(f"Dealer started: {self.identity}") + def _poll(self) -> None: + """ + Start a blocking poller loop. + call self.stop() to stop. + """ - try: - register = CommandEvent( - cmd=Command.REGISTER, - payload={ - "auth": "oui", - "name": self.identity, - "config": {"none": None}, - }, - ) - self.dealer.send_multipart(register.serialize()) - - while True: - # self.dealer. - self.logger.info("received nothing still") + while not self._stop_event.is_set(): + try: data = self.dealer.recv_multipart() - self.logger.info("received") command = CommandEvent.deserialize(data) - self.logger.info("received command") - result = self.execute(command) + self.logger.info(f"Received Command {command.cmd}") + result = self.handler(command) # self.dealer.send_multipart([b"RESULT", result]) - except Exception as e: - self.logger.exception(e) - finally: - self.dealer.close() + except zmq.Again: + continue + except Exception as e: + self.logger.exception(e) - def execute(self, command: CommandEvent) -> bytes: + def start(self) -> None: """ - Execute command sent by Router + Connect to endpoint. + Launch a poll loop thread. + Send Register command (wip). """ - self.executor(command) + if self._started is True: + raise Exception("already started") + + self.dealer.connect(f"tcp://{self.hostname}:{self.port}") + self.dealer.setsockopt(zmq.IDENTITY, b"name") + self.dealer.setsockopt(zmq.RCVTIMEO, 1000) + self.logger.info(f"Dealer started: {self.identity}") + + self._stop_event = threading.Event() + self._poll_thread = threading.Thread(target=self._poll) + self._poll_thread.start() - # Example execution - result = f"Executed: {command.cmd}" - return result.encode() + register = CommandEvent( + cmd=Command.REGISTER, + payload={ + "auth": "oui", + "name": self.identity, + "config": {"none": None}, + }, + ) + self.dealer.send_multipart(register.serialize()) + + self._started = True def stop(self) -> None: + if self._started is False: + raise Exception("not started") + + self._stop_event.set() + self._poll_thread.join(2.0) + self.dealer.close(linger=0) + self._stop_event = None + self._poll_thread = None + + self._started = False From 29079c1422cdf49ed0e0d81c942ac84a87ae42c8 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Thu, 15 Jan 2026 10:43:53 +0100 Subject: [PATCH 06/21] evol(agent): thread is now inside zmq classes --- src/core/agent.py | 81 ++++++++++++++++++++++--------------- src/core/huri.py | 47 +++++++++------------ src/core/zmq/event_proxy.py | 34 ++++++++++++++-- src/core/zmq/log_channel.py | 47 +++++++++++++++++---- 4 files changed, 139 insertions(+), 70 deletions(-) diff --git a/src/core/agent.py b/src/core/agent.py index 32d5f51..33eb233 100644 --- a/src/core/agent.py +++ b/src/core/agent.py @@ -4,7 +4,8 @@ from dataclasses import dataclass from multiprocessing.synchronize import Event from typing import Any, Dict, Mapping - +import sys +import os from src.core.events import Command, CommandEvent from src.modules.factory import ModuleFactory @@ -87,7 +88,7 @@ def __init__(self, config: AgentConfig) -> None: self.processes: Dict[str, mp.Process] = {} self.stop_events: Dict[str, Event] = {} - self.threads: Dict[str, threading.Thread] = {} + self.stop_event: threading.Event = threading.Event() self.log_pusher = LogPusher( hostname=config.huri.hostname, port=config.huri.log_puller.port @@ -96,9 +97,10 @@ def __init__(self, config: AgentConfig) -> None: self.dealer = Dealer( hostname=config.huri.hostname, port=config.huri.router.port, - executor=self._command_handler, + handler=self._command_handler, logger=setup_logger("Dealer", log_queue=self.log_pusher.log_queue), ) + self.auth_ok = threading.Event() self.up_proxy = EventProxy( hostname=config.hostname, @@ -119,15 +121,19 @@ def __init__(self, config: AgentConfig) -> None: f"Agent {self.dealer.identity}", log_queue=self.log_pusher.log_queue ) - def _command_handler(self, command: CommandEvent) -> bool: + def _command_handler(self, command: CommandEvent) -> bool: # todo data race ? match command.cmd: + case Command.AUTH_OK: + self.auth_ok.set() + return True case Command.START: - for name in self.modules: + for name in list(self.modules.keys()): self.start_module(name) - pass + return True case Command.STOP: for name in list(self.processes.keys()): self.stop_module(name) + return True case Command.START_MODULE: return self.start_module(**command.payload) case Command.STOP_MODULE: @@ -135,8 +141,10 @@ def _command_handler(self, command: CommandEvent) -> bool: case Command.STATUS: return self.status() case Command.EXIT: - self.exit() - pass + "Stop run loop" + self.stop_event.set() + os.close(sys.stdin.fileno()) + return True case _: return False # todo log @@ -214,18 +222,17 @@ def stop_module(self, name) -> None: del self.stop_events[name] self.log_pusher.level_filter.del_level(name) - def exit(self) -> None: + def stop(self) -> None: for name in list(self.processes.keys()): self.stop_module(name) self.dealer.stop() self.up_proxy.stop() self.down_proxy.stop() - for name, thread in self.threads.items(): - self.logger.info(f"Stopping {name} thread...") - thread.join(timeout=5) - self.logger.info(f"{name} thread stopped") - self.log_pusher.level_filter.del_level(name) + + self.log_pusher.level_filter.del_level("Dealer") + self.log_pusher.level_filter.del_level("UpProxy") + self.log_pusher.level_filter.del_level("DownProxy") self.log_pusher.stop() print("Fully stopped") @@ -251,36 +258,46 @@ def set_log_level(self, name: str, level: int) -> None: def set_log_levels(self, level: int) -> None: self.log_pusher.level_filter.set_levels(level) - def _connect_to_huri(self) -> None: - self.log_pusher.level_filter.add_level("Dealer") - self.threads["Dealer"] = threading.Thread(target=self.dealer.start) - self.threads["Dealer"].start() - def _start_event_proxies(self) -> None: """Used to handle inter-module communication, though events""" self.log_pusher.level_filter.add_level("UpProxy") self.log_pusher.level_filter.add_level("DownProxy") - self.threads["UpProxy"] = threading.Thread( - target=self.up_proxy.start, args=[True, False] - ) - self.threads["DownProxy"] = threading.Thread( - target=self.down_proxy.start, args=[False, True] - ) - self.threads["UpProxy"].start() - self.threads["DownProxy"].start() + self.up_proxy.start(True, False) + self.down_proxy.start(False, True) def run(self) -> None: - """Start event router and modules""" # TODO config (also logs levels) + """ + Start Dealer and check auth. + Then start EventProxies and LogPusher. + Then loop over input() to send input as Event. + Then, when exit is requested, call stop() + """ # TODO config (also logs levels) try: + self.log_pusher.level_filter.add_level("Dealer") + self.dealer.start() + + if self.auth_ok.wait(5.0) is False: + raise Exception("not authentificated") + self.log_pusher.start() - self._connect_to_huri() self._start_event_proxies() except Exception as e: self.logger.error(e) return - while True: - data = input() - self.down_proxy.publish("std.in", data=data) + while not self.stop_event.is_set(): + try: + data = "" + data = input() + except EOFError: + self.logger.info("pressed EOF") + print("^D") + + if data == "": + self.down_proxy.publish("std.out") + else: + self.down_proxy.publish("std.in", data=data) + + self.stop() diff --git a/src/core/huri.py b/src/core/huri.py index 18f52f3..ab84fc4 100644 --- a/src/core/huri.py +++ b/src/core/huri.py @@ -1,7 +1,7 @@ import sys import threading from dataclasses import dataclass -from typing import Dict +from time import sleep from src.tools.logger import setup_logger @@ -53,45 +53,38 @@ def __init__(self, config: HuriConfig) -> None: ) self.log_channel = LogPuller(config.hostname, config.log_puller.port) - self.threads: Dict[str, threading.Thread] = {} + self.stop_event = threading.Event() self.logger = setup_logger("HuRI") - def _start_router(self) -> None: - """Used to handle Agent registration and control""" - self.threads["Router"] = threading.Thread(target=self.router.start) - self.threads["Router"].start() - - def _start_event_proxy(self) -> None: - """Used to handle inter-module communication, though events""" - self.threads["EventProxy"] = threading.Thread( - target=self.event_proxy.start, args=[False, False] - ) - self.threads["EventProxy"].start() - - def _start_log_channel(self) -> None: - """Used to handle Agent registration and control""" - self.threads["LogChannel"] = threading.Thread(target=self.log_channel.start) - self.threads["LogChannel"].start() - def run(self) -> None: - self._start_log_channel() - self._start_router() - self._start_event_proxy() + """ + Start LogPuller. + Start Router. + Start EventProxy. + Then loop over RobotShell.cmdloop() to send input as commandst. + Then, when exit is requested, call stop() + """ + + "Used to handle log filtering and displaying" + self.log_channel.start() + "Used to handle Agent registration and control" + self.router.start() + "Used to handle inter-module communication, though events" + self.event_proxy.start(False, False) if not sys.stdin.isatty(): - threading.Event().wait() + self.stop_event.wait() return from src.core.shell import RobotShell RobotShell(self).cmdloop() + self.stop() + def stop(self) -> None: self.router.stop() self.event_proxy.stop() self.log_channel.stop() - for name, thread in self.threads.items(): - self.logger.info(f"Stopping {name} thread...") - thread.join(timeout=5) - self.logger.info(f"{name} thread stopped") + print("Fully stopped") diff --git a/src/core/zmq/event_proxy.py b/src/core/zmq/event_proxy.py index ba8ea9d..0ac2514 100644 --- a/src/core/zmq/event_proxy.py +++ b/src/core/zmq/event_proxy.py @@ -1,3 +1,4 @@ +import threading from dataclasses import dataclass from typing import Any, Mapping, Optional @@ -32,9 +33,18 @@ def __init__( self.xpub_port = xpub_port self.xsub_port = xsub_port + self._started: bool = False + self.logger = logger or logging.getLogger(__name__) def start(self, xpub_connect: bool, xsub_connect: bool): + """ + Connect to endpoint. + Launch a proxy thread. + """ + if self._started is True: + raise Exception("already started") + if xpub_connect: self.xpub.connect(f"tcp://{self.connect_hostname}:{self.xpub_port}") else: @@ -44,21 +54,39 @@ def start(self, xpub_connect: bool, xsub_connect: bool): else: self.xsub.bind(f"tcp://{self.hostname}:{self.xsub_port}") + self.logger.info("Correctly initialized, starting proxy") + + self._proxy_thread = threading.Thread(target=self._proxy) + self._proxy_thread.start() + + self._started = True + + def _proxy(self) -> None: try: - self.logger.info("Correctly initialized, starting proxy") - zmq.proxy(self.xsub, self.xpub) + zmq.proxy(self.xsub, self.xpub) # todo capture to stop except Exception as e: self.logger.error(e) def stop(self) -> None: - self.xsub.close(linger=0) + if self._started is False: + raise Exception("not started") + + self.xsub.close(linger=0) # todo capture self.xpub.close(linger=0) + self._proxy_thread.join(2.0) + self._proxy_thread = None + + self._started = False + def publish( self, topic: str, **kwargs: Mapping[str, Any], ) -> None: + if self._started is False: + raise Exception("not started") + event = ModuleEvent(topic=topic, payload=kwargs) self.xpub.send_multipart(event.serialize()) self.logger.info(f"Publish: {topic} str") diff --git a/src/core/zmq/log_channel.py b/src/core/zmq/log_channel.py index 6eb2a8e..a9bc8d3 100644 --- a/src/core/zmq/log_channel.py +++ b/src/core/zmq/log_channel.py @@ -1,4 +1,5 @@ import json +import threading import time from typing import Any, Dict, Optional @@ -66,24 +67,54 @@ def __init__( logger: Optional[logging.Logger] = setup_logger("LogPuller"), ) -> None: self.ctx = zmq.Context.instance() - self.pull = self.ctx.socket(zmq.PULL) + self.pull: zmq.Socket[bytes] = self.ctx.socket(zmq.PULL) self.hostname = hostname self.port = port + self._stop_event = None + self._poll_thread = None + self._started = False + self.logger = logger or logging.getLogger(__name__) + def _poll(self) -> None: + while not self._stop_event.is_set(): + try: + payload = self.pull.recv() + + self.logger.handle(dict_to_record(json.loads(payload.decode()))) + except zmq.Again: + continue + except Exception as e: + self.logger.exception(e) + def start(self) -> None: - self.pull.bind(f"tcp://{self.hostname}:{self.port}") + if self._started is True: + raise Exception("already started") + self.pull.bind(f"tcp://{self.hostname}:{self.port}") + self.pull.setsockopt(zmq.RCVTIMEO, 1000) self.logger.info("started") - while True: - payload = self.pull.recv() - self.logger.handle(dict_to_record(json.loads(payload.decode()))) + self._stop_event = threading.Event() + self._poll_thread = threading.Thread(target=self._poll) + self._poll_thread.start() + + self._started = True def stop(self) -> None: - self.pull.close() + if self._started is False: + raise Exception("not started") + + self._stop_event.set() + self._poll_thread.join(2.0) + + self.pull.close(linger=0) + self._stop_event = None + self._poll_thread = None + + self._started = False class LogPusher: @@ -95,7 +126,7 @@ def __init__( ): super().__init__() self.ctx = zmq.Context.instance() - self.socket = self.ctx.socket(zmq.PUSH) + self.socket: zmq.Socket[bytes] = self.ctx.socket(zmq.PUSH) self.hostname = hostname self.port = port @@ -135,7 +166,7 @@ def start(self) -> None: self.log_handler.start() self.log_listener.start() - def stop(self): + def stop(self): # todo _started self.logger.info("stopping") time.sleep(0.2) self.log_listener.stop() From 31c8fb1bd16f4c023f42a887c4ed7526b0bdbf52 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Thu, 15 Jan 2026 10:44:35 +0100 Subject: [PATCH 07/21] fix(core): agent and huri now exit cleanly --- src/core/shell.py | 3 ++- src/launch_agent.py | 3 ++- src/launch_huri.py | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/core/shell.py b/src/core/shell.py index 5d01985..bf2781b 100644 --- a/src/core/shell.py +++ b/src/core/shell.py @@ -1,7 +1,7 @@ import cmd +from src.core.events import Command, CommandEvent from src.core.huri import HuRI -from src.core.events import CommandEvent, Command class RobotShell(cmd.Cmd): @@ -23,6 +23,7 @@ def do_start(self, arg) -> None: def do_stop(self, arg) -> None: "Stop a module." self.huri.router.send_commands(Command.STOP, arg) + self.huri.stop_event.set() def do_exit(self, arg) -> None: "Exit HuRi." diff --git a/src/launch_agent.py b/src/launch_agent.py index 1d512ee..dd91427 100644 --- a/src/launch_agent.py +++ b/src/launch_agent.py @@ -34,9 +34,10 @@ def main() -> None: try: agent.run() except KeyboardInterrupt: - agent.exit() + agent.stop() except Exception as e: logging.getLogger(__name__).error(e) + agent.stop() if __name__ == "__main__": diff --git a/src/launch_huri.py b/src/launch_huri.py index b4f9fd0..b1f9485 100644 --- a/src/launch_huri.py +++ b/src/launch_huri.py @@ -37,6 +37,7 @@ def main() -> None: huri.stop() except Exception as e: logging.getLogger(__name__).error(e) + huri.stop() if __name__ == "__main__": From 3ad3368096663e5aea7c02ce0955f4d68811cbe0 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Thu, 15 Jan 2026 12:17:30 +0100 Subject: [PATCH 08/21] evol(events): update events naming --- src/core/events.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/core/events.py b/src/core/events.py index a510e9d..11f8aa1 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -28,35 +28,37 @@ def deserialize(cls, raw: List[bytes]): return cls(topic=topic.decode(), payload=json.loads(payload.decode())) -class Command(Enum): - REGISTER = "REGISTER" - AUTH_OK = "AUTH_OK" - START = "START" - STOP = "STOP" - START_MODULE = "START_MODULE" - STOP_MODULE = "STOP_MODULE" - STATUS = "STATUS" - EXIT = "EXIT" +class Control(Enum): + # Agent -> HuRI + REGISTER = "REGISTER" # send auth + agent config + HEARTBEAT = "HEARTBEAT" # send agent heartbeat + modified config + # HuRI -> Agents + AUTH_OK = "AUTH_OK" # send huri config (after) + START = "START" # start all modules + STOP = "STOP" # stop all modules + START_MODULE = "START_MODULE" # start specific modules + STOP_MODULE = "STOP_MODULE" # stop specific modules + EXIT = "EXIT" # exit agent @dataclass -class CommandEvent: - cmd: Command +class ControlEvent: + ctrl: Control payload: Mapping[str, Any] @classmethod def from_dict(cls, raw: Dict): - return cls(cmd=Command(raw["topic"]), payload=raw["payload"]) + return cls(ctrl=Control(raw["ctrl"]), payload=raw["payload"]) def serialize(self) -> Sequence: - print(self.cmd.value) + print(self.ctrl.value) return [ - self.cmd.value.encode(), + self.ctrl.value.encode(), json.dumps(self.payload).encode(), ] @classmethod def deserialize(cls, raw: List[bytes]): - cmd, payload = raw - return cls(cmd=Command(cmd.decode()), payload=json.loads(payload.decode())) + ctrl, payload = raw + return cls(ctrl=Control(ctrl.decode()), payload=json.loads(payload.decode())) From 97f949d2af638c8e38ae886b1879133da360e3e6 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Thu, 22 Jan 2026 10:14:33 +0100 Subject: [PATCH 09/21] wip(control_channel): huri can handle control --- src/core/agent.py | 20 ++++++------ src/core/events.py | 1 + src/core/huri.py | 54 ++++++++++++++++++++++++++++++--- src/core/zmq/control_channel.py | 45 +++++++++++++-------------- 4 files changed, 84 insertions(+), 36 deletions(-) diff --git a/src/core/agent.py b/src/core/agent.py index 33eb233..27382cf 100644 --- a/src/core/agent.py +++ b/src/core/agent.py @@ -6,7 +6,7 @@ from typing import Any, Dict, Mapping import sys import os -from src.core.events import Command, CommandEvent +from src.core.events import Control, ControlEvent from src.modules.factory import ModuleFactory from src.tools.logger import logging, setup_logger @@ -121,26 +121,26 @@ def __init__(self, config: AgentConfig) -> None: f"Agent {self.dealer.identity}", log_queue=self.log_pusher.log_queue ) - def _command_handler(self, command: CommandEvent) -> bool: # todo data race ? - match command.cmd: - case Command.AUTH_OK: + def _control_handler(self, command: ControlEvent) -> bool: # todo data race ? + match command.ctrl: + case Control.AUTH_OK: self.auth_ok.set() return True - case Command.START: + case Control.START: for name in list(self.modules.keys()): self.start_module(name) return True - case Command.STOP: + case Control.STOP: for name in list(self.processes.keys()): self.stop_module(name) return True - case Command.START_MODULE: + case Control.START_MODULE: return self.start_module(**command.payload) - case Command.STOP_MODULE: + case Control.STOP_MODULE: return self.stop_module(**command.payload) - case Command.STATUS: + case Control.STATUS: return self.status() - case Command.EXIT: + case Control.EXIT: "Stop run loop" self.stop_event.set() os.close(sys.stdin.fileno()) diff --git a/src/core/events.py b/src/core/events.py index 11f8aa1..d3ca555 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -32,6 +32,7 @@ class Control(Enum): # Agent -> HuRI REGISTER = "REGISTER" # send auth + agent config HEARTBEAT = "HEARTBEAT" # send agent heartbeat + modified config + EXITED = "EXITED" # send exited info # HuRI -> Agents AUTH_OK = "AUTH_OK" # send huri config (after) START = "START" # start all modules diff --git a/src/core/huri.py b/src/core/huri.py index ab84fc4..664aab4 100644 --- a/src/core/huri.py +++ b/src/core/huri.py @@ -1,13 +1,14 @@ import sys import threading from dataclasses import dataclass -from time import sleep from src.tools.logger import setup_logger - +from typing import Dict from .zmq.control_channel import Router +from src.core.events import Control, ControlEvent from .zmq.event_proxy import EventProxy from .zmq.log_channel import LogPuller +from src.core.agent import AgentConfig @dataclass @@ -43,26 +44,71 @@ def from_dict(cls, raw: dict): ) +@dataclass +class AgentStatus: + update_time: int + config: AgentConfig + status: Dict[str, int] + + @classmethod + def from_dict(cls, raw: dict): + return cls( + update_time=raw["update_time"], + config=AgentConfig.from_dict(raw["config"]), + status=raw["status"], + ) + + class HuRI: """Wait for Agent to connect, handle module communication and Logging""" def __init__(self, config: HuriConfig) -> None: - self.router = Router(config.hostname, config.router.port) + self.config = config + + self.router = Router(config.hostname, config.router.port, self._control_handler) self.event_proxy = EventProxy( config.hostname, "", config.event_proxy.xpub, config.event_proxy.xsub ) self.log_channel = LogPuller(config.hostname, config.log_puller.port) + self.agents: Dict[bytes, AgentStatus] = {} + self.stop_event = threading.Event() self.logger = setup_logger("HuRI") + def _control_handler( + self, identity: bytes, event: ControlEvent + ) -> bool: # todo data race ? + match event.ctrl: + case Control.REGISTER: + if event.payload["auth"] != "oui": # todo wip + return False + self.router.dealers[identity] = True + self.agents[identity] = AgentStatus.from_dict(**event.payload["agent"]) + + self.router.send_control( + identity, Control.AUTH_OK, self.config + ) # todo send all config ? + self.router.send_control(identity, Control.START) + return True + case Control.HEARTBEAT: + # previous_config = self.router.dealers[identity] + # self.agents[identity] = todo + # todo AgentStatus concat + return True + case Control.EXITED: + del self.router.dealers[identity] + del self.agents[identity] + case _: + return False # todo log + def run(self) -> None: """ Start LogPuller. Start Router. Start EventProxy. - Then loop over RobotShell.cmdloop() to send input as commandst. + Then loop over RobotShell.cmdloop() to use HuRI commands. Then, when exit is requested, call stop() """ diff --git a/src/core/zmq/control_channel.py b/src/core/zmq/control_channel.py index 5e078f1..1f25719 100644 --- a/src/core/zmq/control_channel.py +++ b/src/core/zmq/control_channel.py @@ -6,7 +6,7 @@ import zmq -from src.core.events import Command, CommandEvent +from src.core.events import Control, ControlEvent from src.tools.logger import logging, setup_logger @@ -15,6 +15,7 @@ def __init__( self, hostname: str, port: int, + handler: Callable[[bytes, ControlEvent], bool], logger: Optional[logging.Logger] = setup_logger("Router"), ): self.ctx = zmq.Context.instance() @@ -22,6 +23,8 @@ def __init__( self.hostname = hostname self.port = port + self.handler = handler + self._stop_event = None self._poll_thread = None self._started = False @@ -30,7 +33,7 @@ def __init__( self.logger = logger or logging.getLogger(__name__) - def _register_dealer( + def register_dealer( self, identity: bytes, auth: str, name: str, config: Dict[str, Any] ) -> None: if auth != "oui": @@ -39,9 +42,6 @@ def _register_dealer( self.dealers[identity] = config self.logger.info(f"Dealer registered: {identity}") - self.send_command(identity, Command.AUTH_OK) - self.send_command(identity, Command.START) - def _poll(self) -> None: # todo poller """ Start a blocking poller loop. @@ -51,12 +51,13 @@ def _poll(self) -> None: # todo poller while not self._stop_event.is_set(): try: identity, *data = self.router.recv_multipart() - command = CommandEvent.deserialize(data) + event = ControlEvent.deserialize(data) - if command.cmd == Command.REGISTER: - self._register_dealer(identity, **command.payload) + if self.handler(identity, event) is False: + self.logger.warning("Could not execute control") else: - raise Exception(f"Dealer {identity} sent {command.cmd}") + self.logger.info("Control executed") + except zmq.Again: continue except Exception as e: @@ -93,8 +94,8 @@ def stop(self) -> None: self._started = False - def send_command( - self, dealer_identity: str, command: Command, **kwargs: Mapping[str, Any] + def send_control( + self, dealer_identity: str, Control: Control, **kwargs: Mapping[str, Any] ) -> None: if self._started is False: raise Exception("not started") @@ -102,16 +103,16 @@ def send_command( if dealer_identity not in self.dealers: raise ValueError(f"Dealer {dealer_identity} not registered") - event = CommandEvent(cmd=command, payload=kwargs) - self.logger.info(f"Sending Command {command} to: {dealer_identity}") + event = ControlEvent(cmd=Control, payload=kwargs) + self.logger.info(f"Sending Control {Control} to: {dealer_identity}") self.router.send_multipart([dealer_identity] + event.serialize()) - def send_commands(self, command: Command, **kwargs: Mapping[str, Any]) -> None: + def send_controls(self, Control: Control, **kwargs: Mapping[str, Any]) -> None: if self._started is False: raise Exception("not started") for dealer_identity, _ in self.dealers.items(): - self.send_command(dealer_identity, command, **kwargs) + self.send_control(dealer_identity, Control, **kwargs) class Dealer: # todo heartbeat @@ -119,7 +120,7 @@ def __init__( self, hostname: str, port: int, - handler: Callable[[Command], bool], + handler: Callable[[Control], bool], logger: Optional[logging.Logger] = None, identity: Optional[str] = None, ): @@ -147,10 +148,10 @@ def _poll(self) -> None: while not self._stop_event.is_set(): try: data = self.dealer.recv_multipart() - command = CommandEvent.deserialize(data) + Control = ControlEvent.deserialize(data) - self.logger.info(f"Received Command {command.cmd}") - result = self.handler(command) + self.logger.info(f"Received Control {Control.cmd}") + result = self.handler(Control) # self.dealer.send_multipart([b"RESULT", result]) except zmq.Again: @@ -162,7 +163,7 @@ def start(self) -> None: """ Connect to endpoint. Launch a poll loop thread. - Send Register command (wip). + Send Register Control (wip). """ if self._started is True: raise Exception("already started") @@ -176,8 +177,8 @@ def start(self) -> None: self._poll_thread = threading.Thread(target=self._poll) self._poll_thread.start() - register = CommandEvent( - cmd=Command.REGISTER, + register = ControlEvent( + cmd=Control.REGISTER, payload={ "auth": "oui", "name": self.identity, From 9c2e5645ea4f0898ae6022d9e3b35f87a4954287 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:15:51 +0100 Subject: [PATCH 10/21] remove(archi): old deprecated archi files --- .gitignore | 3 + src/core/agent.py | 303 ---------------------------- src/core/shell.py | 32 --- src/core/zmq/__init__.py | 0 src/core/zmq/control_channel.py | 203 ------------------- src/core/zmq/event_proxy.py | 92 --------- src/core/zmq/log_channel.py | 173 ---------------- src/emotional_hub/input_analysis.py | 24 --- src/launch_agent.py | 44 ---- src/launch_huri.py | 44 ---- src/modules/factory.py | 33 --- src/modules/rag/__init__.py | 0 src/modules/rag/mode_controller.py | 37 ---- src/modules/rag/rag.py | 97 --------- src/modules/textIO/input.py | 17 -- src/modules/textIO/output.py | 10 - 16 files changed, 3 insertions(+), 1109 deletions(-) delete mode 100644 src/core/agent.py delete mode 100644 src/core/shell.py delete mode 100644 src/core/zmq/__init__.py delete mode 100644 src/core/zmq/control_channel.py delete mode 100644 src/core/zmq/event_proxy.py delete mode 100644 src/core/zmq/log_channel.py delete mode 100644 src/emotional_hub/input_analysis.py delete mode 100644 src/launch_agent.py delete mode 100644 src/launch_huri.py delete mode 100644 src/modules/factory.py delete mode 100644 src/modules/rag/__init__.py delete mode 100644 src/modules/rag/mode_controller.py delete mode 100644 src/modules/rag/rag.py delete mode 100644 src/modules/textIO/input.py delete mode 100644 src/modules/textIO/output.py diff --git a/.gitignore b/.gitignore index b953209..99ba63f 100644 --- a/.gitignore +++ b/.gitignore @@ -175,3 +175,6 @@ cython_debug/ # PyPI configuration file .pypirc + +# Others +.trash \ No newline at end of file diff --git a/src/core/agent.py b/src/core/agent.py deleted file mode 100644 index 27382cf..0000000 --- a/src/core/agent.py +++ /dev/null @@ -1,303 +0,0 @@ -import multiprocessing as mp -import signal -import threading -from dataclasses import dataclass -from multiprocessing.synchronize import Event -from typing import Any, Dict, Mapping -import sys -import os -from src.core.events import Control, ControlEvent - -from src.modules.factory import ModuleFactory -from src.tools.logger import logging, setup_logger - -from .huri import HuriConfig -from .zmq.control_channel import Dealer -from .zmq.event_proxy import EventProxy -from .zmq.log_channel import LogPusher - - -@dataclass -class ForwarderProxyConfig: - down_xsub: int - up_xpub: int - - @classmethod - def from_dict(cls, raw: dict): - return cls( - down_xsub=raw["down-xsub"], - up_xpub=raw["up-xpub"], - ) - - -@dataclass -class ModuleConfig: - name: str - args: Mapping[str, Any] - logging: int - - @classmethod - def from_dict(cls, raw: dict): - level = logging._nameToLevel.get( - raw.get("logging", "INFO"), - logging.INFO, - ) - return cls( - name=raw["name"], - args=raw.get("args", {}), - logging=level, - ) - - -@dataclass -class AgentConfig: - id: str - hostname: str - huri: HuriConfig - logging: int - forwarder_proxy: ForwarderProxyConfig - modules: Dict[str, ModuleConfig] - - @classmethod - def from_dict(cls, raw: dict): - level = logging._nameToLevel.get( - raw.get("logging", "INFO").upper(), - logging.INFO, - ) - modules = { - module_id: ModuleConfig.from_dict(mod_raw) - for module_id, mod_raw in raw.get("modules", {}).items() - } - return cls( - id=raw["id"], - hostname=raw["hostname"], - huri=HuriConfig.from_dict(raw["huri"]), - forwarder_proxy=ForwarderProxyConfig.from_dict(raw["forwarder-proxy"]), - logging=level, - modules=modules, - ) - - -class Agent: - """Control Modules and communication with HuRI""" - - def __init__(self, config: AgentConfig) -> None: - self.modules: Dict[str, ModuleConfig] = config.modules - self.config = config - - self.processes: Dict[str, mp.Process] = {} - self.stop_events: Dict[str, Event] = {} - - self.stop_event: threading.Event = threading.Event() - - self.log_pusher = LogPusher( - hostname=config.huri.hostname, port=config.huri.log_puller.port - ) - - self.dealer = Dealer( - hostname=config.huri.hostname, - port=config.huri.router.port, - handler=self._command_handler, - logger=setup_logger("Dealer", log_queue=self.log_pusher.log_queue), - ) - self.auth_ok = threading.Event() - - self.up_proxy = EventProxy( - hostname=config.hostname, - connect_hostname=config.huri.hostname, - xpub_port=config.huri.event_proxy.xsub, - xsub_port=config.forwarder_proxy.up_xpub, - logger=setup_logger("UpProxy", log_queue=self.log_pusher.log_queue), - ) - self.down_proxy = EventProxy( - hostname=config.hostname, - connect_hostname=config.huri.hostname, - xpub_port=config.forwarder_proxy.down_xsub, - xsub_port=config.huri.event_proxy.xpub, - logger=setup_logger("DownProxy", log_queue=self.log_pusher.log_queue), - ) - - self.logger = setup_logger( - f"Agent {self.dealer.identity}", log_queue=self.log_pusher.log_queue - ) - - def _control_handler(self, command: ControlEvent) -> bool: # todo data race ? - match command.ctrl: - case Control.AUTH_OK: - self.auth_ok.set() - return True - case Control.START: - for name in list(self.modules.keys()): - self.start_module(name) - return True - case Control.STOP: - for name in list(self.processes.keys()): - self.stop_module(name) - return True - case Control.START_MODULE: - return self.start_module(**command.payload) - case Control.STOP_MODULE: - return self.stop_module(**command.payload) - case Control.STATUS: - return self.status() - case Control.EXIT: - "Stop run loop" - self.stop_event.set() - os.close(sys.stdin.fileno()) - return True - case _: - return False # todo log - - @staticmethod - def _start_module( - name: str, - module_config: ModuleConfig, - agent_config: AgentConfig, - log_queue: mp.Queue, - stop_event: Event, - ) -> None: - """Helper function to start module in child process.""" - logger = setup_logger( - module_config.name, level=module_config.logging, log_queue=log_queue - ) - - module = ModuleFactory.create(name, module_config.args) - module.set_custom_logger(logger) - - def handle_sigint(signum, frame): - logger.info("Ctrl+C ignored in child module") - - signal.signal(signal.SIGINT, handle_sigint) - - module.start_module( - agent_config.hostname, - agent_config.forwarder_proxy.up_xpub, - agent_config.forwarder_proxy.down_xsub, - stop_event=stop_event, - ) - - def start_module(self, name) -> None: - """Check if module is registered and not already running, and start a child process.""" - if name not in self.modules: - self.logger.warning( - f"{name} is not in the registered Modules: {self.modules.keys()}" - ) - return - if name in self.processes: - self.logger.warning( - f"{name} is already running (PID={self.processes[name].pid})" - ) - return - - module_config = self.modules[name] - stop_event = mp.Event() - p = mp.Process( - target=self._start_module, - args=( - name, - module_config, - self.config, - self.log_pusher.log_queue, - stop_event, - ), - daemon=True, - ) - self.processes[name] = p - self.stop_events[name] = stop_event - self.log_pusher.level_filter.add_level(name) - - p.start() - self.logger.info(f"{name} ({module_config.name}) started (PID={p.pid})") - - def stop_module(self, name) -> None: - if name in self.processes: - self.logger.info(f"Stopping {name}...") - self.stop_events[name].set() - self.processes[name].join(timeout=5) - if self.processes[name].is_alive(): - self.logger.warning(f"{name} did not stop in time, killing") - self.processes[name].kill() - self.logger.info(f"{name} stopped") - del self.processes[name] - del self.stop_events[name] - self.log_pusher.level_filter.del_level(name) - - def stop(self) -> None: - for name in list(self.processes.keys()): - self.stop_module(name) - - self.dealer.stop() - self.up_proxy.stop() - self.down_proxy.stop() - - self.log_pusher.level_filter.del_level("Dealer") - self.log_pusher.level_filter.del_level("UpProxy") - self.log_pusher.level_filter.del_level("DownProxy") - - self.log_pusher.stop() - print("Fully stopped") - - def status(self) -> None: - """Print status of all modules and router.""" - print("=== Module Status ===") - for name in self.modules: - process = self.processes.get(name) - if process: - state = "alive" if process.is_alive() else "stopped" - print(f"- {name}: {state} (PID={process.pid})") - else: - print(f"- {name}: stopped") - print("=====================") - - def set_root_log_level(self, level: int) -> None: - self.log_pusher.level_filter.set_root_level(level) - - def set_log_level(self, name: str, level: int) -> None: - self.log_pusher.level_filter.set_level(name, level) - - def set_log_levels(self, level: int) -> None: - self.log_pusher.level_filter.set_levels(level) - - def _start_event_proxies(self) -> None: - """Used to handle inter-module communication, though events""" - self.log_pusher.level_filter.add_level("UpProxy") - self.log_pusher.level_filter.add_level("DownProxy") - - self.up_proxy.start(True, False) - self.down_proxy.start(False, True) - - def run(self) -> None: - """ - Start Dealer and check auth. - Then start EventProxies and LogPusher. - Then loop over input() to send input as Event. - Then, when exit is requested, call stop() - """ # TODO config (also logs levels) - - try: - self.log_pusher.level_filter.add_level("Dealer") - self.dealer.start() - - if self.auth_ok.wait(5.0) is False: - raise Exception("not authentificated") - - self.log_pusher.start() - self._start_event_proxies() - except Exception as e: - self.logger.error(e) - return - - while not self.stop_event.is_set(): - try: - data = "" - data = input() - except EOFError: - self.logger.info("pressed EOF") - print("^D") - - if data == "": - self.down_proxy.publish("std.out") - else: - self.down_proxy.publish("std.in", data=data) - - self.stop() diff --git a/src/core/shell.py b/src/core/shell.py deleted file mode 100644 index bf2781b..0000000 --- a/src/core/shell.py +++ /dev/null @@ -1,32 +0,0 @@ -import cmd - -from src.core.events import Command, CommandEvent -from src.core.huri import HuRI - - -class RobotShell(cmd.Cmd): - intro = "HuRI's shell. Type 'help' to see command's list." - prompt = "(HuRI) " - - def __init__(self, huri: HuRI) -> None: - super().__init__() - self.huri = huri - - def do_status(self, arg) -> None: - "Display modules and router status." - self.huri.router.send_commands(Command.STATUS) - - def do_start(self, arg) -> None: - "Start a module." - self.huri.router.send_commands(Command.START, arg) - - def do_stop(self, arg) -> None: - "Stop a module." - self.huri.router.send_commands(Command.STOP, arg) - self.huri.stop_event.set() - - def do_exit(self, arg) -> None: - "Exit HuRi." - self.huri.router.send_commands(Command.EXIT) - print("Bye !") - return True diff --git a/src/core/zmq/__init__.py b/src/core/zmq/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/core/zmq/control_channel.py b/src/core/zmq/control_channel.py deleted file mode 100644 index 1f25719..0000000 --- a/src/core/zmq/control_channel.py +++ /dev/null @@ -1,203 +0,0 @@ -import json -import threading -import uuid -from dataclasses import asdict, dataclass -from typing import Any, Callable, Dict, List, Mapping, Optional - -import zmq - -from src.core.events import Control, ControlEvent -from src.tools.logger import logging, setup_logger - - -class Router: - def __init__( - self, - hostname: str, - port: int, - handler: Callable[[bytes, ControlEvent], bool], - logger: Optional[logging.Logger] = setup_logger("Router"), - ): - self.ctx = zmq.Context.instance() - self.router: zmq.Socket[bytes] = self.ctx.socket(zmq.ROUTER) - self.hostname = hostname - self.port = port - - self.handler = handler - - self._stop_event = None - self._poll_thread = None - self._started = False - - self.dealers: Dict[bytes, bool] = {} - - self.logger = logger or logging.getLogger(__name__) - - def register_dealer( - self, identity: bytes, auth: str, name: str, config: Dict[str, Any] - ) -> None: - if auth != "oui": - return - - self.dealers[identity] = config - self.logger.info(f"Dealer registered: {identity}") - - def _poll(self) -> None: # todo poller - """ - Start a blocking poller loop. - call self.stop() to stop. - """ - - while not self._stop_event.is_set(): - try: - identity, *data = self.router.recv_multipart() - event = ControlEvent.deserialize(data) - - if self.handler(identity, event) is False: - self.logger.warning("Could not execute control") - else: - self.logger.info("Control executed") - - except zmq.Again: - continue - except Exception as e: - self.logger.warning(e, exc_info=True) - - def start(self) -> None: - """ - Bind to endpoint. - Launch a poll loop thread. - """ - if self._started is True: - raise Exception("already started") - - self.router.bind(f"tcp://{self.hostname}:{self.port}") - self.router.setsockopt(zmq.RCVTIMEO, 1000) - self.logger.info("started") - - self._stop_event = threading.Event() - self._poll_thread = threading.Thread(target=self._poll) - self._poll_thread.start() - - self._started = True - - def stop(self) -> None: - if self._started is False: - raise Exception("not started") - - self._stop_event.set() - self._poll_thread.join(2.0) - - self.router.close(linger=0) - self._stop_event = None - self._poll_thread = None - - self._started = False - - def send_control( - self, dealer_identity: str, Control: Control, **kwargs: Mapping[str, Any] - ) -> None: - if self._started is False: - raise Exception("not started") - - if dealer_identity not in self.dealers: - raise ValueError(f"Dealer {dealer_identity} not registered") - - event = ControlEvent(cmd=Control, payload=kwargs) - self.logger.info(f"Sending Control {Control} to: {dealer_identity}") - self.router.send_multipart([dealer_identity] + event.serialize()) - - def send_controls(self, Control: Control, **kwargs: Mapping[str, Any]) -> None: - if self._started is False: - raise Exception("not started") - - for dealer_identity, _ in self.dealers.items(): - self.send_control(dealer_identity, Control, **kwargs) - - -class Dealer: # todo heartbeat - def __init__( - self, - hostname: str, - port: int, - handler: Callable[[Control], bool], - logger: Optional[logging.Logger] = None, - identity: Optional[str] = None, - ): - self.ctx = zmq.Context.instance() - self.dealer: zmq.Socket[bytes] = self.ctx.socket(zmq.DEALER) - - self.hostname = hostname - self.port = port - - self.handler = handler - self.identity = identity or str(uuid.uuid4()) # TODO agent name - - self._stop_event = None - self._poll_thread = None - self._started = False - - self.logger = logger or logging.getLogger(f"Dealer {self.identity}") - - def _poll(self) -> None: - """ - Start a blocking poller loop. - call self.stop() to stop. - """ - - while not self._stop_event.is_set(): - try: - data = self.dealer.recv_multipart() - Control = ControlEvent.deserialize(data) - - self.logger.info(f"Received Control {Control.cmd}") - result = self.handler(Control) - - # self.dealer.send_multipart([b"RESULT", result]) - except zmq.Again: - continue - except Exception as e: - self.logger.exception(e) - - def start(self) -> None: - """ - Connect to endpoint. - Launch a poll loop thread. - Send Register Control (wip). - """ - if self._started is True: - raise Exception("already started") - - self.dealer.connect(f"tcp://{self.hostname}:{self.port}") - self.dealer.setsockopt(zmq.IDENTITY, b"name") - self.dealer.setsockopt(zmq.RCVTIMEO, 1000) - self.logger.info(f"Dealer started: {self.identity}") - - self._stop_event = threading.Event() - self._poll_thread = threading.Thread(target=self._poll) - self._poll_thread.start() - - register = ControlEvent( - cmd=Control.REGISTER, - payload={ - "auth": "oui", - "name": self.identity, - "config": {"none": None}, - }, - ) - self.dealer.send_multipart(register.serialize()) - - self._started = True - - def stop(self) -> None: - if self._started is False: - raise Exception("not started") - - self._stop_event.set() - self._poll_thread.join(2.0) - - self.dealer.close(linger=0) - self._stop_event = None - self._poll_thread = None - - self._started = False diff --git a/src/core/zmq/event_proxy.py b/src/core/zmq/event_proxy.py deleted file mode 100644 index 0ac2514..0000000 --- a/src/core/zmq/event_proxy.py +++ /dev/null @@ -1,92 +0,0 @@ -import threading -from dataclasses import dataclass -from typing import Any, Mapping, Optional - -import zmq - -from src.core.events import ModuleEvent -from src.tools.logger import logging, setup_logger - - -@dataclass -class ZMQEventPorts: - xpub: str - xsub: str - - -class EventProxy: - def __init__( - self, - hostname: str, - connect_hostname: str, - xpub_port: int, - xsub_port: int, - logger: Optional[logging.Logger] = setup_logger("EventProxy"), - ): - - self.ctx = zmq.Context.instance() - self.xpub: zmq.Socket[bytes] = self.ctx.socket(zmq.XPUB) - self.xsub: zmq.Socket[bytes] = self.ctx.socket(zmq.XSUB) - - self.hostname = hostname - self.connect_hostname = connect_hostname - self.xpub_port = xpub_port - self.xsub_port = xsub_port - - self._started: bool = False - - self.logger = logger or logging.getLogger(__name__) - - def start(self, xpub_connect: bool, xsub_connect: bool): - """ - Connect to endpoint. - Launch a proxy thread. - """ - if self._started is True: - raise Exception("already started") - - if xpub_connect: - self.xpub.connect(f"tcp://{self.connect_hostname}:{self.xpub_port}") - else: - self.xpub.bind(f"tcp://{self.hostname}:{self.xpub_port}") - if xsub_connect: - self.xsub.connect(f"tcp://{self.connect_hostname}:{self.xsub_port}") - else: - self.xsub.bind(f"tcp://{self.hostname}:{self.xsub_port}") - - self.logger.info("Correctly initialized, starting proxy") - - self._proxy_thread = threading.Thread(target=self._proxy) - self._proxy_thread.start() - - self._started = True - - def _proxy(self) -> None: - try: - zmq.proxy(self.xsub, self.xpub) # todo capture to stop - except Exception as e: - self.logger.error(e) - - def stop(self) -> None: - if self._started is False: - raise Exception("not started") - - self.xsub.close(linger=0) # todo capture - self.xpub.close(linger=0) - - self._proxy_thread.join(2.0) - self._proxy_thread = None - - self._started = False - - def publish( - self, - topic: str, - **kwargs: Mapping[str, Any], - ) -> None: - if self._started is False: - raise Exception("not started") - - event = ModuleEvent(topic=topic, payload=kwargs) - self.xpub.send_multipart(event.serialize()) - self.logger.info(f"Publish: {topic} str") diff --git a/src/core/zmq/log_channel.py b/src/core/zmq/log_channel.py deleted file mode 100644 index a9bc8d3..0000000 --- a/src/core/zmq/log_channel.py +++ /dev/null @@ -1,173 +0,0 @@ -import json -import threading -import time -from typing import Any, Dict, Optional - -import zmq - -from src.tools.logger import ( - LevelFilter, - QueueListener, - logging, - mp, - setup_log_listener, - setup_logger, -) - - -def record_to_dict(record: logging.LogRecord) -> Dict[str, Any]: - return { - "name": record.name, - "levelno": record.levelno, - "levelname": record.levelname, - "message": record.getMessage(), - "created": record.created, - "asctime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.created)), - "process": record.process, - "processName": record.processName, - "thread": record.thread, - "threadName": record.threadName, - "module": record.module, - "filename": record.filename, - "pathname": record.pathname, - "lineno": record.lineno, - "funcName": record.funcName, - } - - -def dict_to_record(data: Dict[str, Any]) -> logging.LogRecord: - record = logging.LogRecord( - name=data["name"], - level=data["levelno"], - pathname=data["pathname"], - lineno=data["lineno"], - msg=data["message"], - args=(), - exc_info=None, - func=data["funcName"], - ) - - # Restore metadata - record.created = data["created"] - record.process = data["process"] - record.processName = data["processName"] - record.thread = data["thread"] - record.threadName = data["threadName"] - record.module = data["module"] - record.filename = data["filename"] - - return record - - -class LogPuller: - def __init__( - self, - hostname: str, - port: int, - logger: Optional[logging.Logger] = setup_logger("LogPuller"), - ) -> None: - self.ctx = zmq.Context.instance() - self.pull: zmq.Socket[bytes] = self.ctx.socket(zmq.PULL) - - self.hostname = hostname - self.port = port - - self._stop_event = None - self._poll_thread = None - self._started = False - - self.logger = logger or logging.getLogger(__name__) - - def _poll(self) -> None: - while not self._stop_event.is_set(): - try: - payload = self.pull.recv() - - self.logger.handle(dict_to_record(json.loads(payload.decode()))) - except zmq.Again: - continue - except Exception as e: - self.logger.exception(e) - - def start(self) -> None: - if self._started is True: - raise Exception("already started") - - self.pull.bind(f"tcp://{self.hostname}:{self.port}") - self.pull.setsockopt(zmq.RCVTIMEO, 1000) - self.logger.info("started") - - self._stop_event = threading.Event() - self._poll_thread = threading.Thread(target=self._poll) - self._poll_thread.start() - - self._started = True - - def stop(self) -> None: - if self._started is False: - raise Exception("not started") - - self._stop_event.set() - self._poll_thread.join(2.0) - - self.pull.close(linger=0) - self._stop_event = None - self._poll_thread = None - - self._started = False - - -class LogPusher: - class LogPusherHandler(logging.Handler): - def __init__( - self, - hostname: str, - port: int, - ): - super().__init__() - self.ctx = zmq.Context.instance() - self.socket: zmq.Socket[bytes] = self.ctx.socket(zmq.PUSH) - - self.hostname = hostname - self.port = port - - def emit(self, record: logging.LogRecord) -> None: - try: - payload = json.dumps(record_to_dict(record)).encode() - self.socket.send(payload) - except Exception: - self.handleError(record) - except Exception: - self.handleError(record) - - def start(self) -> None: - self.socket.connect(f"tcp://{self.hostname}:{self.port}") - - def stop(self) -> None: - self.socket.close() - - def __init__( - self, - hostname: str, - port: int, - ): - - self.log_queue = mp.Queue() - - self.log_handler = self.LogPusherHandler(hostname, port) - self.level_filter = LevelFilter(logging.DEBUG) - self.log_listener: QueueListener = setup_log_listener( - self.log_queue, self.level_filter, self.log_handler - ) - - self.logger = setup_logger("LogPusher", log_queue=self.log_queue) - - def start(self) -> None: - self.log_handler.start() - self.log_listener.start() - - def stop(self): # todo _started - self.logger.info("stopping") - time.sleep(0.2) - self.log_listener.stop() - self.log_handler.stop() diff --git a/src/emotional_hub/input_analysis.py b/src/emotional_hub/input_analysis.py deleted file mode 100644 index 7391578..0000000 --- a/src/emotional_hub/input_analysis.py +++ /dev/null @@ -1,24 +0,0 @@ -import numpy as np -import torch -from transformers import AutoModelForAudioClassification, Wav2Vec2FeatureExtractor - -MODEL_NAME = "superb/hubert-large-superb-er" -model = AutoModelForAudioClassification.from_pretrained(MODEL_NAME) -feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(MODEL_NAME) - - -def predict_emotion(audio_np: np.ndarray, sr=16000): - if audio_np.dtype != np.float32: - audio_np = audio_np.astype(np.float32) - - inputs = feature_extractor( - audio_np, sampling_rate=sr, return_tensors="pt", padding=True - ) - - with torch.no_grad(): - logits = model(**inputs).logits - - predicted_id = torch.argmax(logits, dim=-1).item() - predicted_label = model.config.id2label[predicted_id] - - return predicted_label diff --git a/src/launch_agent.py b/src/launch_agent.py deleted file mode 100644 index dd91427..0000000 --- a/src/launch_agent.py +++ /dev/null @@ -1,44 +0,0 @@ -import argparse -import logging -import time - -import yaml - -from src.core.agent import Agent, AgentConfig -from src.modules.factory import build_module_factory - - -def load_config(path: str) -> AgentConfig: - with open(path) as f: - raw = yaml.safe_load(f) - - return AgentConfig.from_dict(raw) - - -def main() -> None: - parser = argparse.ArgumentParser(description="HuRI core") - parser.add_argument( - "--config", - required=True, - help="Path to HuRI config file (YAML)", - ) - - args = parser.parse_args() - - config = load_config(args.config) - - build_module_factory() - - agent = Agent(config) - time.sleep(0.1) - try: - agent.run() - except KeyboardInterrupt: - agent.stop() - except Exception as e: - logging.getLogger(__name__).error(e) - agent.stop() - - -if __name__ == "__main__": - main() diff --git a/src/launch_huri.py b/src/launch_huri.py deleted file mode 100644 index b1f9485..0000000 --- a/src/launch_huri.py +++ /dev/null @@ -1,44 +0,0 @@ -import argparse -import logging -import time - -import yaml - -from src.core.huri import HuRI, HuriConfig -from src.modules.factory import build_module_factory - - -def load_config(path: str) -> HuriConfig: - with open(path) as f: - raw = yaml.safe_load(f) - - return HuriConfig.from_dict(raw) - - -def main() -> None: - parser = argparse.ArgumentParser(description="HuRI core") - parser.add_argument( - "--config", - required=True, - help="Path to HuRI config file (YAML)", - ) - - args = parser.parse_args() - - config = load_config(args.config) - - build_module_factory() - - huri = HuRI(config) - time.sleep(0.1) - try: - huri.run() - except KeyboardInterrupt: - huri.stop() - except Exception as e: - logging.getLogger(__name__).error(e) - huri.stop() - - -if __name__ == "__main__": - main() diff --git a/src/modules/factory.py b/src/modules/factory.py deleted file mode 100644 index 74bba03..0000000 --- a/src/modules/factory.py +++ /dev/null @@ -1,33 +0,0 @@ -from typing import Any, Mapping - -from src.core.module import Module - -from .rag.mode_controller import ModeController -from .rag.rag import Rag -from .speech_to_text.record_speech import RecordSpeech -from .speech_to_text.speech_to_text import SpeechToText -from .textIO.input import TextInput -from .textIO.output import TextOutput - - -class ModuleFactory: - _registry = {} - - @classmethod - def register(cls, name: str, module_cls): - cls._registry[name] = module_cls - - @classmethod - def create(cls, name: str, args: Mapping[str, Any] | None = None) -> Module: - if name not in cls._registry: - raise ValueError(f"Unknown module '{name}'") - return cls._registry[name](**args) - - -def build_module_factory() -> None: - ModuleFactory.register("mic", RecordSpeech) - ModuleFactory.register("stt", SpeechToText) - ModuleFactory.register("inp", TextInput) - ModuleFactory.register("out", TextOutput) - ModuleFactory.register("rag", Rag) - ModuleFactory.register("mod", ModeController) diff --git a/src/modules/rag/__init__.py b/src/modules/rag/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/modules/rag/mode_controller.py b/src/modules/rag/mode_controller.py deleted file mode 100644 index e4b6c09..0000000 --- a/src/modules/rag/mode_controller.py +++ /dev/null @@ -1,37 +0,0 @@ -from enum import Enum - -from src.core.module import Module - - -class Modes(Enum): - LLM = 0 - CONTEXT = 1 - RAG = 2 - - -class ModeController(Module): - def __init__(self, default_mode: Modes = Modes.LLM): - super().__init__() - self.mode = default_mode - - def switchMode(self, mode: str) -> None: - self.mode = mode - - def processTextInput(self, text: str): - if "switch llm" in text.lower(): - self.switchMode(Modes.LLM) - elif "switch context" in text.lower(): - self.switchMode(Modes.CONTEXT) - elif "switch rag" in text.lower(): - self.switchMode(Modes.RAG) - elif "bye bye" in text.lower(): - self.publish("exit") # TODO handle (manager being a module) usefull ? - elif text.strip() == "": - return - else: - topic = f"{str(self.mode.name).lower()}.in" - self.publish(topic, text=text) - - def set_subscriptions(self): - self.subscribe("text.in", self.processTextInput) - self.subscribe("mode.switch", self.switchMode) diff --git a/src/modules/rag/rag.py b/src/modules/rag/rag.py deleted file mode 100644 index b083abd..0000000 --- a/src/modules/rag/rag.py +++ /dev/null @@ -1,97 +0,0 @@ -import json -import pathlib - -from langchain.chains import create_retrieval_chain -from langchain.chains.combine_documents import create_stuff_documents_chain -from langchain.text_splitter import RecursiveCharacterTextSplitter -from langchain_chroma import Chroma -from langchain_community.document_loaders import TextLoader -from langchain_core.documents import Document -from langchain_core.prompts import ChatPromptTemplate -from langchain_ollama.embeddings import OllamaEmbeddings -from langchain_ollama.llms import OllamaLLM -from langgraph.checkpoint.memory import MemorySaver - -from src.core.module import Module - - -class Rag(Module): - def __init__( - self, - model: str = "deepseek-v2:16b", - collectionName: str = "vectorStore", - vectorstorePath: str = "src/rag/vectorStore", - ): - super().__init__() - self.memory = MemorySaver() - self.embeddings = OllamaEmbeddings(model=model) - self.llm = OllamaLLM(model=model) - self.vectorstore = Chroma( - collection_name=collectionName, - embedding_function=self.embeddings, - persist_directory=vectorstorePath, - ) - self.textSplitter = RecursiveCharacterTextSplitter( - chunk_size=1000, chunk_overlap=200 - ) - self.retriever = self.vectorstore.as_retriever() - self.systemPrompt = "Conversation history:\n{history}\n\nContext:\n{context}" - self.prompt = ChatPromptTemplate.from_messages( - [ - ("system", self.systemPrompt), - ("human", "{input}"), - ] - ) - self.questionChain = create_stuff_documents_chain(self.llm, self.prompt) - self.qaChain = create_retrieval_chain(self.retriever, self.questionChain) - self.documents = [] - self.docs = [] - self.conversation = [] - self.conversation_log = {"conversation": []} - - def ragFill(self, text: str) -> None: - self.documents += self.textSplitter.split_documents( - [Document(page_content=text)] - ) - self.vectorstore.add_documents(self.documents) - - def ragLoad(self, folderPath: str, fileType: str) -> None: - if fileType == "txt": - for file in pathlib.Path(folderPath).rglob("*.txt"): - fileLoader = TextLoader(file_path=folderPath + "/" + file.name) - self.documents += self.textSplitter.split_documents(fileLoader.load()) - self.vectorstore.add_documents(self.documents) - - def ragQuestion(self, text: str) -> None: - question = text - self.logger.debug("question:", question) - history = "\n".join( - [ - f"Human: {qa['question']}\nAI: {qa['answer']}" - for qa in self.conversation_log["conversation"] - ] - ) - helpingContext = "Answer with just your message like in a conversation. " - question = helpingContext + question - self.logger.debug("full question:", question) - response = self.qaChain.invoke({"history": history, "input": question}) - answer = response["answer"] - self.logger.debug("answer:", answer) - self.conversation_log["conversation"].append( - {"question": question.split(helpingContext)[1:], "answer": answer} - ) - self.publish("llm.response", text=answer) - - def saveConversation(self, filename: str = "conversation_log.json"): - with open(filename, "w") as f: - json.dump(self.conversation_log, f, indent=4) - - def set_subscriptions(self) -> None: - self.subscribe("rag.load", self.ragLoad) - self.subscribe("llm.in", self.ragQuestion) - self.subscribe("rag.in", self.ragFill) - self.subscribe("rag.save", self.saveConversation) - - def run_module(self, stop_event=None) -> None: - self.ragLoad("tests/rag/docsRag", "txt") - super().run_module(stop_event) diff --git a/src/modules/textIO/input.py b/src/modules/textIO/input.py deleted file mode 100644 index 58ee567..0000000 --- a/src/modules/textIO/input.py +++ /dev/null @@ -1,17 +0,0 @@ -from src.core.module import Module - - -class TextInput(Module): - def set_subscriptions(self): - self.subscribe("std.in", self.stdin_to_text) - self.subscribe("std.out", lambda: print(">> ", end="", flush=True)) - - def stdin_to_text(self, data): - print(">> ", end="", flush=True) - if data == "": - return - self.publish("text.in", text=data) - - def run_module(self, stop_event=None): - print(">> ", end="", flush=True) - stop_event.wait() diff --git a/src/modules/textIO/output.py b/src/modules/textIO/output.py deleted file mode 100644 index c461e7a..0000000 --- a/src/modules/textIO/output.py +++ /dev/null @@ -1,10 +0,0 @@ -from src.core.module import Module - - -class TextOutput(Module): - def set_subscriptions(self) -> None: - self.subscribe("llm.response", self.print_response) - - def print_response(self, text: str) -> None: - print(f"\r<< {text}") - self.publish("std.out") From 2ac1433a92272959fdb3895823dd478fea0e25c3 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:21:12 +0100 Subject: [PATCH 11/21] refacto(module): is now a basic processor with one input and output type --- src/core/module.py | 154 ++------------------------------------------- 1 file changed, 6 insertions(+), 148 deletions(-) diff --git a/src/core/module.py b/src/core/module.py index db8e08e..413cd7f 100644 --- a/src/core/module.py +++ b/src/core/module.py @@ -1,151 +1,9 @@ -import threading -from abc import ABC, abstractmethod -from multiprocessing.synchronize import Event -from typing import Any, Callable, Dict, Mapping, final +from typing import Any, Optional -import zmq -from src.core.events import ModuleEvent -from src.tools.logger import logging +class Module: + input_type: str + output_type: str - -class Module(ABC): - def __init__(self): - """Child Modules must call super.__init__() in their __init__() function.""" - self.ctx = None - self.pub_socket = None - self.connect_hostname = None - self.xpub_port = None - self.xsub_port = None - self.subs: Dict[str, zmq.Socket[bytes]] = {} - self.callbacks = {} - self._poller_running = False - self.poller = None - self.logger = logging.getLogger(__name__) - - @final - def _initialize(self) -> None: - """ - Called inside start_module() or manually before usage. - This function exist because ctx cannot be set in __init__, because of multi-processing. maybe deprecated - """ - self.ctx = zmq.Context() - self.pub_socket = self.ctx.socket(zmq.PUB) - self.pub_socket.connect(f"tcp://{self.connect_hostname}:{self.xpub_port}") - self.poller = threading.Thread(target=self._poll_loop, daemon=True) - self.set_subscriptions() - - @abstractmethod - def set_subscriptions(self) -> None: - """Child module must define this funcction with subscriptions""" - ... - - @final - def subscribe(self, topic: str, callback: Callable) -> None: - sub_socket = self.ctx.socket(zmq.SUB) - sub_socket.connect(f"tcp://{self.connect_hostname}:{self.xsub_port}") - sub_socket.setsockopt_string(zmq.SUBSCRIBE, topic) - self.subs[topic] = sub_socket - self.callbacks[topic] = callback - self.logger.info(f"Subscribe: {topic}") - - @final - def publish( - self, - topic: str, - **kwargs: Mapping[str, Any], - ) -> None: - """ - Will publish a ModuleEvent to other modules. - - :param topic: the topic of the event - :type topic: str - :param kwargs: kwargs must be named as the receiving module's callbacks - :type kwargs: Mapping[str, Any] - """ - event = ModuleEvent(topic=topic, payload=kwargs) - self.logger.info(f"Publish: {topic} {kwargs.keys()}") - self.pub_socket.send_multipart(event.serialize()) - - @final - def _start_polling(self) -> None: - self._poller_running = True - self.poller.start() - - @final - def _poll_loop(self) -> None: - poller = zmq.Poller() - for sub in self.subs.values(): - poller.register(sub, zmq.POLLIN) - - while self._poller_running: - events = dict(poller.poll(100)) - for _, sub in self.subs.items(): - if sub in events: - data = sub.recv_multipart() - event = ModuleEvent.deserialize(data) - - self.logger.info(f"Receive: {event.topic} {event.payload.keys()}") - self.callbacks[event.topic](**event.payload) - - @final - def start_module( - self, - connect_hostname: str, - xpub_port: int, - xsub_port: int, - stop_event: Event = None, - ) -> None: - self.connect_hostname = connect_hostname - self.xpub_port = xpub_port - self.xsub_port = xsub_port - self._initialize() - if self.subs != {}: - self._start_polling() - try: - self.run_module(stop_event) - except KeyboardInterrupt: - self.logger.info("Ctrl+C pressed, exiting cleanly") - except Exception as e: - self.logger.error(e) - finally: - self.stop_module() - - @final - def stop_module(self) -> None: - """Stop the module gracefully.""" - - if self._poller_running: - self._poller_running = False - self.poller.join() - - for topic, sub in self.subs.items(): - try: - sub.close(0) - except Exception as e: - self.logger.error(f"Error closing SUB socket for '{topic}': {e}") - - self.subs.clear() - self.callbacks.clear() - - try: - self.pub_socket.close(0) - except Exception as e: - self.logger.error(f"Error closing SUB socket for '{topic}': {e}") - - try: - self.ctx.term() - except Exception as e: - self.logger.error(f"Error terminating ZMQ context: {e}") - - self.logger.info("Module stopped gracefully.") - - def run_module(self, stop_event: Event = None) -> None: - """Child modules override this instead of run(). Default: idle wait.""" - if stop_event: - stop_event.wait() - - @final - def set_custom_logger(self, logger) -> None: - """The default logger in set in __init__.""" - self.logger = logger + async def process(self, _) -> Optional[Any]: + raise NotImplementedError From d6eae24016828a7017587c1ffe85afc72ba6e95b Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:22:12 +0100 Subject: [PATCH 12/21] feat(session+EventGraph): has an EventGraph that connects modules and publish events --- src/core/events.py | 76 +++++++++++++-------------------------------- src/core/session.py | 12 +++++++ 2 files changed, 34 insertions(+), 54 deletions(-) create mode 100644 src/core/session.py diff --git a/src/core/events.py b/src/core/events.py index d3ca555..0d3653d 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -1,65 +1,33 @@ -import json -from dataclasses import dataclass -from enum import Enum -from typing import Any, Dict, List, Mapping, Sequence +import asyncio +from collections import defaultdict +from .module import Module -@dataclass -class ModuleEvent: - """ - Inter-Module communication event - Module subscribe to a topic and link a callback. - The payload must correspond to a mapping of params of the callback. - """ - topic: str - payload: Mapping[str, Any] +class EventGraph: - @classmethod - def from_dict(cls, raw: Dict): - return cls(topic=raw["topic"], payload=raw["payload"]) + def __init__(self): - def serialize(self) -> Sequence: - return [self.topic.encode(), json.dumps(self.payload).encode()] + self.subscribers = defaultdict(list) - @classmethod - def deserialize(cls, raw: List[bytes]): - topic, payload = raw - return cls(topic=topic.decode(), payload=json.loads(payload.decode())) + def register(self, module: Module): + self.subscribers[module.input_type].append(module) + async def publish(self, event_topic, data): + for module in self.subscribers[event_topic]: + asyncio.create_task(self._run(module, data)) -class Control(Enum): - # Agent -> HuRI - REGISTER = "REGISTER" # send auth + agent config - HEARTBEAT = "HEARTBEAT" # send agent heartbeat + modified config - EXITED = "EXITED" # send exited info - # HuRI -> Agents - AUTH_OK = "AUTH_OK" # send huri config (after) - START = "START" # start all modules - STOP = "STOP" # stop all modules - START_MODULE = "START_MODULE" # start specific modules - STOP_MODULE = "STOP_MODULE" # stop specific modules - EXIT = "EXIT" # exit agent + async def _run(self, module: Module, data): + result = module.process(data) -@dataclass -class ControlEvent: - ctrl: Control - payload: Mapping[str, Any] + if hasattr(result, "__aiter__"): + async for item in result: + if item is None: + continue + await self.publish(module.output_type, item) - @classmethod - def from_dict(cls, raw: Dict): - return cls(ctrl=Control(raw["ctrl"]), payload=raw["payload"]) - - def serialize(self) -> Sequence: - print(self.ctrl.value) - - return [ - self.ctrl.value.encode(), - json.dumps(self.payload).encode(), - ] - - @classmethod - def deserialize(cls, raw: List[bytes]): - ctrl, payload = raw - return cls(ctrl=Control(ctrl.decode()), payload=json.loads(payload.decode())) + else: + value = await result + if value is not None: + await self.publish(module.output_type, value) diff --git a/src/core/session.py b/src/core/session.py new file mode 100644 index 0000000..85d6712 --- /dev/null +++ b/src/core/session.py @@ -0,0 +1,12 @@ +from .events import EventGraph + + +class Session: + def __init__(self, modules): + self.event_graph = EventGraph() + + for module in modules: + self.event_graph.register(module) + + async def publish(self, topic, data): + await self.event_graph.publish(topic, data) From efe6cf839936b75f4116904066b66534a8ee2c04 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:27:20 +0100 Subject: [PATCH 13/21] refacto(huri): huri is now a fastapi ray server, launching ray deployment (connect via websocket) --- src/core/huri.py | 167 ++++++++++++----------------------------------- 1 file changed, 40 insertions(+), 127 deletions(-) diff --git a/src/core/huri.py b/src/core/huri.py index 664aab4..6883687 100644 --- a/src/core/huri.py +++ b/src/core/huri.py @@ -1,136 +1,49 @@ -import sys -import threading -from dataclasses import dataclass - -from src.tools.logger import setup_logger +import uuid from typing import Dict -from .zmq.control_channel import Router -from src.core.events import Control, ControlEvent -from .zmq.event_proxy import EventProxy -from .zmq.log_channel import LogPuller -from src.core.agent import AgentConfig - - -@dataclass -class RouterConfig: - port: int - - -@dataclass -class EventProxyConfig: - xsub: int - xpub: int - -@dataclass -class LogPullerConfig: - port: int +from fastapi import FastAPI, WebSocket +from ray import serve +from ray.serve import handle +from src.modules.speech_to_text.record_speech import MIC +from src.modules.speech_to_text.speech_to_text import STT +from src.modules.utils.sender import Sender -@dataclass -class HuriConfig: - hostname: str - router: RouterConfig - event_proxy: EventProxyConfig - log_puller: LogPullerConfig +from .session import Session - @classmethod - def from_dict(cls, raw: dict): - return cls( - hostname=raw["hostname"], - router=RouterConfig(**raw["router"]), - event_proxy=EventProxyConfig(**raw["event-proxy"]), - log_puller=LogPullerConfig(**raw["log-puller"]), - ) - - -@dataclass -class AgentStatus: - update_time: int - config: AgentConfig - status: Dict[str, int] - - @classmethod - def from_dict(cls, raw: dict): - return cls( - update_time=raw["update_time"], - config=AgentConfig.from_dict(raw["config"]), - status=raw["status"], - ) +app = FastAPI() +@serve.deployment +@serve.ingress(app) class HuRI: - """Wait for Agent to connect, handle module communication and Logging""" - - def __init__(self, config: HuriConfig) -> None: + def __init__(self, config, handles: Dict[str, handle.DeploymentHandle]) -> None: self.config = config - - self.router = Router(config.hostname, config.router.port, self._control_handler) - self.event_proxy = EventProxy( - config.hostname, "", config.event_proxy.xpub, config.event_proxy.xsub - ) - self.log_channel = LogPuller(config.hostname, config.log_puller.port) - - self.agents: Dict[bytes, AgentStatus] = {} - - self.stop_event = threading.Event() - - self.logger = setup_logger("HuRI") - - def _control_handler( - self, identity: bytes, event: ControlEvent - ) -> bool: # todo data race ? - match event.ctrl: - case Control.REGISTER: - if event.payload["auth"] != "oui": # todo wip - return False - self.router.dealers[identity] = True - self.agents[identity] = AgentStatus.from_dict(**event.payload["agent"]) - - self.router.send_control( - identity, Control.AUTH_OK, self.config - ) # todo send all config ? - self.router.send_control(identity, Control.START) - return True - case Control.HEARTBEAT: - # previous_config = self.router.dealers[identity] - # self.agents[identity] = todo - # todo AgentStatus concat - return True - case Control.EXITED: - del self.router.dealers[identity] - del self.agents[identity] - case _: - return False # todo log - - def run(self) -> None: - """ - Start LogPuller. - Start Router. - Start EventProxy. - Then loop over RobotShell.cmdloop() to use HuRI commands. - Then, when exit is requested, call stop() - """ - - "Used to handle log filtering and displaying" - self.log_channel.start() - "Used to handle Agent registration and control" - self.router.start() - "Used to handle inter-module communication, though events" - self.event_proxy.start(False, False) - - if not sys.stdin.isatty(): - self.stop_event.wait() - return - - from src.core.shell import RobotShell - - RobotShell(self).cmdloop() - - self.stop() - - def stop(self) -> None: - self.router.stop() - self.event_proxy.stop() - self.log_channel.stop() - print("Fully stopped") + self.handles = handles + + self.clients: Dict[str, Session] = {} + + @app.websocket("/session") + async def run_session(self, ws: WebSocket): + await ws.accept() + + modules = [ + STT(self.handles["stt"]), + MIC(5), + Sender(ws, "text"), + ] + session_id = str(uuid.uuid4()) + + self.clients[session_id] = Session(modules) + + async def receive_loop(session: Session, ws: WebSocket): + while True: + msg = await ws.receive() + if "bytes" in msg: + chunk = msg["bytes"] + await session.publish("chunk", chunk) + # else: + # data = msg + # await session.publish(data["type"], data["data"]) + + await receive_loop(self.clients[session_id], ws) From 90b6d01640b29aad1ff712c7a0009b0a5ebeef6d Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:28:10 +0100 Subject: [PATCH 14/21] evol(launch_huri): launch deployement and bind them to main ray server (huri) --- src/launch_huri.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 src/launch_huri.py diff --git a/src/launch_huri.py b/src/launch_huri.py new file mode 100644 index 0000000..6e12bcb --- /dev/null +++ b/src/launch_huri.py @@ -0,0 +1,26 @@ +import time + +import ray + +from src.core.huri import Dict, HuRI, handle, serve +from src.modules.speech_to_text.speech_to_text import STTHandle + + +def main() -> None: + ray.init() + + services: Dict[str, handle.DeploymentHandle] = { + "stt": STTHandle.bind(), + } + app = HuRI.bind("", services) + time.sleep(0.1) + try: + serve.run(app, name="HuRI", blocking=True) + except KeyboardInterrupt: + return + except Exception as e: + ray.logger.error(e) + + +if __name__ == "__main__": + main() From 3d6936cc45d9e82428c1b39888741e78f6c5dccf Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:29:43 +0100 Subject: [PATCH 15/21] wip(client): connect to huri via websocket, stream audio and receive huri's ou tput --- src/client.py | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 src/client.py diff --git a/src/client.py b/src/client.py new file mode 100644 index 0000000..d3d1dd1 --- /dev/null +++ b/src/client.py @@ -0,0 +1,123 @@ +# import sounddevice as sd +# import websocket +# import numpy as np +# import time + +# # import microphone +# # import stt +# import ray +# from abc import ABC, abstractmethod +# from huri import HuRI +# from ray.actor import ActorHandle +# from fastapi import WebSocket + + +# class Client(ABC): +# def __init__(self): + +# # huri = ray.get_actor(name="HuRI", namespace="h") + +# # c = ray.get(huri.new_client.remote()) +# # self.session = ray.get_actor(name=c, namespace="h") + +# @abstractmethod +# def input(self): +# pass + +# # @abstractmethod +# # def output(self): +# # pass + +# def ingestion(self, type): +# self.session.push_in.remote(self.input(), type) + +# def outgestion(self): +# return ray.get(self.session.get_response.remote()) + + +# class AudioClient(Client): +# def __init__(self): +# super().__init__() +# self.CHUNK_DURATION: float = 0.3 +# self.SAMPLE_RATE: int = 16000 + +# def input(self): +# chunk: np.ndarray = sd.rec( +# int(self.CHUNK_DURATION * self.SAMPLE_RATE), +# samplerate=self.SAMPLE_RATE, +# channels=1, +# dtype="int16", +# ).ravel() +# sd.wait() +# return chunk + + +# class TerminalClient(Client): +# def __init__(self): +# super().__init__() +# self.CHUNK_DURATION: float = 0.3 +# self.SAMPLE_RATE: int = 16000 + +# def input(self): +# text = input() +# return text + + +# def main(): +# ray.init() + +# client = TerminalClient() +# while True: +# client.ingestion() +# print(client.outgestion()) + + +import asyncio +import time +import wave + +import fastapi +import numpy as np +import sounddevice as sd + +import websockets + +SERVER_URL = "ws://localhost:8000/session" +CHUNK_DURATION = 1 +SAMPLE_RATE = 16000 + + +async def stream_audio(): + + async with websockets.connect(SERVER_URL) as ws: + print("Connected to server") + + async def receive(ws: websockets.ClientConnection): + while True: + text = await ws.recv() + print("received:", text) + + async def send(ws: websockets.ClientConnection): + loop = asyncio.get_running_loop() + + queue = asyncio.Queue() + + def callback(indata: np.ndarray, frames, time, status): + loop.call_soon_threadsafe(queue.put_nowait, indata.copy()) + + with sd.InputStream( + samplerate=SAMPLE_RATE, + channels=1, + dtype="int16", + callback=callback, + blocksize=int(CHUNK_DURATION * SAMPLE_RATE), + ): + while True: + chunk = await queue.get() + await ws.send(chunk.tobytes()) + + await asyncio.gather(receive(ws), send(ws)) + + +if __name__ == "__main__": + asyncio.run(stream_audio()) From 047a655205ad99a18683afaccd71a04046d43134 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:30:17 +0100 Subject: [PATCH 16/21] evol(MIC+STT): changes for new archi --- src/modules/speech_to_text/record_speech.py | 102 +++---------------- src/modules/speech_to_text/speech_to_text.py | 70 +++++++------ 2 files changed, 50 insertions(+), 122 deletions(-) diff --git a/src/modules/speech_to_text/record_speech.py b/src/modules/speech_to_text/record_speech.py index 3d87edc..655559d 100644 --- a/src/modules/speech_to_text/record_speech.py +++ b/src/modules/speech_to_text/record_speech.py @@ -6,102 +6,26 @@ import numpy as np import sounddevice as sd -from src.core.module import Event, Module +from src.core.module import Module +# from src.core.reactive_layer.IOprocessor import IOprocessor +# from src.core.reactive_layer.IOgestion import IOgestion + + +class MIC(Module): + input_type = "chunk" + output_type = "voice" -class RecordSpeech(Module): def __init__( self, threshold: int = 0, - silence_duration: float = 1.0, - chunk_duration: float = 0.5, - sample_rate: int = 16000, ): super().__init__() self.THRESHOLD: int = threshold - self.SILENCE_DURATION: float = silence_duration - self.CHUNK_DURATION: float = chunk_duration - self.SAMPLE_RATE: int = sample_rate - self.running: bool = False - self.audio_queue: queue.Queue = queue.Queue() - self.transcriptions: queue.Queue = queue.Queue() - self.pause_record = threading.Semaphore(1) - self.audio_to_process = threading.Semaphore(0) - self.prompt_available = threading.Semaphore(0) - self.noise_profile: np.ndarray - - def reduce_noise(self, chunk: np.ndarray) -> np.ndarray: - if np.abs(chunk).mean() <= self.THRESHOLD: - return chunk - - return np.clip(chunk - self.noise_profile, -32768, 32767).astype(np.int16) - - def record_chunk(self) -> np.ndarray: - self.pause_record.acquire() - chunk: np.ndarray = sd.rec( - int(self.CHUNK_DURATION * self.SAMPLE_RATE), - samplerate=self.SAMPLE_RATE, - channels=1, - dtype="int16", - ).ravel() - sd.wait() - self.pause_record.release() - return self.reduce_noise(chunk) - - def calculate_noise_level(self) -> None: - self.logger.info("Listening for 10 seconds to calculate noise level...") - noise_chunk: np.ndarray = sd.rec( - int(10 * self.SAMPLE_RATE), - samplerate=self.SAMPLE_RATE, - channels=1, - dtype="int16", - ).ravel() - sd.wait() - self.noise_profile = noise_chunk.mean(axis=0) - self.THRESHOLD = np.abs(self.reduce_noise(noise_chunk)).mean() - self.logger.info(f"Threshold: {self.THRESHOLD}") - - def record_audio(self, starting_chunk, stop_event: Event = None) -> None: - buffer: List[np.ndarray] = [starting_chunk] - silence_start: Optional[float] = None - - while stop_event is None or not stop_event.is_set(): - chunk = self.record_chunk() - buffer.append(chunk) - - if np.abs(chunk).mean() <= self.THRESHOLD: - if silence_start is None: - silence_start = time.time() - elif time.time() - silence_start >= self.SILENCE_DURATION: - if buffer == []: - break - speech = np.concatenate(buffer, axis=0) - self.publish("speech.in", buffer=speech.tobytes()) - break - else: - silence_start = None - - def set_subscriptions(self) -> None: - self.subscribe("speech.in.pause", lambda: self.pause()) - self.subscribe("speech.in.resume", lambda: self.pause(False)) - - def run_module(self, stop_event: Event = None) -> None: - if not self.THRESHOLD: - self.calculate_noise_level() - else: - self.noise_profile = np.zeros( - int(self.CHUNK_DURATION * self.SAMPLE_RATE), dtype=np.int16 - ) - - while stop_event is None or not stop_event.is_set(): - chunk: np.ndarray = self.record_chunk() - - if np.abs(chunk).mean() > self.THRESHOLD: - self.record_audio(chunk, stop_event) - def pause(self, true: bool = True) -> None: - if true: - self.pause_record.acquire() - else: - self.pause_record.release() + async def process(self, data: bytes) -> np.ndarray: + audio_array = np.frombuffer(data, dtype=np.int16) + if np.abs(audio_array).mean() > self.THRESHOLD: + audio_array = audio_array.astype(np.float32) / 32768.0 + return audio_array diff --git a/src/modules/speech_to_text/speech_to_text.py b/src/modules/speech_to_text/speech_to_text.py index 6c087b0..bcae6c5 100644 --- a/src/modules/speech_to_text/speech_to_text.py +++ b/src/modules/speech_to_text/speech_to_text.py @@ -1,50 +1,54 @@ +import asyncio import queue import threading +from typing import Optional import numpy as np import whisper +from ray import serve +from ray.serve import handle from src.core.module import Module -class SpeechToText(Module): +@serve.deployment(num_replicas=5) +class STTHandle: def __init__( self, - model_name: str = "base.en", - device: str = "cpu", - sample_rate: int = 16000, + model_name: str = "base", ): super().__init__() - print(model_name) - if device == "cpu": - import warnings - - warnings.filterwarnings( - "ignore", message="FP16 is not supported on CPU; using FP32 instead" - ) - self.model: whisper.Whisper = whisper.load_model(model_name, device=device) - self.SAMPLE_RATE: int = sample_rate - self.running: bool = False - self.audio_queue: queue.Queue = queue.Queue() - self.transcriptions: queue.Queue = queue.Queue() - self.pause_record = threading.Semaphore(1) - self.audio_to_process = threading.Semaphore(0) - self.prompt_available = threading.Semaphore(0) - self.noise_profile: np.ndarray - - def process_audio(self, buffer: bytes) -> None: - if not buffer: - return - - audio_array = np.frombuffer(buffer, dtype=np.int16) - audio_array = audio_array.astype(np.float32) / 32768.0 - - result: dict = self.model.transcribe(audio_array, language="en") + + self.model: whisper.Whisper = whisper.load_model(model_name) + + async def process(self, audio_array: np.ndarray) -> Optional[str]: + result: dict = self.model.transcribe( + audio_array.copy(), condition_on_previous_text=False, fp16=False + ) result["text"] = result["text"].strip() if not result["text"] or result["text"] == "": - return + return None + + return result["text"] + + +class STT(Module): + input_type = "voice" + output_type = "text" + + def __init__(self, stt_handle: handle.DeploymentHandle[STTHandle]): + self.stt = stt_handle - self.publish("text.in", text=result["text"]) + self.chunks = [] + self.running = False - def set_subscriptions(self) -> None: - self.subscribe("speech.in", self.process_audio) + async def process(self, audio: np.ndarray) -> Optional[str]: + self.chunks.append(audio) + if self.running is True: + return None + self.running = True + text = await self.stt.process.remote(np.concatenate(self.chunks, axis=0)) + self.chunks.clear() + self.running = False + print(text) + return text From 78513e9d7c0a2d51436fc7c57a3e483e7602e85e Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:30:45 +0100 Subject: [PATCH 17/21] add(Sender): module that sends huri output to client --- src/modules/utils/__init__.py | 0 src/modules/utils/sender.py | 18 ++++++++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 src/modules/utils/__init__.py create mode 100644 src/modules/utils/sender.py diff --git a/src/modules/utils/__init__.py b/src/modules/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/modules/utils/sender.py b/src/modules/utils/sender.py new file mode 100644 index 0000000..4d142f6 --- /dev/null +++ b/src/modules/utils/sender.py @@ -0,0 +1,18 @@ +from src.core.module import Module +from src.core.huri import WebSocket +from typing import Any + + +class Sender(Module): + """Module to send output data to the client""" + + input_type = ... + output_type = None + + def __init__(self, ws: WebSocket, type: str): + super().__init__() + self.ws: WebSocket = ws + self.input_type = type + + async def process(self, data: Any): + await self.ws.send_text(data) From 48e750bf6ddfd7ab58b0b46e45d347a85c88efa0 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 10:56:47 +0100 Subject: [PATCH 18/21] remove(logger): deprecated logger --- src/tools/logger.py | 104 -------------------------------------------- 1 file changed, 104 deletions(-) delete mode 100644 src/tools/logger.py diff --git a/src/tools/logger.py b/src/tools/logger.py deleted file mode 100644 index 1b7d5c5..0000000 --- a/src/tools/logger.py +++ /dev/null @@ -1,104 +0,0 @@ -import logging -import multiprocessing as mp -from logging.handlers import QueueHandler, QueueListener -from typing import IO, Dict, Optional - - -def setup_handler( - stream: Optional[IO] = None, - filename: Optional[str] = None, - log_queue: Optional[mp.Queue] = None, - formatter: logging.Formatter = logging.Formatter( - "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s", datefmt="%H:%M:%S" - ), -) -> logging.Handler: - if stream is not None: - handler = logging.StreamHandler(stream) - elif filename is not None: - handler = logging.FileHandler(filename) - elif log_queue is not None: - return QueueHandler(log_queue) - else: - # Default: stdout - handler = logging.StreamHandler() - - handler.setFormatter(formatter) - - return handler - - -def setup_logger( - name: str, - level: int = logging.DEBUG, - stream: Optional[IO] = None, - filename: Optional[str] = None, - log_queue: Optional[mp.Queue] = None, -) -> logging.Logger: - """ - Creates and returns a logger with optional output: - - log_queue (multiprocessing-safe queue, preferred for child processes) - - stream (e.g., sys.stdout) - - filename (log file) - - defaults to stdout if none is given - """ - logger = logging.getLogger(name) - logger.setLevel(level) - if log_queue: - logger.propagate = False - - logger.handlers.clear() - handler = setup_handler(stream, filename, log_queue) - logger.addHandler(handler) - - return logger - - -class LevelFilter(logging.Filter): - def __init__(self, root_level: int = logging.WARNING): - self.root_level = root_level - self.log_levels: Dict[str, int] = {} - - def filter(self, record: logging.LogRecord) -> bool: - """the root level has priority over custom levels""" - level = self.log_levels.get(record.name, self.root_level) - - return self.root_level <= record.levelno and level <= record.levelno - - def set_root_level(self, level: int) -> None: - self.root_level = level - - def add_level(self, name: str) -> None: - self.log_levels[name] = self.root_level - - def set_level(self, name: str, level: int) -> None: - if name not in self.log_levels: - raise ValueError(f"{name} has no linked log level") - self.log_levels[name] = level - - def set_levels(self, level: int) -> None: - self.set_root_level(level) - for name in self.log_levels: - self.set_level(name, level) - - def del_level(self, name: str) -> None: - del self.log_levels[name] - - -def setup_log_listener( - log_queue: mp.Queue, - filter: logging.Filter, - custom_handler: Optional[logging.Handler] = None, -) -> QueueListener: - """ - Starts a central logging listener that reads LogRecords from a queue - and emits them using normal loggers/handlers. - """ - formatter = logging.Formatter( - "[%(asctime)s] [%(processName)s] [%(name)s] [%(levelname)s] %(message)s", - datefmt="%H:%M:%S", - ) - handler = custom_handler or setup_handler(formatter=formatter) - handler.addFilter(filter) - - listener = QueueListener(log_queue, handler) - return listener From 4bc4b6bb357b8fcec6de06e2eca3be4953a627b4 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 11:12:34 +0100 Subject: [PATCH 19/21] feat(linter): added linter config file + Makefile (make lint) --- Makefile | 10 ++++++++++ pyproject.toml | 31 +++++++++++++++++++++++++++++++ requirements.txt | 7 +++++++ 3 files changed, 48 insertions(+) create mode 100644 Makefile create mode 100644 pyproject.toml diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b1c93cf --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ +lint: + black . + isort . + flake8 . + mypy . + +test: + pytest + +check: lint test \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..476f4a8 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,31 @@ +[tool.black] +line-length = 88 +target-version = ["py310"] + +[tool.isort] +profile = "black" +line_length = 88 +multi_line_output = 3 +include_trailing_comma = true +skip_gitignore = true + +[tool.flake8] +max-line-length = 88 +extend-ignore = [] +exclude = """ + __pycache__ + venv + .venv +""" + +[tool.mypy] +python_version = "3.10" +ignore_missing_imports = true +strict_optional = true +warn_unused_ignores = true +warn_return_any = true +warn_unused_configs = true + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] diff --git a/requirements.txt b/requirements.txt index 863a44c..dbc30d0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,10 @@ +black +isort +mypy +flake8 +flake8-toml-config +pytest + deepfilternet sounddevice soundfile From d5071e1a5f8a277d50609005b1843c8b8342a582 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 11:12:58 +0100 Subject: [PATCH 20/21] fix(linter): make lint --- src/client.py | 80 +------------------- src/core/module.py | 4 +- src/launch_huri.py | 4 +- src/modules/speech_to_text/record_speech.py | 16 ++-- src/modules/speech_to_text/speech_to_text.py | 12 +-- src/modules/utils/sender.py | 7 +- 6 files changed, 18 insertions(+), 105 deletions(-) diff --git a/src/client.py b/src/client.py index d3d1dd1..6c439ee 100644 --- a/src/client.py +++ b/src/client.py @@ -1,85 +1,7 @@ -# import sounddevice as sd -# import websocket -# import numpy as np -# import time - -# # import microphone -# # import stt -# import ray -# from abc import ABC, abstractmethod -# from huri import HuRI -# from ray.actor import ActorHandle -# from fastapi import WebSocket - - -# class Client(ABC): -# def __init__(self): - -# # huri = ray.get_actor(name="HuRI", namespace="h") - -# # c = ray.get(huri.new_client.remote()) -# # self.session = ray.get_actor(name=c, namespace="h") - -# @abstractmethod -# def input(self): -# pass - -# # @abstractmethod -# # def output(self): -# # pass - -# def ingestion(self, type): -# self.session.push_in.remote(self.input(), type) - -# def outgestion(self): -# return ray.get(self.session.get_response.remote()) - - -# class AudioClient(Client): -# def __init__(self): -# super().__init__() -# self.CHUNK_DURATION: float = 0.3 -# self.SAMPLE_RATE: int = 16000 - -# def input(self): -# chunk: np.ndarray = sd.rec( -# int(self.CHUNK_DURATION * self.SAMPLE_RATE), -# samplerate=self.SAMPLE_RATE, -# channels=1, -# dtype="int16", -# ).ravel() -# sd.wait() -# return chunk - - -# class TerminalClient(Client): -# def __init__(self): -# super().__init__() -# self.CHUNK_DURATION: float = 0.3 -# self.SAMPLE_RATE: int = 16000 - -# def input(self): -# text = input() -# return text - - -# def main(): -# ray.init() - -# client = TerminalClient() -# while True: -# client.ingestion() -# print(client.outgestion()) - - import asyncio -import time -import wave -import fastapi import numpy as np import sounddevice as sd - import websockets SERVER_URL = "ws://localhost:8000/session" @@ -100,7 +22,7 @@ async def receive(ws: websockets.ClientConnection): async def send(ws: websockets.ClientConnection): loop = asyncio.get_running_loop() - queue = asyncio.Queue() + queue: asyncio.Queue = asyncio.Queue() def callback(indata: np.ndarray, frames, time, status): loop.call_soon_threadsafe(queue.put_nowait, indata.copy()) diff --git a/src/core/module.py b/src/core/module.py index 413cd7f..0ae9cad 100644 --- a/src/core/module.py +++ b/src/core/module.py @@ -2,8 +2,8 @@ class Module: - input_type: str - output_type: str + input_type: Optional[str] + output_type: Optional[str] async def process(self, _) -> Optional[Any]: raise NotImplementedError diff --git a/src/launch_huri.py b/src/launch_huri.py index 6e12bcb..729ee79 100644 --- a/src/launch_huri.py +++ b/src/launch_huri.py @@ -10,9 +10,9 @@ def main() -> None: ray.init() services: Dict[str, handle.DeploymentHandle] = { - "stt": STTHandle.bind(), + "stt": STTHandle.bind(), # type: ignore[attr-defined] } - app = HuRI.bind("", services) + app = HuRI.bind("", services) # type: ignore[attr-defined] time.sleep(0.1) try: serve.run(app, name="HuRI", blocking=True) diff --git a/src/modules/speech_to_text/record_speech.py b/src/modules/speech_to_text/record_speech.py index 655559d..0f28755 100644 --- a/src/modules/speech_to_text/record_speech.py +++ b/src/modules/speech_to_text/record_speech.py @@ -1,16 +1,9 @@ -import queue -import threading -import time -from typing import List, Optional +from typing import Optional import numpy as np -import sounddevice as sd from src.core.module import Module -# from src.core.reactive_layer.IOprocessor import IOprocessor -# from src.core.reactive_layer.IOgestion import IOgestion - class MIC(Module): input_type = "chunk" @@ -24,8 +17,9 @@ def __init__( self.THRESHOLD: int = threshold - async def process(self, data: bytes) -> np.ndarray: + async def process(self, data: bytes) -> Optional[np.ndarray]: audio_array = np.frombuffer(data, dtype=np.int16) if np.abs(audio_array).mean() > self.THRESHOLD: - audio_array = audio_array.astype(np.float32) / 32768.0 - return audio_array + audio_array_float = audio_array.astype(np.float32) / 32768.0 + return audio_array_float + return None diff --git a/src/modules/speech_to_text/speech_to_text.py b/src/modules/speech_to_text/speech_to_text.py index bcae6c5..7925f3c 100644 --- a/src/modules/speech_to_text/speech_to_text.py +++ b/src/modules/speech_to_text/speech_to_text.py @@ -1,7 +1,4 @@ -import asyncio -import queue -import threading -from typing import Optional +from typing import Any, List, Optional import numpy as np import whisper @@ -21,7 +18,7 @@ def __init__( self.model: whisper.Whisper = whisper.load_model(model_name) - async def process(self, audio_array: np.ndarray) -> Optional[str]: + async def process(self, audio_array: np.ndarray) -> Optional[Any]: result: dict = self.model.transcribe( audio_array.copy(), condition_on_previous_text=False, fp16=False ) @@ -39,10 +36,10 @@ class STT(Module): def __init__(self, stt_handle: handle.DeploymentHandle[STTHandle]): self.stt = stt_handle - self.chunks = [] + self.chunks: List[np.ndarray] = [] self.running = False - async def process(self, audio: np.ndarray) -> Optional[str]: + async def process(self, audio: np.ndarray) -> Optional[Any]: self.chunks.append(audio) if self.running is True: return None @@ -50,5 +47,4 @@ async def process(self, audio: np.ndarray) -> Optional[str]: text = await self.stt.process.remote(np.concatenate(self.chunks, axis=0)) self.chunks.clear() self.running = False - print(text) return text diff --git a/src/modules/utils/sender.py b/src/modules/utils/sender.py index 4d142f6..e4c45a2 100644 --- a/src/modules/utils/sender.py +++ b/src/modules/utils/sender.py @@ -1,12 +1,13 @@ -from src.core.module import Module -from src.core.huri import WebSocket from typing import Any +from src.core.huri import WebSocket +from src.core.module import Module + class Sender(Module): """Module to send output data to the client""" - input_type = ... + input_type = None output_type = None def __init__(self, ws: WebSocket, type: str): From 06334b72f3e593853b2a9ed04eeff3d0838c010c Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 16 Mar 2026 11:15:14 +0100 Subject: [PATCH 21/21] remove(config): deprecated config files + quick_launch.sh --- config/agent_input.yaml | 34 ---------------------------------- config/agent_io.yaml | 36 ------------------------------------ config/huri.yaml | 11 ----------- quick_launch.sh | 40 ---------------------------------------- 4 files changed, 121 deletions(-) delete mode 100644 config/agent_input.yaml delete mode 100644 config/agent_io.yaml delete mode 100644 config/huri.yaml delete mode 100755 quick_launch.sh diff --git a/config/agent_input.yaml b/config/agent_input.yaml deleted file mode 100644 index c122a8b..0000000 --- a/config/agent_input.yaml +++ /dev/null @@ -1,34 +0,0 @@ -id: agent-io -hostname: localhost - -huri: - hostname: localhost - router: - port: 3000 - event-proxy: - xsub: 5555 - xpub: 5556 - log-puller: - port: 8008 - -forwarder-proxy: - down-xsub: 6665 - up-xpub: 6666 - -logging: INFO - -modules: - inp: - name: INP - logging: INFO - out: - name: OUT - logging: INFO - mod: - name: MOD - logging: INFO - rag: - name: RAG - args: - model: deepseek-v2:16b - logging: INFO diff --git a/config/agent_io.yaml b/config/agent_io.yaml deleted file mode 100644 index c9a5646..0000000 --- a/config/agent_io.yaml +++ /dev/null @@ -1,36 +0,0 @@ -id: agent-io -hostname: localhost - -huri: - hostname: localhost - router: - port: 3000 - event-proxy: - xsub: 5555 - xpub: 5556 - log-puller: - port: 8008 - -forwarder-proxy: - down-xsub: 6665 - up-xpub: 6666 - -logging: INFO - -modules: - mic: - name: mic - args: - sample_rate: 18000 - logging: INFO - stt: - name: stt - args: - sample_rate: 18000 - logging: INFO - # tts: - # name: vibe - # args: - # model: vibe-voice - # voice: adrien - # logging: DEBUG diff --git a/config/huri.yaml b/config/huri.yaml deleted file mode 100644 index 13f06b1..0000000 --- a/config/huri.yaml +++ /dev/null @@ -1,11 +0,0 @@ -hostname: localhost - -router: - port: 3000 - -event-proxy: - xsub: 5555 - xpub: 5556 - -log-puller: - port: 8008 diff --git a/quick_launch.sh b/quick_launch.sh deleted file mode 100755 index a76da2a..0000000 --- a/quick_launch.sh +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env bash - -set -e - -# Check args -if [ "$#" -lt 2 ]; then - echo "Usage: $0 [CLEAN]" - exit 1 -fi - -HURI_CONFIG="$1" -AGENT_CONFIG="$2" - -LOG_DIR="./tmp/log" - -if [[ " $* " == *" CLEAN "* ]]; then - echo "Cleaning previous logs in ${LOG_DIR}" - rm -rf "${LOG_DIR}" -fi - -mkdir -p "$LOG_DIR" - -TIMESTAMP=$(date +"%Y%m%d-%H%M%S") -HURI_LOG="${LOG_DIR}/huri-${TIMESTAMP}.log" - - -# Run huri with output redirected -python -m src.launch_huri --config "$HURI_CONFIG" > "$HURI_LOG" 2>&1 & -HURI_PID=$! -echo "HURI started in background (PID=${HURI_PID}), logging to ${HURI_LOG}" - -# Run agent -python -m src.launch_agent --config "$AGENT_CONFIG" - -# Ensure HURI is killed on script exit (normal or Ctrl+C) -cleanup() { - echo "Stopping HURI (PID=${HURI_PID})" - kill "${HURI_PID}" 2>/dev/null || true -} -trap cleanup EXIT INT TERM \ No newline at end of file