from __future__ import annotations from asyncio import AbstractEventLoop, new_event_loop, sleep from sdbus_async.networkmanager import NetworkConnectionSettings, NetworkManagerSecretAgentInterfaceAsync, NetworkManagerAgentManager, NetworkManagerConnectionProperties, NetworkManagerSettings from sdbus_async.networkmanager.settings import ConnectionProfile, ConnectionSettings, EapolSettings, WirelessSecuritySettings from typing import Any from os import environ import sdbus from .store import PasswordStore AGENT_OWNED = 1 PASS_FORMAT = environ.get("NMPASS_FORMAT", "net/{ssid}") class NetworkManagerPasswordStoreAgent(NetworkManagerSecretAgentInterfaceAsync): store: PasswordStore loop: AbstractEventLoop def __init__(self, loop: AbstractEventLoop): super(NetworkManagerSecretAgentInterfaceAsync, self).__init__() self.store = PasswordStore() self.loop = loop async def set_agent_owned(self, info: ConnectionSettings) -> None: # prevent infinite loops await sleep(1) assert info.uuid is not None settings_path = await NetworkManagerSettings().get_connection_by_uuid(info.uuid) connection = NetworkConnectionSettings(settings_path) profile = await connection.get_profile() if profile.wireless_security is not None: profile.wireless_security.psk_flags = AGENT_OWNED if profile.eapol is not None: profile.eapol.password_flags = AGENT_OWNED await connection.update(profile.to_dbus()) @sdbus.dbus_method_async_override() async def get_secrets( self, connection: NetworkManagerConnectionProperties, connection_path: str, setting_name: str, hints: list[str], flags: int, ) -> dict[str, dict[str, tuple[str, Any]]]: profile = ConnectionProfile.from_dbus(connection) if profile.wireless is None: return {} assert profile.wireless.ssid is not None ssid = profile.wireless.ssid.decode() pass_name = PASS_FORMAT.format(**{ "ssid": ssid, "profile": profile, }) password = self.store.retrieve(pass_name) if password is None: return {} if setting_name == '802-11-wireless-security': assert profile.wireless_security is not None assert profile.wireless_security.psk_flags is not None if (profile.wireless_security.psk_flags & AGENT_OWNED) == 0: self.loop.create_task(self.set_agent_owned(profile.connection)) return { setting_name: WirelessSecuritySettings(psk=password).to_dbus() } if setting_name == '802-1x': assert profile.eapol is not None assert profile.eapol.password_flags is not None if (profile.eapol.password_flags & AGENT_OWNED) == 0: self.loop.create_task(self.set_agent_owned(profile.connection)) return { setting_name: EapolSettings(password=password).to_dbus()} return {} def main(): loop = new_event_loop() sdbus.set_default_bus(sdbus.sd_bus_open_system()) agent = NetworkManagerPasswordStoreAgent(loop) agent.export_to_dbus('/org/freedesktop/NetworkManager/SecretAgent') agent_manager = NetworkManagerAgentManager() try: loop.run_until_complete(agent_manager.register('org.nmpassd')) loop.run_forever() except KeyboardInterrupt: loop.stop() if __name__ == "__main__": main()