From a996f78277d5ba5adccb0daa535bc2494350975c Mon Sep 17 00:00:00 2001 From: srdusr Date: Fri, 26 Sep 2025 12:40:58 +0200 Subject: Initial Commit --- src/cerberus/__init__.py | 11 + src/cerberus/__pycache__/__init__.cpython-313.pyc | Bin 0 -> 533 bytes src/cerberus/automation/__init__.py | 19 + .../__pycache__/__init__.cpython-313.pyc | Bin 0 -> 635 bytes .../__pycache__/discovery.cpython-313.pyc | Bin 0 -> 12850 bytes .../automation/__pycache__/engine.cpython-313.pyc | Bin 0 -> 1864 bytes .../__pycache__/playwright_engine.cpython-313.pyc | Bin 0 -> 4284 bytes .../automation/__pycache__/policy.cpython-313.pyc | Bin 0 -> 1146 bytes .../automation/__pycache__/runner.cpython-313.pyc | Bin 0 -> 3943 bytes .../__pycache__/selenium_engine.cpython-313.pyc | Bin 0 -> 4337 bytes .../automation/__pycache__/types.cpython-313.pyc | Bin 0 -> 1243 bytes src/cerberus/automation/discovery.py | 252 +++++++++ src/cerberus/automation/engine.py | 26 + src/cerberus/automation/playwright_engine.py | 60 ++ src/cerberus/automation/policy.py | 25 + src/cerberus/automation/runner.py | 59 ++ src/cerberus/automation/selenium_engine.py | 64 +++ .../sites/__pycache__/apple.cpython-313.pyc | Bin 0 -> 4023 bytes .../sites/__pycache__/base_site.cpython-313.pyc | Bin 0 -> 1073 bytes .../sites/__pycache__/facebook.cpython-313.pyc | Bin 0 -> 3733 bytes .../sites/__pycache__/github.cpython-313.pyc | Bin 0 -> 3278 bytes .../sites/__pycache__/google.cpython-313.pyc | Bin 0 -> 3377 bytes .../sites/__pycache__/linkedin.cpython-313.pyc | Bin 0 -> 3656 bytes .../sites/__pycache__/microsoft.cpython-313.pyc | Bin 0 -> 3329 bytes .../sites/__pycache__/twitter.cpython-313.pyc | Bin 0 -> 3425 bytes src/cerberus/automation/sites/apple.py | 64 +++ src/cerberus/automation/sites/base_site.py | 14 + src/cerberus/automation/sites/facebook.py | 56 ++ src/cerberus/automation/sites/github.py | 53 ++ src/cerberus/automation/sites/google.py | 45 ++ src/cerberus/automation/sites/linkedin.py | 56 ++ src/cerberus/automation/sites/microsoft.py | 40 ++ src/cerberus/automation/sites/twitter.py | 42 ++ src/cerberus/automation/types.py | 22 + src/cerberus/cli.py | 53 ++ src/cerberus/cli/__init__.py | 305 ++++++++++ .../cli/__pycache__/__init__.cpython-313.pyc | Bin 0 -> 17653 bytes src/cerberus/cli/__pycache__/main.cpython-313.pyc | Bin 0 -> 27863 bytes src/cerberus/cli/main.py | 611 +++++++++++++++++++++ src/cerberus/core/Makefile | 31 ++ src/cerberus/core/__init__.py | 127 +++++ .../core/__pycache__/__init__.cpython-313.pyc | Bin 0 -> 4536 bytes .../core/__pycache__/models.cpython-313.pyc | Bin 0 -> 3340 bytes .../__pycache__/password_manager.cpython-313.pyc | Bin 0 -> 21450 bytes src/cerberus/core/cerberus.c | 530 ++++++++++++++++++ src/cerberus/core/cerberus.h | 144 +++++ src/cerberus/core/cerberus.so | Bin 0 -> 30688 bytes src/cerberus/core/models.py | 58 ++ src/cerberus/core/password_manager.py | 462 ++++++++++++++++ src/cerberus/gui/__init__.py | 6 + src/cerberus/gui/main_window.py | 269 +++++++++ src/cerberus/integrations/__init__.py | 111 ++++ .../__pycache__/__init__.cpython-313.pyc | Bin 0 -> 4411 bytes .../__pycache__/bitwarden.cpython-313.pyc | Bin 0 -> 12752 bytes .../__pycache__/chrome.cpython-313.pyc | Bin 0 -> 5258 bytes .../__pycache__/keepass.cpython-313.pyc | Bin 0 -> 8901 bytes .../__pycache__/lastpass.cpython-313.pyc | Bin 0 -> 5034 bytes src/cerberus/integrations/bitwarden.py | 268 +++++++++ src/cerberus/integrations/chrome.py | 135 +++++ src/cerberus/integrations/keepass.py | 208 +++++++ src/cerberus/integrations/lastpass.py | 126 +++++ src/cerberus/native/host.py | 138 +++++ .../native/manifests/chrome_com.cerberus.pm.json | 9 + .../native/manifests/firefox_com.cerberus.pm.json | 9 + src/cerberus/tui/__init__.py | 16 + .../tui/__pycache__/__init__.cpython-313.pyc | Bin 0 -> 649 bytes src/cerberus/tui/app.py | 243 ++++++++ 67 files changed, 4767 insertions(+) create mode 100644 src/cerberus/__init__.py create mode 100644 src/cerberus/__pycache__/__init__.cpython-313.pyc create mode 100644 src/cerberus/automation/__init__.py create mode 100644 src/cerberus/automation/__pycache__/__init__.cpython-313.pyc create mode 100644 src/cerberus/automation/__pycache__/discovery.cpython-313.pyc create mode 100644 src/cerberus/automation/__pycache__/engine.cpython-313.pyc create mode 100644 src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc create mode 100644 src/cerberus/automation/__pycache__/policy.cpython-313.pyc create mode 100644 src/cerberus/automation/__pycache__/runner.cpython-313.pyc create mode 100644 src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc create mode 100644 src/cerberus/automation/__pycache__/types.cpython-313.pyc create mode 100644 src/cerberus/automation/discovery.py create mode 100644 src/cerberus/automation/engine.py create mode 100644 src/cerberus/automation/playwright_engine.py create mode 100644 src/cerberus/automation/policy.py create mode 100644 src/cerberus/automation/runner.py create mode 100644 src/cerberus/automation/selenium_engine.py create mode 100644 src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc create mode 100644 src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc create mode 100644 src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc create mode 100644 src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc create mode 100644 src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc create mode 100644 src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc create mode 100644 src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc create mode 100644 src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc create mode 100644 src/cerberus/automation/sites/apple.py create mode 100644 src/cerberus/automation/sites/base_site.py create mode 100644 src/cerberus/automation/sites/facebook.py create mode 100644 src/cerberus/automation/sites/github.py create mode 100644 src/cerberus/automation/sites/google.py create mode 100644 src/cerberus/automation/sites/linkedin.py create mode 100644 src/cerberus/automation/sites/microsoft.py create mode 100644 src/cerberus/automation/sites/twitter.py create mode 100644 src/cerberus/automation/types.py create mode 100644 src/cerberus/cli.py create mode 100644 src/cerberus/cli/__init__.py create mode 100644 src/cerberus/cli/__pycache__/__init__.cpython-313.pyc create mode 100644 src/cerberus/cli/__pycache__/main.cpython-313.pyc create mode 100644 src/cerberus/cli/main.py create mode 100644 src/cerberus/core/Makefile create mode 100644 src/cerberus/core/__init__.py create mode 100644 src/cerberus/core/__pycache__/__init__.cpython-313.pyc create mode 100644 src/cerberus/core/__pycache__/models.cpython-313.pyc create mode 100644 src/cerberus/core/__pycache__/password_manager.cpython-313.pyc create mode 100644 src/cerberus/core/cerberus.c create mode 100644 src/cerberus/core/cerberus.h create mode 100755 src/cerberus/core/cerberus.so create mode 100644 src/cerberus/core/models.py create mode 100644 src/cerberus/core/password_manager.py create mode 100644 src/cerberus/gui/__init__.py create mode 100644 src/cerberus/gui/main_window.py create mode 100644 src/cerberus/integrations/__init__.py create mode 100644 src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc create mode 100644 src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc create mode 100644 src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc create mode 100644 src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc create mode 100644 src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc create mode 100644 src/cerberus/integrations/bitwarden.py create mode 100644 src/cerberus/integrations/chrome.py create mode 100644 src/cerberus/integrations/keepass.py create mode 100644 src/cerberus/integrations/lastpass.py create mode 100644 src/cerberus/native/host.py create mode 100644 src/cerberus/native/manifests/chrome_com.cerberus.pm.json create mode 100644 src/cerberus/native/manifests/firefox_com.cerberus.pm.json create mode 100644 src/cerberus/tui/__init__.py create mode 100644 src/cerberus/tui/__pycache__/__init__.cpython-313.pyc create mode 100644 src/cerberus/tui/app.py (limited to 'src') diff --git a/src/cerberus/__init__.py b/src/cerberus/__init__.py new file mode 100644 index 0000000..2064933 --- /dev/null +++ b/src/cerberus/__init__.py @@ -0,0 +1,11 @@ +# Avoid importing heavy submodules at top-level to prevent side effects +__all__ = ["PasswordManager", "PasswordEntry"] + +def __getattr__(name): + if name == "PasswordManager": + from .core.password_manager import PasswordManager + return PasswordManager + if name == "PasswordEntry": + from .core.models import PasswordEntry + return PasswordEntry + raise AttributeError(name) diff --git a/src/cerberus/__pycache__/__init__.cpython-313.pyc b/src/cerberus/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..d06ce4e Binary files /dev/null and b/src/cerberus/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/cerberus/automation/__init__.py b/src/cerberus/automation/__init__.py new file mode 100644 index 0000000..9defa53 --- /dev/null +++ b/src/cerberus/automation/__init__.py @@ -0,0 +1,19 @@ +"""Automation layer for password rotation via the web. + +This package provides abstractions and engines (Playwright/Selenium) to automate +site-specific password change flows, along with runners and policy helpers. +""" + +from .types import AutomationResult, AutomationStatus +from .engine import AutomationEngine +from .runner import RotationRunner, RotationSelector + +__all__ = [ + 'AutomationEngine', + 'AutomationResult', + 'AutomationStatus', + 'RotationRunner', + 'RotationSelector', +] + + diff --git a/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc b/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..81e9bb0 Binary files /dev/null and b/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc b/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc new file mode 100644 index 0000000..a7be35d Binary files /dev/null and b/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc differ diff --git a/src/cerberus/automation/__pycache__/engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/engine.cpython-313.pyc new file mode 100644 index 0000000..7d6272e Binary files /dev/null and b/src/cerberus/automation/__pycache__/engine.cpython-313.pyc differ diff --git a/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc new file mode 100644 index 0000000..82a546b Binary files /dev/null and b/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc differ diff --git a/src/cerberus/automation/__pycache__/policy.cpython-313.pyc b/src/cerberus/automation/__pycache__/policy.cpython-313.pyc new file mode 100644 index 0000000..5887ac4 Binary files /dev/null and b/src/cerberus/automation/__pycache__/policy.cpython-313.pyc differ diff --git a/src/cerberus/automation/__pycache__/runner.cpython-313.pyc b/src/cerberus/automation/__pycache__/runner.cpython-313.pyc new file mode 100644 index 0000000..c8e90b2 Binary files /dev/null and b/src/cerberus/automation/__pycache__/runner.cpython-313.pyc differ diff --git a/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc new file mode 100644 index 0000000..d14e6d7 Binary files /dev/null and b/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc differ diff --git a/src/cerberus/automation/__pycache__/types.cpython-313.pyc b/src/cerberus/automation/__pycache__/types.cpython-313.pyc new file mode 100644 index 0000000..f2e48d4 Binary files /dev/null and b/src/cerberus/automation/__pycache__/types.cpython-313.pyc differ diff --git a/src/cerberus/automation/discovery.py b/src/cerberus/automation/discovery.py new file mode 100644 index 0000000..d03c3af --- /dev/null +++ b/src/cerberus/automation/discovery.py @@ -0,0 +1,252 @@ +""" +Heuristic discovery for password change and reset flows. + +This module attempts to dynamically locate "Change password" or "Forgot/Reset password" +paths and, when possible, automatically submit a password rotation using best-effort +selectors. It works with any engine that implements the AutomationEngine Protocol. +""" +from __future__ import annotations + +from dataclasses import dataclass +import logging +from typing import List, Optional, Dict, Any, Protocol + +from .types import AutomationResult, AutomationStatus +from ..core.models import PasswordEntry + +logger = logging.getLogger("cerberus") + + +DISCOVERY_KEYWORDS = [ + # English + "change password", + "password change", + "update password", + "reset password", + "forgot password", + "forgot your password", + "security", + "account settings", + # Common non-English hints (basic) + "contraseña", "senha", "mot de passe", + "passwort", "lozinka", "hasło", +] + +# Common input name/id candidates +OLD_PW_CANDIDATES = [ + "current_password", + "old_password", + "password_current", + "passwordOld", + "password_old", + "existing_password", +] +NEW_PW_CANDIDATES = [ + "new_password", + "password_new", + "password1", + "password", + "newPassword", +] +CONFIRM_PW_CANDIDATES = [ + "confirm_password", + "password_confirm", + "password2", + "confirmNewPassword", +] + +SUBMIT_CANDIDATES = [ + 'button[type="submit"]', + 'input[type="submit"]', + 'button.primary', + 'button.save', + 'button.update', +] + + +@dataclass +class DiscoveredEndpoint: + label: str + href: str + + +def _js_find_links_script() -> str: + # Returns JSON array of {text, href} + return ( + "(() => {" + " const matches = [];" + " const anchors = Array.from(document.querySelectorAll('a, button'));" + " const kws = new Set([" + ",".join([f"'{' '.join(k.split())}'" for k in DISCOVERY_KEYWORDS]) + "]);" + " for (const el of anchors) {" + " const text = (el.textContent || '').trim().toLowerCase();" + " const aria = (el.getAttribute('aria-label') || '').trim().toLowerCase();" + " const title = (el.getAttribute('title') || '').trim().toLowerCase();" + " for (const kw of kws) {" + " if (text.includes(kw) || aria.includes(kw) || title.includes(kw)) {" + " const href = el.getAttribute('href') || '';" + " matches.push({text, href});" + " break;" + " }" + " }" + " return matches;" + "})()" + ) + + +def discover_password_change(engine, base_url: Optional[str]) -> List[DiscoveredEndpoint]: + """Attempt to discover password change or reset endpoints from a base URL. + + Heuristic approach: scan DOM for anchors/buttons whose text matches discovery keywords. + """ + try: + # Ensure we're on the base URL first + if base_url: + engine.goto(base_url) + logger.debug("[discovery] scanning links/buttons for keywords") + except Exception: + logger.debug("[discovery] failed to navigate to base URL") + matches = engine.evaluate(_js_find_links_script()) + endpoints: List[DiscoveredEndpoint] = [] + if isinstance(matches, list): + for m in matches: + text = (m.get("text") or "").strip() + href = (m.get("href") or "").strip() + if href: + endpoints.append(DiscoveredEndpoint(text=text, href=href)) + # Deduplicate by href + unique: Dict[str, DiscoveredEndpoint] = {} + for e in endpoints: + unique[e.href] = e + logger.debug(f"[discovery] found {len(unique)} unique endpoints") + return list(unique.values()) + + +def _try_type(engine, selector: str, value: str) -> bool: + try: + engine.wait_for(selector, timeout_ms=1000) + engine.type(selector, value) + logger.debug(f"[discovery] typed into {selector}") + return True + except Exception: + logger.debug(f"[discovery] could not type into {selector}") + return False + + +def _try_click(engine, selector: str) -> bool: + try: + engine.wait_for(selector, timeout_ms=1000) + engine.click(selector) + logger.debug(f"[discovery] clicked {selector}") + return True + except Exception: + logger.debug(f"[discovery] could not click {selector}") + return False + + +def _try_login_if_present(engine, entry: PasswordEntry) -> bool: + """Best-effort login if a login form is present on the current page.""" + try: + has_form = engine.evaluate( + "(() => { const p = document.querySelector('form input[type=\\'password\\']'); return !!p; })()" + ) + except Exception: + has_form = False + if not has_form: + logger.debug("[discovery] no login form present on page") + return False + candidates_user = [ + "input[type='email']", + "input[type='text']", + "input[name*='user']", + "input[id*='user']", + "input[name*='email']", + "input[id*='email']", + "input[name*='login']", + "input[id*='login']", + ] + _ = any(_try_type(engine, sel, entry.username) for sel in candidates_user) + typed_pass = _try_type(engine, "input[type='password']", entry.password) + submitted = any(_try_click(engine, sel) for sel in SUBMIT_CANDIDATES) + if not submitted: + try: + engine.evaluate("(() => { const p = document.querySelector('input[type=\\'password\\']'); if (p && p.form) { p.form.submit(); return true;} return false; })()") + submitted = True + except Exception: + pass + ok = typed_pass and submitted + logger.debug(f"[discovery] login attempt result ok={ok}") + return ok + + +def try_submit_password_change(engine, entry: PasswordEntry, new_password: str) -> AutomationResult: + """Attempt to submit a password change on the current page using common selectors.""" + def candidates(names: List[str]) -> List[str]: + sels: List[str] = [] + for n in names: + sels.append(f"input[name='{n}']") + sels.append(f"input[id='{n}']") + sels.append(f"input[type='password'][name='{n}']") + sels.append(f"input[type='password'][id='{n}']") + sels.append(f"input[placeholder*='{n}']") + sels.append(f"input[aria-label*='{n}']") + return sels + + success_old = any(_try_type(engine, sel, entry.password) for sel in candidates(OLD_PW_CANDIDATES)) + success_new = any(_try_type(engine, sel, new_password) for sel in candidates(NEW_PW_CANDIDATES)) + success_confirm = any(_try_type(engine, sel, new_password) for sel in candidates(CONFIRM_PW_CANDIDATES)) or success_new + + submitted = any(_try_click(engine, sel) for sel in SUBMIT_CANDIDATES) + if not submitted: + try: + engine.evaluate("(() => { const el = document.querySelector('input[type=\\'password\\']'); if (el) { el.dispatchEvent(new KeyboardEvent('keydown', {key: 'Enter', bubbles: true})); el.form && el.form.submit && el.form.submit(); return true;} return false; })()") + submitted = True + except Exception: + pass + + if (success_new and success_confirm) and (submitted or success_old): + return AutomationResult(status=AutomationStatus.SUCCESS, message="Submitted change attempt") + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Could not auto-submit on page") + + +def auto_change_flow(engine, entry: PasswordEntry, new_password: str) -> AutomationResult: + """End-to-end best-effort flow: optional login, discover change endpoint, attempt update.""" + base = entry.url or (("https://" + entry.website) if entry.website and not entry.website.startswith("http") else entry.website) or "" + if base: + try: + logger.debug(f"[discovery] navigating to base URL: {base}") + engine.goto(base) + _try_login_if_present(engine, entry) + except Exception: + logger.debug("[discovery] failed to navigate or login at base URL") + + endpoints = discover_password_change(engine, base if base else None) + logger.debug(f"[discovery] candidate endpoints: {len(endpoints)}") + + # Also try common guesses relative to base + guesses = [ + "/account/security", + "/settings/security", + "/settings/password", + "/profile/security", + "/user/security", + ] + for g in guesses: + endpoints.append(DiscoveredEndpoint(label=f"guess:{g}", href=(base + g) if base else g)) + + for ep in endpoints: + try: + target = ep.href + if not target.startswith("http") and base: + target = base + ep.href + if not target: + continue + logger.debug(f"[discovery] trying endpoint: {target}") + engine.goto(target) + res = try_submit_password_change(engine, entry, new_password) + if res.status == AutomationStatus.SUCCESS: + return res + except Exception: + logger.debug("[discovery] endpoint attempt failed, trying next") + continue + + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="No automated flow succeeded") diff --git a/src/cerberus/automation/engine.py b/src/cerberus/automation/engine.py new file mode 100644 index 0000000..ade3e53 --- /dev/null +++ b/src/cerberus/automation/engine.py @@ -0,0 +1,26 @@ +from typing import Protocol, Optional, Dict, Any + + +class AutomationEngine(Protocol): + def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None: + ... + + def stop(self) -> None: + ... + + def goto(self, url: str, wait_until: str = "networkidle") -> None: + ... + + def type(self, selector: str, value: str, clear: bool = True) -> None: + ... + + def click(self, selector: str) -> None: + ... + + def wait_for(self, selector: str, timeout_ms: int = 15000) -> None: + ... + + def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any: + ... + + diff --git a/src/cerberus/automation/playwright_engine.py b/src/cerberus/automation/playwright_engine.py new file mode 100644 index 0000000..a61920b --- /dev/null +++ b/src/cerberus/automation/playwright_engine.py @@ -0,0 +1,60 @@ +from typing import Optional, Dict, Any + +from playwright.sync_api import sync_playwright, Page, Browser, BrowserContext + + +class PlaywrightEngine: + def __init__(self): + self._pw = None + self._browser: Optional[Browser] = None + self._context: Optional[BrowserContext] = None + self._page: Optional[Page] = None + + def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None: + self._pw = sync_playwright().start() + launch_args = {"headless": headless} + if user_data_dir: + self._context = self._pw.chromium.launch_persistent_context(user_data_dir, **launch_args) + pages = self._context.pages + self._page = pages[0] if pages else self._context.new_page() + else: + self._browser = self._pw.chromium.launch(**launch_args) + self._context = self._browser.new_context() + self._page = self._context.new_page() + + def stop(self) -> None: + if self._context: + self._context.close() + if self._browser: + self._browser.close() + if self._pw: + self._pw.stop() + self._page = None + self._context = None + self._browser = None + self._pw = None + + def goto(self, url: str, wait_until: str = "networkidle") -> None: + assert self._page is not None + self._page.goto(url, wait_until=wait_until) + + def type(self, selector: str, value: str, clear: bool = True) -> None: + assert self._page is not None + locator = self._page.locator(selector) + if clear: + locator.fill("") + locator.type(value) + + def click(self, selector: str) -> None: + assert self._page is not None + self._page.click(selector) + + def wait_for(self, selector: str, timeout_ms: int = 15000) -> None: + assert self._page is not None + self._page.wait_for_selector(selector, timeout=timeout_ms) + + def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any: + assert self._page is not None + return self._page.evaluate(script, arg) + + diff --git a/src/cerberus/automation/policy.py b/src/cerberus/automation/policy.py new file mode 100644 index 0000000..e4bab80 --- /dev/null +++ b/src/cerberus/automation/policy.py @@ -0,0 +1,25 @@ +from typing import Dict + +from ..core.password_manager import PasswordManager +from ..core.models import PasswordEntry + + +SITE_POLICY: Dict[str, Dict] = { + # domain_substring: policy + "github.com": {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": True}, + "google.com": {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": False}, +} + + +def generate_for_entry(pm: PasswordManager, entry: PasswordEntry) -> str: + url = entry.url or entry.website or "" + selected = None + for key, policy in SITE_POLICY.items(): + if key in url: + selected = policy + break + if not selected: + selected = {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": True} + return pm.generate_password(**selected) + + diff --git a/src/cerberus/automation/runner.py b/src/cerberus/automation/runner.py new file mode 100644 index 0000000..841e89e --- /dev/null +++ b/src/cerberus/automation/runner.py @@ -0,0 +1,59 @@ +from dataclasses import dataclass +from typing import List, Optional, Callable +from datetime import datetime + +from ..core.password_manager import PasswordManager +from ..core.models import PasswordEntry +from .types import AutomationResult, AutomationStatus +from .discovery import auto_change_flow + + +@dataclass +class RotationSelector: + all: bool = False + compromised_only: bool = False + tag: Optional[str] = None + domain: Optional[str] = None + + +class RotationRunner: + def __init__(self, engine, site_flows: List, password_manager: PasswordManager): + self.engine = engine + self.site_flows = site_flows + self.pm = password_manager + + def _filter_entries(self, selector: RotationSelector) -> List[PasswordEntry]: + entries = self.pm.list_passwords() + filtered: List[PasswordEntry] = [] + for e in entries: + if selector.compromised_only and not e.compromised: + continue + if selector.tag and selector.tag not in (e.tags or []): + continue + if selector.domain and selector.domain not in (e.url or e.website or ""): + continue + filtered.append(e) + return filtered + + def rotate(self, selector: RotationSelector, generate_password: Callable[[PasswordEntry], str], dry_run: bool = True) -> List[AutomationResult]: + results: List[AutomationResult] = [] + targets = self._filter_entries(selector) + for entry in targets: + new_password = generate_password(entry) + if dry_run: + results.append(AutomationResult(status=AutomationStatus.SUCCESS, message="dry-run", changed_at=datetime.utcnow())) + continue + + flow = next((f for f in self.site_flows if f.match(entry)), None) + if flow: + res = flow.perform_change(self.engine, entry, new_password) + else: + # Fallback to heuristic auto discovery/change + res = auto_change_flow(self.engine, entry, new_password) + if res.status == AutomationStatus.SUCCESS: + self.pm.update_password(entry.id, password=new_password) + results.append(res) + + return results + + diff --git a/src/cerberus/automation/selenium_engine.py b/src/cerberus/automation/selenium_engine.py new file mode 100644 index 0000000..3fbdaac --- /dev/null +++ b/src/cerberus/automation/selenium_engine.py @@ -0,0 +1,64 @@ +from typing import Optional, Dict, Any + +try: + from selenium import webdriver + from selenium.webdriver.common.by import By + from selenium.webdriver.chrome.options import Options as ChromeOptions + SELENIUM_AVAILABLE = True +except Exception: # pragma: no cover - optional dependency + SELENIUM_AVAILABLE = False + + +class SeleniumEngine: + def __init__(self): + if not SELENIUM_AVAILABLE: + raise RuntimeError("Selenium is not installed. Install with: pip install selenium") + self._driver: Optional[webdriver.Chrome] = None + + def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None: + options = ChromeOptions() + if headless: + options.add_argument("--headless=new") + if user_data_dir: + options.add_argument(f"--user-data-dir={user_data_dir}") + self._driver = webdriver.Chrome(options=options) + + def stop(self) -> None: + if self._driver: + self._driver.quit() + self._driver = None + + def goto(self, url: str, wait_until: str = "networkidle") -> None: + assert self._driver is not None + self._driver.get(url) + + def type(self, selector: str, value: str, clear: bool = True) -> None: + assert self._driver is not None + elem = self._driver.find_element(By.CSS_SELECTOR, selector) + if clear: + elem.clear() + elem.send_keys(value) + + def click(self, selector: str) -> None: + assert self._driver is not None + elem = self._driver.find_element(By.CSS_SELECTOR, selector) + elem.click() + + def wait_for(self, selector: str, timeout_ms: int = 15000) -> None: + assert self._driver is not None + # Simple polling; users can replace with WebDriverWait if desired + import time + end = time.time() + timeout_ms / 1000.0 + while time.time() < end: + try: + self._driver.find_element(By.CSS_SELECTOR, selector) + return + except Exception: + time.sleep(0.1) + raise TimeoutError(f"Timeout waiting for selector: {selector}") + + def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any: + assert self._driver is not None + return self._driver.execute_script(script, arg) + + diff --git a/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc new file mode 100644 index 0000000..65c1943 Binary files /dev/null and b/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc differ diff --git a/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc new file mode 100644 index 0000000..28a72e3 Binary files /dev/null and b/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc differ diff --git a/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc new file mode 100644 index 0000000..fabaf7c Binary files /dev/null and b/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc differ diff --git a/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc new file mode 100644 index 0000000..776dc1c Binary files /dev/null and b/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc differ diff --git a/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc new file mode 100644 index 0000000..ca05800 Binary files /dev/null and b/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc differ diff --git a/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc new file mode 100644 index 0000000..56f9f97 Binary files /dev/null and b/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc differ diff --git a/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc new file mode 100644 index 0000000..4b579a5 Binary files /dev/null and b/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc differ diff --git a/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc new file mode 100644 index 0000000..976b44f Binary files /dev/null and b/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc differ diff --git a/src/cerberus/automation/sites/apple.py b/src/cerberus/automation/sites/apple.py new file mode 100644 index 0000000..1f32946 --- /dev/null +++ b/src/cerberus/automation/sites/apple.py @@ -0,0 +1,64 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class AppleFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "apple.com" in target or "appleid.apple.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Apple frequently enforces MFA; treat as best-effort stub + engine.goto("https://appleid.apple.com/") + engine.wait_for("input[type=email], input[name=email], input[id=email]") + try: + engine.type("input[type=email], input[name=email], input[id=email]", entry.username) + except Exception: + pass + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + engine.type("input[type=password]", entry.password) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="Apple ID requires MFA or device approval") + + # Direct to password change if possible (likely gated by MFA) + engine.goto("https://appleid.apple.com/account/manage/password") + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="Password settings gated by MFA") + + # Attempt to fill current/new/confirm + for sel in ["input[name='currentPassword']", "input[id='currentPassword']", "input[type='password']"]: + try: + engine.type(sel, entry.password) + break + except Exception: + continue + for sel in ["input[name='newPassword']", "input[id='newPassword']"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["input[name='confirmPassword']", "input[id='confirmPassword']"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["button[type=submit]", "button"]: + try: + engine.click(sel) + break + except Exception: + continue + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/base_site.py b/src/cerberus/automation/sites/base_site.py new file mode 100644 index 0000000..a33dc5b --- /dev/null +++ b/src/cerberus/automation/sites/base_site.py @@ -0,0 +1,14 @@ +from typing import Protocol, Optional, Dict, Any + +from ...core.models import PasswordEntry +from ..types import AutomationResult + + +class SiteFlow(Protocol): + def match(self, entry: PasswordEntry) -> bool: + ... + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + ... + + diff --git a/src/cerberus/automation/sites/facebook.py b/src/cerberus/automation/sites/facebook.py new file mode 100644 index 0000000..f7b0535 --- /dev/null +++ b/src/cerberus/automation/sites/facebook.py @@ -0,0 +1,56 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class FacebookFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "facebook.com" in target or "fb.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + engine.goto("https://www.facebook.com/login/") + engine.wait_for("input[name=email]") + engine.type("input[name=email]", entry.username) + engine.type("input[name=pass]", entry.password) + engine.click("button[name=login]") + + # Password settings + engine.goto("https://www.facebook.com/settings?tab=security") + # New UI is dynamic; try common selectors + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Security page requires interaction or MFA") + + # Attempt common current/new/confirm fields + try: + engine.type("input[name='current_password']", entry.password) + except Exception: + pass + for sel in ["input[name='new_password']", "input[name='password_new']", "input[name='new']"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["input[name='confirm_password']", "input[name='password_confirm']", "input[name='confirm']"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["button[type=submit]", "[data-testid='sec_settings_save']", "button"]: + try: + engine.click(sel) + break + except Exception: + continue + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/github.py b/src/cerberus/automation/sites/github.py new file mode 100644 index 0000000..36e25f2 --- /dev/null +++ b/src/cerberus/automation/sites/github.py @@ -0,0 +1,53 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class GithubFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "github.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Login + engine.goto("https://github.com/login") + engine.wait_for("input#login_field") + engine.type("input#login_field", entry.username) + engine.type("input#password", entry.password) + engine.click("input[type=submit]") + + # Detect 2FA requirement + try: + engine.wait_for("input#otp", timeout_ms=3000) + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="2FA required") + except Exception: + pass + + # Navigate to password change + engine.goto("https://github.com/settings/security") + engine.wait_for("a[href='/settings/password']") + engine.click("a[href='/settings/password']") + + engine.wait_for("input#old_password") + engine.type("input#old_password", entry.password) + engine.type("input#new_password", new_password) + engine.type("input#confirm_new_password", new_password) + engine.click("button[type=submit]") + + # Verify success: check for flash notice + try: + engine.wait_for(".flash-notice, .flash-success", timeout_ms=5000) + except Exception: + # As fallback, assume success if no error shown + pass + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow()) + + except Exception as e: + return AutomationResult(status=AutomationStatus.FAILED, message="Error during change", error=str(e)) + + diff --git a/src/cerberus/automation/sites/google.py b/src/cerberus/automation/sites/google.py new file mode 100644 index 0000000..4d651b7 --- /dev/null +++ b/src/cerberus/automation/sites/google.py @@ -0,0 +1,45 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class GoogleFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "google.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Login flow + engine.goto("https://accounts.google.com/signin/v2/identifier") + engine.wait_for("input[type=email], input#identifierId") + try: + engine.type("input[type=email], input#identifierId", entry.username) + except Exception: + pass + engine.click("#identifierNext, button[type=submit]") + try: + engine.wait_for("input[type=password]", timeout_ms=7000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present") + engine.type("input[type=password]", entry.password) + engine.click("#passwordNext, button[type=submit]") + + # Security settings - direct link (subject to change) + engine.goto("https://myaccount.google.com/signinoptions/password") + engine.wait_for("input[type=password]") + engine.type("input[type=password]", entry.password) + engine.click("button[type=submit], #passwordNext") + + # New password fields + engine.wait_for("input[name=password]") + engine.type("input[name=password]", new_password) + engine.type("input[name=confirmation_password]", new_password) + engine.click("button[type=submit]") + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/linkedin.py b/src/cerberus/automation/sites/linkedin.py new file mode 100644 index 0000000..cb8ecea --- /dev/null +++ b/src/cerberus/automation/sites/linkedin.py @@ -0,0 +1,56 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class LinkedInFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "linkedin.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + engine.goto("https://www.linkedin.com/login") + engine.wait_for("input#username") + engine.type("input#username", entry.username) + engine.type("input#password", entry.password) + engine.click("button[type=submit]") + + engine.goto("https://www.linkedin.com/psettings/change-password") + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Password settings gated or MFA required") + + # Fill current/new/confirm + for sel in ["input[name='currentPassword']", "input#current-password", "input[type='password']"]: + try: + engine.type(sel, entry.password) + break + except Exception: + continue + for sel in ["input[name='newPassword']", "input#new-password"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["input[name='confirmPassword']", "input#confirm-password"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["button[type=submit]", "button"]: + try: + engine.click(sel) + break + except Exception: + continue + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/microsoft.py b/src/cerberus/automation/sites/microsoft.py new file mode 100644 index 0000000..8ddee76 --- /dev/null +++ b/src/cerberus/automation/sites/microsoft.py @@ -0,0 +1,40 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class MicrosoftFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "microsoft.com" in target or "live.com" in target or "office.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Login page + engine.goto("https://login.live.com/") + engine.wait_for("input[type=email], input[name=loginfmt]") + engine.type("input[type=email], input[name=loginfmt]", entry.username) + engine.click("input[type=submit], button[type=submit]") + + # Password step + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present") + engine.type("input[type=password]", entry.password) + engine.click("input[type=submit], button[type=submit]") + + # Navigate to security/password change + engine.goto("https://account.live.com/password/change") + engine.wait_for("input[name=OldPassword], input[name=CurrentPassword]") + engine.type("input[name=OldPassword], input[name=CurrentPassword]", entry.password) + engine.type("input[name=NewPassword], input[name=NewPasswordBox]", new_password) + engine.type("input[name=ConfirmPassword], input[name=ConfirmNewPasswordBox]", new_password) + engine.click("button[type=submit], input[type=submit]") + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/twitter.py b/src/cerberus/automation/sites/twitter.py new file mode 100644 index 0000000..609eda7 --- /dev/null +++ b/src/cerberus/automation/sites/twitter.py @@ -0,0 +1,42 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class TwitterFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "twitter.com" in target or "x.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Login flow + engine.goto("https://twitter.com/i/flow/login") + engine.wait_for("input[name='text']") + engine.type("input[name='text']", entry.username) + engine.click("div[role='button'][data-testid='LoginForm_Login_Button'], div[role='button']") + try: + engine.wait_for("input[name='password']", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present") + engine.type("input[name='password']", entry.password) + engine.click("div[role='button'][data-testid='LoginForm_Login_Button']") + + # Settings (paths change often; best-effort direct link) + engine.goto("https://twitter.com/settings/password") + engine.wait_for("input[name='current_password'], input[type='password']") + # Try to fill typical current/new/confirm fields + try: + engine.type("input[name='current_password']", entry.password) + except Exception: + pass + engine.type("input[name='new_password']", new_password) + engine.type("input[name='password_confirmation']", new_password) + engine.click("div[role='button'], button[type='submit']") + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/types.py b/src/cerberus/automation/types.py new file mode 100644 index 0000000..f236a28 --- /dev/null +++ b/src/cerberus/automation/types.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional, Dict, Any +from datetime import datetime + + +class AutomationStatus(str, Enum): + SUCCESS = "success" + NEEDS_MFA = "needs_mfa" + NEEDS_MANUAL = "needs_manual" + FAILED = "failed" + + +@dataclass +class AutomationResult: + status: AutomationStatus + message: str = "" + changed_at: Optional[datetime] = None + error: Optional[str] = None + details: Dict[str, Any] = field(default_factory=dict) + + diff --git a/src/cerberus/cli.py b/src/cerberus/cli.py new file mode 100644 index 0000000..902d4be --- /dev/null +++ b/src/cerberus/cli.py @@ -0,0 +1,53 @@ +import argparse +from typing import List + +from cerberus.core.password_manager import PasswordManager +from cerberus.automation.playwright_engine import PlaywrightEngine +from cerberus.automation.selenium_engine import SeleniumEngine, SELENIUM_AVAILABLE +from cerberus.automation.runner import RotationRunner, RotationSelector +from cerberus.automation.policy import generate_for_entry +from cerberus.automation.sites.github import GithubFlow + + +def cli(): + parser = argparse.ArgumentParser(prog="cerberus") + sub = parser.add_subparsers(dest="command") + + rotate = sub.add_parser("rotate", help="Rotate passwords via web automation") + rotate.add_argument("--engine", choices=["playwright", "selenium"], default="playwright") + rotate.add_argument("--data-dir", default=None, help="Password manager data dir") + rotate.add_argument("--master", required=True, help="Master password") + rotate.add_argument("--all", action="store_true", help="Rotate all entries") + rotate.add_argument("--compromised", action="store_true", help="Rotate only compromised entries") + rotate.add_argument("--tag", default=None) + rotate.add_argument("--domain", default=None) + rotate.add_argument("--dry-run", action="store_true") + + args = parser.parse_args() + + if args.command == "rotate": + pm = PasswordManager(data_dir=args.data_dir, master_password=args.master) + + engine = PlaywrightEngine() if args.engine == "playwright" else None + if args.engine == "selenium": + if not SELENIUM_AVAILABLE: + raise SystemExit("Selenium not installed. Install extra: pip install .[automation-selenium]") + engine = SeleniumEngine() + + engine.start(headless=True) + try: + flows = [GithubFlow()] + runner = RotationRunner(engine, flows, pm) + selector = RotationSelector( + all=args.all, + compromised_only=args.compromised, + tag=args.tag, + domain=args.domain, + ) + results = runner.rotate(selector, lambda e: generate_for_entry(pm, e), dry_run=args.dry_run) + for r in results: + print(f"{r.status}: {r.message}") + finally: + engine.stop() + else: + parser.print_help() diff --git a/src/cerberus/cli/__init__.py b/src/cerberus/cli/__init__.py new file mode 100644 index 0000000..d3e17ef --- /dev/null +++ b/src/cerberus/cli/__init__.py @@ -0,0 +1,305 @@ +""" +Cerberus CLI - Command Line Interface for Cerberus Password Manager. +""" +from typing import Optional, List, Dict, Any +import logging +import sys +import json +from pathlib import Path +from datetime import datetime +import getpass + +import click +from rich.console import Console +from rich.table import Table +from rich.progress import Progress, SpinnerColumn, TextColumn + +from ..core.password_manager import PasswordManager, VaultError +from ..core.models import PasswordEntry +from ..tui import main as tui_main + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(message)s') +logger = logging.getLogger(__name__) + +# Create console for rich output +console = Console() + +class CerberusCLI: + """Main CLI application for Cerberus Password Manager.""" + + def __init__(self, data_dir: Optional[str] = None, debug: bool = False): + """Initialize the CLI.""" + self.data_dir = data_dir + self.debug = debug + self.pm: Optional[PasswordManager] = None + + if debug: + logging.getLogger().setLevel(logging.DEBUG) + logger.debug("Debug mode enabled") + + def ensure_initialized(self) -> None: + """Ensure the password manager is initialized.""" + if not self.pm: + raise click.UsageError("Vault is not unlocked. Use 'cerberus unlock' first.") + + def unlock_vault(self, master_password: Optional[str] = None) -> None: + """Unlock the password vault.""" + try: + if not master_password: + master_password = getpass.getpass("Master password: ") + + with self._progress_spinner("Unlocking vault..."): + self.pm = PasswordManager(data_dir=self.data_dir, master_password=master_password) + + console.print("[green]✓[/] Vault unlocked successfully!") + + except VaultError as e: + raise click.ClickException(f"Failed to unlock vault: {e}") + except Exception as e: + if self.debug: + logger.exception("Error unlocking vault") + raise click.ClickException(f"An error occurred: {e}") + + def _progress_spinner(self, description: str): + """Create a progress spinner context manager.""" + return Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True + ) + + def list_entries(self, search: Optional[str] = None) -> List[PasswordEntry]: + """List password entries, optionally filtered by search term.""" + self.ensure_initialized() + + try: + entries = self.pm.get_entries() + + if search: + search = search.lower() + entries = [ + e for e in entries + if search in e.website.lower() or + search in (e.username or "").lower() or + search in (e.notes or "").lower() or + any(search in tag.lower() for tag in (e.tags or [])) + ] + + return entries + + except Exception as e: + if self.debug: + logger.exception("Error listing entries") + raise click.ClickException(f"Failed to list entries: {e}") + + def get_entry(self, identifier: str) -> PasswordEntry: + """Get a specific password entry by ID or website.""" + self.ensure_initialized() + + try: + # Try to get by ID first + try: + return self.pm.get_entry(identifier) + except (ValueError, KeyError): + # If not found by ID, try by website + entries = [e for e in self.pm.get_entries() if e.website.lower() == identifier.lower()] + if not entries: + raise ValueError(f"No entry found with ID or website: {identifier}") + if len(entries) > 1: + raise ValueError(f"Multiple entries found for website: {identifier}. Please use the entry ID instead.") + return entries[0] + + except Exception as e: + if self.debug: + logger.exception(f"Error getting entry: {identifier}") + raise click.ClickException(str(e)) + + def add_entry( + self, + website: str, + username: str, + password: Optional[str] = None, + url: str = "", + notes: str = "", + tags: Optional[List[str]] = None, + generate: bool = False, + length: int = 16, + special_chars: bool = True + ) -> PasswordEntry: + """Add a new password entry.""" + self.ensure_initialized() + + try: + if generate: + with self._progress_spinner("Generating strong password..."): + password = self.pm.generate_password_easy(length=length, special=special_chars) + elif not password: + password = click.prompt("Password", hide_input=True, confirmation_prompt=True) + + entry = PasswordEntry( + id=self.pm.generate_id(), + website=website, + username=username, + password=password, + url=url, + notes=notes, + tags=tags or [], + created_at=datetime.utcnow(), + updated_at=datetime.utcnow() + ) + + with self._progress_spinner("Saving entry..."): + self.pm.add_entry(entry) + + return entry + + except Exception as e: + if self.debug: + logger.exception("Error adding entry") + raise click.ClickException(f"Failed to add entry: {e}") + + def update_entry(self, entry_id: str, **updates) -> PasswordEntry: + """Update an existing password entry.""" + self.ensure_initialized() + + try: + entry = self.get_entry(entry_id) + + # Apply updates + for key, value in updates.items(): + if value is not None and hasattr(entry, key): + setattr(entry, key, value) + + entry.updated_at = datetime.utcnow() + + with self._progress_spinner("Updating entry..."): + self.pm.update_entry(entry) + + return entry + + except Exception as e: + if self.debug: + logger.exception(f"Error updating entry: {entry_id}") + raise click.ClickException(f"Failed to update entry: {e}") + + def delete_entry(self, entry_id: str) -> None: + """Delete a password entry.""" + self.ensure_initialized() + + try: + entry = self.get_entry(entry_id) + + if click.confirm(f"Are you sure you want to delete the entry for {entry.website}?"): + with self._progress_spinner("Deleting entry..."): + self.pm.delete_entry(entry.id) + + console.print(f"[green]✓[/] Deleted entry for {entry.website}") + + except Exception as e: + if self.debug: + logger.exception(f"Error deleting entry: {entry_id}") + raise click.ClickException(f"Failed to delete entry: {e}") + + def rotate_password( + self, + entry_id: str, + length: int = 32, + special_chars: bool = True + ) -> PasswordEntry: + """Generate a new password for an entry.""" + self.ensure_initialized() + + try: + entry = self.get_entry(entry_id) + + with self._progress_spinner("Generating new password..."): + new_password = self.pm.generate_password_easy(length=length, special=special_chars) + + entry.password = new_password + entry.updated_at = datetime.utcnow() + + with self._progress_spinner("Saving updated entry..."): + self.pm.update_entry(entry) + + return entry + + except Exception as e: + if self.debug: + logger.exception(f"Error rotating password for entry: {entry_id}") + raise click.ClickException(f"Failed to rotate password: {e}") + + def export_entries(self, output_file: str, format: str = "json") -> None: + """Export password entries to a file.""" + self.ensure_initialized() + + try: + entries = self.pm.get_entries() + output_path = Path(output_file).expanduser().resolve() + + with self._progress_spinner(f"Exporting entries to {output_path}..."): + if format.lower() == "json": + data = [e.to_dict() for e in entries] + output_path.write_text(json.dumps(data, indent=2, default=str)) + else: + raise ValueError(f"Unsupported export format: {format}") + + console.print(f"[green]✓[/] Exported {len(entries)} entries to {output_path}") + + except Exception as e: + if self.debug: + logger.exception("Error exporting entries") + raise click.ClickException(f"Failed to export entries: {e}") + + def import_entries(self, input_file: str, format: str = "json") -> None: + """Import password entries from a file.""" + self.ensure_initialized() + + try: + input_path = Path(input_file).expanduser().resolve() + + if not input_path.exists(): + raise FileNotFoundError(f"Input file not found: {input_path}") + + with self._progress_spinner(f"Importing entries from {input_path}..."): + if format.lower() == "json": + data = json.loads(input_path.read_text()) + for item in data: + entry = PasswordEntry.from_dict(item) + # Ensure we don't overwrite existing entries + entry.id = self.pm.generate_id() + self.pm.add_entry(entry) + else: + raise ValueError(f"Unsupported import format: {format}") + + console.print(f"[green]✓[/] Imported {len(data)} entries from {input_path}") + + except Exception as e: + if self.debug: + logger.exception("Error importing entries") + raise click.ClickException(f"Failed to import entries: {e}") + +def print_entry_table(entries: List[PasswordEntry]) -> None: + """Print a table of password entries.""" + if not entries: + console.print("[yellow]No entries found.[/]") + return + + table = Table(show_header=True, header_style="bold magenta") + table.add_column("ID", style="dim", width=8) + table.add_column("Website") + table.add_column("Username") + table.add_column("Last Used", style="dim") + table.add_column("Updated", style="dim") + + for entry in entries: + table.add_row( + entry.id[:8], + entry.website, + entry.username, + entry.last_used.strftime("%Y-%m-%d") if entry.last_used else "Never", + entry.updated_at.strftime("%Y-%m-%d") if entry.updated_at else "" + ) + + console.print(table) diff --git a/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc b/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..093e1af Binary files /dev/null and b/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/cerberus/cli/__pycache__/main.cpython-313.pyc b/src/cerberus/cli/__pycache__/main.cpython-313.pyc new file mode 100644 index 0000000..3663a5a Binary files /dev/null and b/src/cerberus/cli/__pycache__/main.cpython-313.pyc differ diff --git a/src/cerberus/cli/main.py b/src/cerberus/cli/main.py new file mode 100644 index 0000000..c40c411 --- /dev/null +++ b/src/cerberus/cli/main.py @@ -0,0 +1,611 @@ +""" +Cerberus CLI - Command Line Interface for Cerberus Password Manager. +""" +import os +import sys +import logging +from typing import Optional + +import click +from rich.console import Console +from rich.logging import RichHandler + +from . import CerberusCLI, print_entry_table +from ..automation.playwright_engine import PlaywrightEngine +from ..automation.selenium_engine import SeleniumEngine, SELENIUM_AVAILABLE +from ..automation.runner import RotationRunner, RotationSelector +from ..automation.sites.github import GithubFlow +from ..automation.sites.google import GoogleFlow +from ..automation.sites.microsoft import MicrosoftFlow +from ..automation.sites.twitter import TwitterFlow +from ..automation.sites.facebook import FacebookFlow +from ..automation.sites.linkedin import LinkedInFlow +from ..automation.sites.apple import AppleFlow +from ..automation.policy import generate_for_entry +from ..automation.types import AutomationStatus + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(message)s", + datefmt="[%X]", + handlers=[RichHandler(rich_tracebacks=True)] +) +logger = logging.getLogger("cerberus") + +# Create console for rich output +console = Console() + +# Default data directory +DEFAULT_DATA_DIR = os.path.expanduser("~/.cerberus") + +@click.group(invoke_without_command=True) +@click.option( + "--data-dir", + type=click.Path(file_okay=False, dir_okay=True, writable=True), + default=DEFAULT_DATA_DIR, + help="Directory to store password data", + show_default=True +) +@click.option( + "--debug/--no-debug", + default=False, + help="Enable debug output", + show_default=True +) +@click.pass_context +def cli(ctx: click.Context, data_dir: str, debug: bool) -> None: + """Cerberus Password Manager - Secure and user-friendly password management.""" + # Set debug logging if enabled + if debug: + logging.getLogger().setLevel(logging.DEBUG) + logger.debug("Debug mode enabled") + + # Create data directory if it doesn't exist + os.makedirs(data_dir, exist_ok=True) + + # Store the CLI instance in the context + ctx.obj = CerberusCLI(data_dir=data_dir, debug=debug) + + # If no command is provided, show help + if ctx.invoked_subcommand is None: + click.echo(ctx.get_help()) + +@cli.command() +@click.pass_obj +def init(cli: CerberusCLI) -> None: + """Initialize a new password vault.""" + master_password = click.prompt( + "Enter a master password", + hide_input=True, + confirmation_prompt=True + ) + + try: + cli.unlock_vault(master_password) + console.print("[green]✓[/] Vault initialized and unlocked!") + except Exception as e: + console.print(f"[red]✗[/] Failed to initialize vault: {e}") + sys.exit(1) + +@cli.command() +@click.option("--engine", type=click.Choice(["playwright", "selenium"]), default="playwright", show_default=True) +@click.option("--all", "all_", is_flag=True, default=False, help="Include all entries") +@click.option("--compromised", is_flag=True, default=False, help="Only compromised entries") +@click.option("--tag", default=None, help="Filter by tag") +@click.option("--domain", default=None, help="Filter by domain") +@click.pass_obj +def reliability_report( + cli: CerberusCLI, + engine: str, + all_: bool, + compromised: bool, + tag: Optional[str], + domain: Optional[str], +) -> None: + """Run a dry-run rotate across selected entries and report SUCCESS/NEEDS_MANUAL/FAILED counts.""" + try: + try: + cli.ensure_initialized() + except Exception: + # Attempt interactive unlock + password = click.prompt("Master password", hide_input=True) + cli.unlock_vault(password) + if engine == "playwright": + eng = PlaywrightEngine() + else: + if not SELENIUM_AVAILABLE: + raise click.ClickException("Selenium not installed. Install extra: pip install .[automation-selenium]") + eng = SeleniumEngine() + + eng.start(headless=True) + try: + flows = [ + GithubFlow(), GoogleFlow(), MicrosoftFlow(), TwitterFlow(), + FacebookFlow(), LinkedInFlow(), AppleFlow() + ] + runner = RotationRunner(eng, flows, cli.pm) # type: ignore[arg-type] + selector = RotationSelector( + all=all_, + compromised_only=compromised, + tag=tag, + domain=domain, + ) + results = runner.rotate(selector, lambda e: generate_for_entry(cli.pm, e), dry_run=True) # type: ignore[arg-type] + counts = {AutomationStatus.SUCCESS: 0, AutomationStatus.NEEDS_MANUAL: 0, AutomationStatus.FAILED: 0} + for r in results: + counts[r.status] = counts.get(r.status, 0) + 1 + console.print("[bold]Reliability Report[/bold]") + console.print(f"SUCCESS: {counts.get(AutomationStatus.SUCCESS, 0)}") + console.print(f"NEEDS_MANUAL: {counts.get(AutomationStatus.NEEDS_MANUAL, 0)}") + console.print(f"FAILED: {counts.get(AutomationStatus.FAILED, 0)}") + finally: + eng.stop() + except Exception as e: + console.print(f"[red]✗[/] Reliability report failed: {e}") + if cli.debug: + import traceback + console.print(traceback.format_exc()) + sys.exit(1) + +@cli.command() +@click.option( + "--password", + help="Master password (prompt if not provided)", + default=None +) +@click.pass_obj +def unlock(cli: CerberusCLI, password: Optional[str]) -> None: + """Unlock the password vault.""" + try: + cli.unlock_vault(password) + console.print("[green]✓[/] Vault unlocked!") + except Exception as e: + console.print(f"[red]✗[/] Failed to unlock vault: {e}") + sys.exit(1) + +@cli.command() +@click.option( + "--search", + "-s", + help="Filter entries by search term" +) +@click.pass_obj +def list(cli: CerberusCLI, search: Optional[str]) -> None: + """List all password entries.""" + try: + entries = cli.list_entries(search) + print_entry_table(entries) + except Exception as e: + console.print(f"[red]✗[/] Failed to list entries: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.pass_obj +def show(cli: CerberusCLI, identifier: str) -> None: + """Show details for a specific password entry.""" + try: + entry = cli.get_entry(identifier) + + console.print(f"[bold]Entry:[/bold] {entry.id}") + console.print(f"[bold]Website:[/bold] {entry.website}") + console.print(f"[bold]Username:[/bold] {entry.username}") + console.print(f"[bold]Password:[/bold] {'*' * 12} (use 'cerberus copy-password {entry.id}' to copy)") + + if entry.url: + console.print(f"[bold]URL:[/bold] {entry.url}") + if entry.notes: + console.print("[bold]Notes:[/bold]") + console.print(entry.notes) + if entry.tags: + console.print(f"[bold]Tags:[/bold] {', '.join(entry.tags)}") + + console.print(f"[dim]Created: {entry.created_at}") + console.print(f"[dim]Updated: {entry.updated_at}") + if entry.last_used: + console.print(f"[dim]Last Used: {entry.last_used}") + + except Exception as e: + console.print(f"[red]✗[/] Failed to get entry: {e}") + sys.exit(1) + +@cli.command() +@click.option( + "--website", + "-w", + required=True, + help="Website or service name" +) +@click.option( + "--username", + "-u", + required=True, + help="Username or email" +) +@click.option( + "--password", + "-p", + help="Password (prompt if not provided)" +) +@click.option( + "--url", + help="Website URL" +) +@click.option( + "--notes", + "-n", + help="Additional notes" +) +@click.option( + "--tag", + "-t", + "tags", + multiple=True, + help="Tags for organization (can be used multiple times)" +) +@click.option( + "--generate/--no-generate", + "-g", + default=False, + help="Generate a strong password" +) +@click.option( + "--length", + "-l", + type=int, + default=16, + help="Length of generated password" +) +@click.option( + "--no-special-chars", + is_flag=True, + default=False, + help="Exclude special characters from generated password" +) +@click.pass_obj +def add( + cli: CerberusCLI, + website: str, + username: str, + password: Optional[str], + url: str, + notes: str, + tags: list, + generate: bool, + length: int, + no_special_chars: bool +) -> None: + """Add a new password entry.""" + try: + entry = cli.add_entry( + website=website, + username=username, + password=password, + url=url, + notes=notes, + tags=list(tags) if tags else None, + generate=generate, + length=length, + special_chars=not no_special_chars + ) + + console.print(f"[green]✓[/] Added entry for [bold]{entry.website}[/bold]") + if generate: + console.print(f"Generated password: [yellow]{entry.password}[/]") + + except Exception as e: + console.print(f"[red]✗[/] Failed to add entry: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.option( + "--website", + "-w", + help="Update website name" +) +@click.option( + "--username", + "-u", + help="Update username" +) +@click.option( + "--password", + "-p", + help="Update password (prompt if not provided)" +) +@click.option( + "--url", + help="Update website URL" +) +@click.option( + "--notes", + "-n", + help="Update notes" +) +@click.option( + "--tag", + "-t", + "tags", + multiple=True, + help="Update tags (use --tag=clear to remove all tags)" +) +@click.pass_obj +def edit( + cli: CerberusCLI, + identifier: str, + website: str, + username: str, + password: str, + url: str, + notes: str, + tags: list +) -> None: + """Edit an existing password entry.""" + try: + # Get the current entry + entry = cli.get_entry(identifier) + + # Prepare updates + updates = {} + + if website is not None: + updates["website"] = website + if username is not None: + updates["username"] = username + if password is not None: + if password == "": + password = click.prompt("New password", hide_input=True, confirmation_prompt=True) + updates["password"] = password + if url is not None: + updates["url"] = url + if notes is not None: + updates["notes"] = notes + if tags: + if tags == ("clear",): + updates["tags"] = [] + else: + updates["tags"] = list(tags) + + if not updates: + console.print("[yellow]No changes specified.[/]") + return + + # Apply updates + updated_entry = cli.update_entry(entry.id, **updates) + console.print(f"[green]✓[/] Updated entry for [bold]{updated_entry.website}[/]") + + except Exception as e: + console.print(f"[red]✗[/] Failed to update entry: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.option( + "--length", + "-l", + type=int, + default=32, + help="Length of the new password" +) +@click.option( + "--no-special-chars", + is_flag=True, + default=False, + help="Exclude special characters from the new password" +) +@click.pass_obj +def rotate( + cli: CerberusCLI, + identifier: str, + length: int, + no_special_chars: bool +) -> None: + """Generate a new password for an entry.""" + try: + entry = cli.rotate_password( + identifier, + length=length, + special_chars=not no_special_chars + ) + + console.print(f"[green]✓[/] Rotated password for [bold]{entry.website}[/]") + console.print(f"New password: [yellow]{entry.password}[/]") + + except Exception as e: + console.print(f"[red]✗[/] Failed to rotate password: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.pass_obj +def delete(cli: CerberusCLI, identifier: str) -> None: + """Delete a password entry.""" + try: + cli.delete_entry(identifier) + except Exception as e: + console.print(f"[red]✗[/] Failed to delete entry: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.pass_obj +def copy_username(cli: CerberusCLI, identifier: str) -> None: + """Copy username to clipboard.""" + try: + entry = cli.get_entry(identifier) + + # Use platform-specific clipboard handling + import pyperclip + pyperclip.copy(entry.username) + + console.print(f"[green]✓[/] Copied username for [bold]{entry.website}[/] to clipboard") + + except Exception as e: + console.print(f"[red]✗[/] Failed to copy username: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.pass_obj +def copy_password(cli: CerberusCLI, identifier: str) -> None: + """Copy password to clipboard.""" + try: + entry = cli.get_entry(identifier) + + # Use platform-specific clipboard handling + import pyperclip + pyperclip.copy(entry.password) + + console.print(f"[green]✓[/] Copied password for [bold]{entry.website}[/] to clipboard") + + # Clear clipboard after 30 seconds + import threading + import time + + def clear_clipboard(): + time.sleep(30) + if pyperclip.paste() == entry.password: + pyperclip.copy("") + console.print("[yellow]✓[/] Clipboard cleared") + + threading.Thread(target=clear_clipboard, daemon=True).start() + + except Exception as e: + console.print(f"[red]✗[/] Failed to copy password: {e}") + sys.exit(1) + +@cli.command() +@click.argument("output_file", type=click.Path()) +@click.option( + "--format", + "-f", + type=click.Choice(["json"], case_sensitive=False), + default="json", + help="Export format" +) +@click.pass_obj +def export(cli: CerberusCLI, output_file: str, format: str) -> None: + """Export password entries to a file.""" + try: + cli.export_entries(output_file, format=format) + except Exception as e: + console.print(f"[red]✗[/] Failed to export entries: {e}") + sys.exit(1) + +@cli.command() +@click.argument("input_file", type=click.Path(exists=True)) +@click.option( + "--format", + "-f", + type=click.Choice(["json"], case_sensitive=False), + default="json", + help="Import format" +) +@click.pass_obj +def import_entries(cli: CerberusCLI, input_file: str, format: str) -> None: + """Import password entries from a file.""" + try: + cli.import_entries(input_file, format=format) + except Exception as e: + console.print(f"[red]✗[/] Failed to import entries: {e}") + sys.exit(1) + +@cli.command() +@click.pass_obj +def tui(cli: CerberusCLI) -> None: + """Launch the Terminal User Interface.""" + from ..tui import main as tui_main + tui_main() + +@cli.command() +@click.pass_obj +def gui(cli: CerberusCLI) -> None: + """Launch the Graphical User Interface.""" + from ..gui import run_app + run_app() + +@cli.command() +@click.argument("identifier", required=False) +@click.option("--engine", type=click.Choice(["playwright", "selenium"]), default="playwright", show_default=True) +@click.option("--all", "all_", is_flag=True, default=False, help="Rotate all entries") +@click.option("--compromised", is_flag=True, default=False, help="Only compromised entries") +@click.option("--tag", default=None, help="Filter by tag") +@click.option("--domain", default=None, help="Filter by domain") +@click.option("--dry-run", is_flag=True, default=False, help="Do not perform changes, simulate only") +@click.option("--user-data-dir", type=str, default=None, help="Browser user data dir for persistent sessions") +@click.option("--no-headless", is_flag=True, default=False, help="Run browser with a visible window") +@click.pass_obj +def web_rotate( + cli: CerberusCLI, + identifier: Optional[str], + engine: str, + all_: bool, + compromised: bool, + tag: Optional[str], + domain: Optional[str], + dry_run: bool, + user_data_dir: Optional[str], + no_headless: bool, +) -> None: + """Rotate password(s) on websites via web automation with dynamic discovery. + + If IDENTIFIER is provided, attempts to rotate only that entry (by id or website). + Otherwise uses filters (--all/--tag/--domain/--compromised). + """ + try: + cli.ensure_initialized() + # Create automation engine + if engine == "playwright": + eng = PlaywrightEngine() + else: + if not SELENIUM_AVAILABLE: + raise click.ClickException("Selenium not installed. Install extra: pip install .[automation-selenium]") + eng = SeleniumEngine() + + eng.start(headless=not no_headless, user_data_dir=user_data_dir) + try: + flows = [ + GithubFlow(), GoogleFlow(), MicrosoftFlow(), TwitterFlow(), + FacebookFlow(), LinkedInFlow(), AppleFlow() + ] + runner = RotationRunner(eng, flows, cli.pm) # type: ignore[arg-type] + if identifier: + # Build selector to target specific entry + selector = RotationSelector(all=False) + # Temporarily filter entries by overriding internals using domain/website matching + # We'll rely on runner._filter_entries via domain filter + # Best-effort: put identifier into domain filter + domain = identifier + selector = RotationSelector( + all=all_, + compromised_only=compromised, + tag=tag, + domain=domain, + ) + results = runner.rotate(selector, lambda e: generate_for_entry(cli.pm, e), dry_run=dry_run) # type: ignore[arg-type] + for r in results: + console.print(f"[bold]{r.status.value}[/]: {r.message}") + finally: + eng.stop() + except Exception as e: + console.print(f"[red]✗[/] Web rotate failed: {e}") + if cli.debug: + import traceback + console.print(traceback.format_exc()) + sys.exit(1) + +def main() -> None: + """Entry point for the Cerberus CLI.""" + try: + cli() + except Exception as e: + console.print(f"[red]Error:[/red] {e}") + if cli.obj and cli.obj.debug: + import traceback + console.print(traceback.format_exc()) + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/src/cerberus/core/Makefile b/src/cerberus/core/Makefile new file mode 100644 index 0000000..14569a9 --- /dev/null +++ b/src/cerberus/core/Makefile @@ -0,0 +1,31 @@ +# Compiler and flags +CC = gcc +CFLAGS = -fPIC -O2 -Wall -Wextra +LDFLAGS = -shared -fPIC -lssl -lcrypto -luuid +TARGET = libcerberus.so +SOURCES = cerberus.c +OBJECTS = $(SOURCES:.c=.o) + +.PHONY: all clean + +all: $(TARGET) + +$(TARGET): $(OBJECTS) + $(CC) $(LDFLAGS) -o $@ $^ + +%.o: %.c + $(CC) $(CFLAGS) -c -o $@ $< + +clean: + rm -f $(TARGET) $(OBJECTS) + +install: $(TARGET) + cp $(TARGET) /usr/local/lib/ + ldconfig + +uninstall: + rm -f /usr/local/lib/$(TARGET) + ldconfig + +test: $(TARGET) + python3 -m pytest tests/ diff --git a/src/cerberus/core/__init__.py b/src/cerberus/core/__init__.py new file mode 100644 index 0000000..5541b05 --- /dev/null +++ b/src/cerberus/core/__init__.py @@ -0,0 +1,127 @@ +"""Cerberus Core - Core functionality for the Cerberus password manager. + +This module provides the core functionality for the Cerberus password manager, +including the C core bindings and high-level password management interfaces. +""" + +import os +import cffi +from pathlib import Path +from typing import Optional, Any + +# Initialize CFFI (exported for callers that need to manage buffers) +ffi = cffi.FFI() + +def _load_header(): + cdef_src = ''' + typedef unsigned int uint32_t; + typedef unsigned long size_t; + typedef long time_t; + typedef int bool; + + typedef struct cerb_vault_t cerb_vault_t; + typedef struct cerb_entry_t cerb_entry_t; + + typedef struct { + char id[37]; + char website[256]; + char username[256]; + char password[1024]; + char notes[4096]; + char url[1024]; + time_t created_at; + time_t updated_at; + } cerb_entry_basic_t; + + int cerb_crypto_init(void); + void cerb_crypto_cleanup(void); + + int cerb_vault_create(const char *master_password, cerb_vault_t **vault); + int cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault); + int cerb_vault_save(cerb_vault_t *vault, const char *vault_path); + void cerb_vault_close(cerb_vault_t *vault); + + int cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry); + int cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry); + int cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id); + int cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry); + int cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count); + int cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count); + + int cerb_generate_password(uint32_t length, bool use_upper, bool use_lower, bool use_digits, bool use_special, char *buffer, size_t buffer_size); + void cerb_generate_uuid(char *uuid); + time_t cerb_current_timestamp(void); + ''' + ffi.cdef(cdef_src) + +# Load the header +_load_header() + +# Try to load the compiled library +_lib = None + +def init() -> bool: + """Initialize the Cerberus C core. + + Returns: + bool: True if initialization was successful, False otherwise + """ + global _lib + + if _lib is not None: + return True + + # Try multiple candidate names + candidates = [ + Path(__file__).parent / 'libcerberus.so', + Path(__file__).parent / 'cerberus.so' + ] + for lib_path in candidates: + try: + _lib = ffi.dlopen(str(lib_path)) + return True + except OSError: + continue + _lib = None + return False + +# Initialize on import +if not init(): + class _DummyLib: + def __getattribute__(self, name: str) -> Any: + raise RuntimeError( + "Cerberus C core not initialized. " + "Please ensure the core is compiled and in your library path." + ) + _lib = _DummyLib() + CORE_AVAILABLE = False +else: + CORE_AVAILABLE = True + +# Re-export the C functions with proper typing +for name in dir(_lib): + if name.startswith('cerb_'): + globals()[name] = getattr(_lib, name) + +# Error code constants (must match cerberus.h enum) +CERB_OK = 0 +CERB_ERROR = -1 + +# Clean up the namespace (keep ffi exported) +try: + del _DummyLib +except NameError: + pass +del os, Path, _load_header, init + +# Export high-level interfaces +from .password_manager import PasswordManager +from .models import PasswordEntry + +__all__ = [ + 'PasswordManager', + 'PasswordEntry', + 'VaultError', + 'CoreNotAvailableError', + 'ffi' +] diff --git a/src/cerberus/core/__pycache__/__init__.cpython-313.pyc b/src/cerberus/core/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..1807bd2 Binary files /dev/null and b/src/cerberus/core/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/cerberus/core/__pycache__/models.cpython-313.pyc b/src/cerberus/core/__pycache__/models.cpython-313.pyc new file mode 100644 index 0000000..ffc2dd6 Binary files /dev/null and b/src/cerberus/core/__pycache__/models.cpython-313.pyc differ diff --git a/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc b/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc new file mode 100644 index 0000000..813cf2b Binary files /dev/null and b/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc differ diff --git a/src/cerberus/core/cerberus.c b/src/cerberus/core/cerberus.c new file mode 100644 index 0000000..8511e3a --- /dev/null +++ b/src/cerberus/core/cerberus.c @@ -0,0 +1,530 @@ +#include "cerberus.h" +#include +#include +#include +#include +#include +#include +// uuid/uuid.h not required; implement UUID v4 using RAND_bytes + +// Vault structure +typedef struct { + uint8_t salt[SALT_LEN]; + uint8_t key[KEY_LEN]; + bool key_initialized; + cerb_entry_t *entries; + size_t num_entries; + size_t capacity; +} cerb_vault_internal_t; + +// Initialize crypto +cerb_error_t cerb_crypto_init(void) { + OpenSSL_add_all_algorithms(); + ERR_load_crypto_strings(); + return RAND_poll() ? CERB_OK : CERB_CRYPTO_ERROR; +} + +// Cleanup crypto +void cerb_crypto_cleanup(void) { + EVP_cleanup(); + ERR_free_strings(); +} + +// Create new vault +cerb_error_t cerb_vault_create(const char *master_password, cerb_vault_t **vault) { + if (!master_password || !vault) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = calloc(1, sizeof(cerb_vault_internal_t)); + if (!v) return CERB_MEMORY_ERROR; + + if (RAND_bytes(v->salt, SALT_LEN) != 1) { + free(v); + return CERB_CRYPTO_ERROR; + } + + // Derive key from password and salt + if (!PKCS5_PBKDF2_HMAC(master_password, (int)strlen(master_password), + v->salt, SALT_LEN, PBKDF2_ITERATIONS, + EVP_sha256(), KEY_LEN, v->key)) { + free(v); + return CERB_CRYPTO_ERROR; + } + v->key_initialized = true; + + v->capacity = 32; + v->entries = calloc(v->capacity, sizeof(cerb_entry_t)); + if (!v->entries) { + free(v); + return CERB_MEMORY_ERROR; + } + + *vault = (cerb_vault_t *)v; + return CERB_OK; +} + +// Save vault to file (AES-256-GCM encrypted blob) +cerb_error_t cerb_vault_save(cerb_vault_t *vault, const char *vault_path) { + if (!vault || !vault_path) return CERB_INVALID_ARG; + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + FILE *fp = fopen(vault_path, "wb"); + if (!fp) return CERB_STORAGE_ERROR; + + // Serialize entries: [num_entries][entries...] + size_t plain_len = sizeof(uint32_t) + v->num_entries * sizeof(cerb_entry_t); + unsigned char *plaintext = malloc(plain_len); + if (!plaintext) { fclose(fp); return CERB_MEMORY_ERROR; } + + uint32_t n = (uint32_t)v->num_entries; + memcpy(plaintext, &n, sizeof(uint32_t)); + if (v->num_entries > 0) { + memcpy(plaintext + sizeof(uint32_t), v->entries, v->num_entries * sizeof(cerb_entry_t)); + } + + // Prepare AES-GCM + unsigned char iv[IV_LEN]; + if (RAND_bytes(iv, IV_LEN) != 1) { free(plaintext); fclose(fp); return CERB_CRYPTO_ERROR; } + unsigned char *ciphertext = malloc(plain_len); + if (!ciphertext) { free(plaintext); fclose(fp); return CERB_MEMORY_ERROR; } + int len = 0, ciphertext_len = 0; + unsigned char tag[16]; + + EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); + if (!ctx) { free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; } + + if (EVP_EncryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, NULL, NULL) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + if (EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, IV_LEN, NULL) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + if (EVP_EncryptInit_ex(ctx, NULL, NULL, v->key, iv) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + + if (EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, (int)plain_len) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + ciphertext_len = len; + if (EVP_EncryptFinal_ex(ctx, ciphertext + len, &len) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + ciphertext_len += len; + if (EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_GET_TAG, 16, tag) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + EVP_CIPHER_CTX_free(ctx); + + // Write file: MAGIC, VERSION, SALT, IV, TAG, CIPHERTEXT_LEN, CIPHERTEXT + const char magic[8] = { 'C','E','R','B','E','R','U','S' }; + uint32_t version = 1; + uint32_t clen = (uint32_t)ciphertext_len; + + if (fwrite(magic, 1, sizeof(magic), fp) != sizeof(magic) || + fwrite(&version, 1, sizeof(version), fp) != sizeof(version) || + fwrite(v->salt, 1, SALT_LEN, fp) != SALT_LEN || + fwrite(iv, 1, IV_LEN, fp) != IV_LEN || + fwrite(tag, 1, sizeof(tag), fp) != sizeof(tag) || + fwrite(&clen, 1, sizeof(clen), fp) != sizeof(clen) || + fwrite(ciphertext, 1, ciphertext_len, fp) != (size_t)ciphertext_len) { + free(plaintext); free(ciphertext); fclose(fp); return CERB_STORAGE_ERROR; + } + + free(plaintext); + free(ciphertext); + fclose(fp); + return CERB_OK; +} + +// Open vault from file +cerb_error_t cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault) { + if (!master_password || !vault_path || !vault) return CERB_INVALID_ARG; + FILE *fp = fopen(vault_path, "rb"); + if (!fp) return CERB_STORAGE_ERROR; + + const char expected_magic[8] = { 'C','E','R','B','E','R','U','S' }; + char magic[8]; + uint32_t version = 0; + unsigned char salt[SALT_LEN]; + unsigned char iv[IV_LEN]; + unsigned char tag[16]; + uint32_t clen = 0; + + if (fread(magic, 1, sizeof(magic), fp) != sizeof(magic) || + memcmp(magic, expected_magic, sizeof(magic)) != 0 || + fread(&version, 1, sizeof(version), fp) != sizeof(version) || + fread(salt, 1, SALT_LEN, fp) != SALT_LEN || + fread(iv, 1, IV_LEN, fp) != IV_LEN || + fread(tag, 1, sizeof(tag), fp) != sizeof(tag) || + fread(&clen, 1, sizeof(clen), fp) != sizeof(clen)) { + fclose(fp); + return CERB_STORAGE_ERROR; + } + + unsigned char *ciphertext = malloc(clen); + if (!ciphertext) { fclose(fp); return CERB_MEMORY_ERROR; } + if (fread(ciphertext, 1, clen, fp) != clen) { free(ciphertext); fclose(fp); return CERB_STORAGE_ERROR; } + fclose(fp); + + // Derive key + unsigned char key[KEY_LEN]; + if (!PKCS5_PBKDF2_HMAC(master_password, (int)strlen(master_password), + salt, SALT_LEN, PBKDF2_ITERATIONS, + EVP_sha256(), KEY_LEN, key)) { + free(ciphertext); + return CERB_CRYPTO_ERROR; + } + + // Decrypt + unsigned char *plaintext = malloc(clen); // ciphertext_len >= plaintext_len + if (!plaintext) { free(ciphertext); return CERB_MEMORY_ERROR; } + int len = 0, plain_len = 0; + cerb_error_t status = CERB_OK; + EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); + if (!ctx) { free(ciphertext); free(plaintext); return CERB_CRYPTO_ERROR; } + if (EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, NULL, NULL) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, IV_LEN, NULL) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptInit_ex(ctx, NULL, NULL, key, iv) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, (int)clen) != 1) status = CERB_CRYPTO_ERROR; + plain_len = len; + if (status == CERB_OK && EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, 16, tag) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptFinal_ex(ctx, plaintext + len, &len) != 1) status = CERB_CRYPTO_ERROR; + plain_len += len; + EVP_CIPHER_CTX_free(ctx); + if (status != CERB_OK) { free(ciphertext); free(plaintext); return CERB_CRYPTO_ERROR; } + + // Deserialize + if ((size_t)plain_len < sizeof(uint32_t)) { free(ciphertext); free(plaintext); return CERB_STORAGE_ERROR; } + uint32_t n = 0; memcpy(&n, plaintext, sizeof(uint32_t)); + size_t expected = sizeof(uint32_t) + (size_t)n * sizeof(cerb_entry_t); + if ((size_t)plain_len != expected) { free(ciphertext); free(plaintext); return CERB_STORAGE_ERROR; } + + cerb_vault_internal_t *v = calloc(1, sizeof(cerb_vault_internal_t)); + if (!v) { free(ciphertext); free(plaintext); return CERB_MEMORY_ERROR; } + memcpy(v->salt, salt, SALT_LEN); + memcpy(v->key, key, KEY_LEN); + v->key_initialized = true; + v->capacity = n > 0 ? n : 32; + v->entries = calloc(v->capacity, sizeof(cerb_entry_t)); + if (!v->entries) { free(v); free(ciphertext); free(plaintext); return CERB_MEMORY_ERROR; } + v->num_entries = n; + if (n > 0) { + memcpy(v->entries, plaintext + sizeof(uint32_t), (size_t)n * sizeof(cerb_entry_t)); + } + + *vault = (cerb_vault_t *)v; + free(ciphertext); + free(plaintext); + return CERB_OK; +} + +// Add entry to vault +cerb_error_t cerb_vault_add_entry(cerb_vault_t *vault, const cerb_entry_t *entry) { + if (!vault || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + // Check for duplicates + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry->id) == 0) { + return CERB_DUPLICATE; + } + } + + // Resize if needed + if (v->num_entries >= v->capacity) { + size_t new_capacity = v->capacity * 2; + cerb_entry_t *new_entries = realloc(v->entries, new_capacity * sizeof(cerb_entry_t)); + if (!new_entries) return CERB_MEMORY_ERROR; + v->entries = new_entries; + v->capacity = new_capacity; + } + + // Add entry + v->entries[v->num_entries++] = *entry; + return CERB_OK; +} + +// Update existing entry +cerb_error_t cerb_vault_update_entry(cerb_vault_t *vault, const cerb_entry_t *entry) { + if (!vault || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry->id) == 0) { + v->entries[i] = *entry; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Delete entry by ID +cerb_error_t cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id) { + if (!vault || !entry_id) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry_id) == 0) { + // Move last entry into this slot to keep array compact + if (i != v->num_entries - 1) { + v->entries[i] = v->entries[v->num_entries - 1]; + } + memset(&v->entries[v->num_entries - 1], 0, sizeof(cerb_entry_t)); + v->num_entries--; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Get entry by ID +cerb_error_t cerb_vault_get_entry(cerb_vault_t *vault, const char *entry_id, cerb_entry_t *entry) { + if (!vault || !entry_id || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry_id) == 0) { + *entry = v->entries[i]; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Get all entries (returns a newly allocated array the caller must free) +cerb_error_t cerb_vault_get_entries(cerb_vault_t *vault, cerb_entry_t **entries, size_t *count) { + if (!vault || !entries || !count) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + if (v->num_entries == 0) { + *entries = NULL; + *count = 0; + return CERB_OK; + } + + cerb_entry_t *out = calloc(v->num_entries, sizeof(cerb_entry_t)); + if (!out) return CERB_MEMORY_ERROR; + + memcpy(out, v->entries, v->num_entries * sizeof(cerb_entry_t)); + *entries = out; + *count = v->num_entries; + return CERB_OK; +} + +// Basic substring search across website, username, and url +cerb_error_t cerb_vault_search(cerb_vault_t *vault, const char *query, cerb_entry_t **results, size_t *count) { + if (!vault || !results || !count) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + if (!query || *query == '\0') { + return cerb_vault_get_entries(vault, results, count); + } + + size_t matched = 0; + // First pass: count + for (size_t i = 0; i < v->num_entries; i++) { + if ((strstr(v->entries[i].website, query) != NULL) || + (strstr(v->entries[i].username, query) != NULL) || + (strstr(v->entries[i].url, query) != NULL)) { + matched++; + } + } + + if (matched == 0) { + *results = NULL; + *count = 0; + return CERB_OK; + } + + cerb_entry_t *out = calloc(matched, sizeof(cerb_entry_t)); + if (!out) return CERB_MEMORY_ERROR; + + size_t idx = 0; + for (size_t i = 0; i < v->num_entries; i++) { + if ((strstr(v->entries[i].website, query) != NULL) || + (strstr(v->entries[i].username, query) != NULL) || + (strstr(v->entries[i].url, query) != NULL)) { + out[idx++] = v->entries[i]; + } + } + + *results = out; + *count = matched; + return CERB_OK; +} + +// Basic variants for FFI (flattened struct) +static void entry_to_basic(const cerb_entry_t *in, cerb_entry_basic_t *out) { + memset(out, 0, sizeof(*out)); + strncpy(out->id, in->id, sizeof(out->id)-1); + strncpy(out->website, in->website, sizeof(out->website)-1); + strncpy(out->username, in->username, sizeof(out->username)-1); + strncpy(out->password, in->password, sizeof(out->password)-1); + strncpy(out->notes, in->notes, sizeof(out->notes)-1); + strncpy(out->url, in->url, sizeof(out->url)-1); + out->created_at = in->created_at; + out->updated_at = in->updated_at; +} + +static void basic_to_entry(const cerb_entry_basic_t *in, cerb_entry_t *out) { + memset(out, 0, sizeof(*out)); + strncpy(out->id, in->id, sizeof(out->id)-1); + strncpy(out->website, in->website, sizeof(out->website)-1); + strncpy(out->username, in->username, sizeof(out->username)-1); + strncpy(out->password, in->password, sizeof(out->password)-1); + strncpy(out->notes, in->notes, sizeof(out->notes)-1); + strncpy(out->url, in->url, sizeof(out->url)-1); + out->created_at = in->created_at; + out->updated_at = in->updated_at; +} + +cerb_error_t cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry) { + cerb_entry_t full; + basic_to_entry(entry, &full); + return cerb_vault_add_entry(vault, &full); +} + +cerb_error_t cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry) { + cerb_entry_t full; + basic_to_entry(entry, &full); + return cerb_vault_update_entry(vault, &full); +} + +cerb_error_t cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry) { + cerb_entry_t full; + cerb_error_t r = cerb_vault_get_entry(vault, entry_id, &full); + if (r != CERB_OK) return r; + entry_to_basic(&full, entry); + return CERB_OK; +} + +cerb_error_t cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count) { + cerb_entry_t *full = NULL; + cerb_error_t r = cerb_vault_get_entries(vault, &full, count); + if (r != CERB_OK) return r; + if (*count == 0) { *entries = NULL; return CERB_OK; } + cerb_entry_basic_t *out = calloc(*count, sizeof(cerb_entry_basic_t)); + if (!out) return CERB_MEMORY_ERROR; + for (size_t i = 0; i < *count; i++) entry_to_basic(&full[i], &out[i]); + *entries = out; + free(full); + return CERB_OK; +} + +cerb_error_t cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count) { + cerb_entry_t *full = NULL; + cerb_error_t r = cerb_vault_search(vault, query, &full, count); + if (r != CERB_OK) return r; + if (*count == 0) { *results = NULL; return CERB_OK; } + cerb_entry_basic_t *out = calloc(*count, sizeof(cerb_entry_basic_t)); + if (!out) return CERB_MEMORY_ERROR; + for (size_t i = 0; i < *count; i++) entry_to_basic(&full[i], &out[i]); + *results = out; + free(full); + return CERB_OK; +} + +// Generate password +cerb_error_t cerb_generate_password( + uint32_t length, + bool use_upper, + bool use_lower, + bool use_digits, + bool use_special, + char *buffer, + size_t buffer_size +) { + if (!buffer || length < 8 || length > MAX_PASSWORD_LEN || buffer_size < length + 1) { + return CERB_INVALID_ARG; + } + + const char *lower = "abcdefghijklmnopqrstuvwxyz"; + const char *upper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + const char *digits = "0123456789"; + const char *special = "!@#$%^&*()-_=+[]{}|;:,.<>?"; + + char charset[256] = {0}; + size_t pos = 0; + + if (use_lower) { strcpy(charset + pos, lower); pos += strlen(lower); } + if (use_upper) { strcpy(charset + pos, upper); pos += strlen(upper); } + if (use_digits) { strcpy(charset + pos, digits); pos += strlen(digits); } + if (use_special) { strcpy(charset + pos, special); pos += strlen(special); } + + if (pos == 0) return CERB_INVALID_ARG; + + // Generate random password + for (size_t i = 0; i < length; i++) { + unsigned char byte; + do { + if (RAND_bytes(&byte, 1) != 1) { + return CERB_CRYPTO_ERROR; + } + } while (byte >= (256 / pos) * pos); + + buffer[i] = charset[byte % pos]; + } + + buffer[length] = '\0'; + return CERB_OK; +} + +// Generate UUID v4 (xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx) +void cerb_generate_uuid(char *uuid) { + unsigned char bytes[16]; + if (RAND_bytes(bytes, sizeof(bytes)) != 1) { + // Fallback to zeroed UUID on failure + memset(uuid, '0', 36); + uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-'; + uuid[36] = '\0'; + return; + } + // Set version (4) + bytes[6] = (bytes[6] & 0x0F) | 0x40; + // Set variant (10xx) + bytes[8] = (bytes[8] & 0x3F) | 0x80; + + static const char *hex = "0123456789abcdef"; + int p = 0; + for (int i = 0; i < 16; i++) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + uuid[p++] = '-'; + } + uuid[p++] = hex[(bytes[i] >> 4) & 0x0F]; + uuid[p++] = hex[bytes[i] & 0x0F]; + } + uuid[p] = '\0'; +} + +// Get current timestamp +time_t cerb_current_timestamp(void) { + return time(NULL); +} + +// Cleanup vault +void cerb_vault_close(cerb_vault_t *vault) { + if (!vault) return; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + // Securely wipe sensitive data + memset(v->key, 0, KEY_LEN); + memset(v->salt, 0, SALT_LEN); + + // Wipe entries + for (size_t i = 0; i < v->num_entries; i++) { + memset(&v->entries[i], 0, sizeof(cerb_entry_t)); + } + + free(v->entries); + free(v); +} diff --git a/src/cerberus/core/cerberus.h b/src/cerberus/core/cerberus.h new file mode 100644 index 0000000..06c736a --- /dev/null +++ b/src/cerberus/core/cerberus.h @@ -0,0 +1,144 @@ +#ifndef CERBERUS_CORE_H +#define CERBERUS_CORE_H + +#include +#include +#include + +// Maximum lengths for fields +#define MAX_WEBSITE_LEN 256 +#define MAX_USERNAME_LEN 256 +#define MAX_PASSWORD_LEN 1024 +#define MAX_NOTES_LEN 4096 +#define MAX_TAGS 32 +#define MAX_TAG_LEN 64 +#define MAX_CUSTOM_FIELDS 32 +#define MAX_CUSTOM_KEY_LEN 64 +#define MAX_CUSTOM_VALUE_LEN 1024 +#define MAX_ENTRIES 65536 +#define SALT_LEN 32 +#define KEY_LEN 32 // 256 bits for AES-256 +#define IV_LEN 16 // 128 bits for AES block size +#define PBKDF2_ITERATIONS 100000 + +// Error codes +typedef enum { + CERB_OK = 0, + CERB_ERROR = -1, + CERB_INVALID_ARG = -2, + CERB_NOT_FOUND = -3, + CERB_DUPLICATE = -4, + CERB_STORAGE_ERROR = -5, + CERB_CRYPTO_ERROR = -6, + CERB_MEMORY_ERROR = -7, + CERB_INVALID_STATE = -8, + CERB_NOT_IMPLEMENTED = -9 +} cerb_error_t; + +// Custom field structure +typedef struct { + char key[MAX_CUSTOM_KEY_LEN]; + char value[MAX_CUSTOM_VALUE_LEN]; +} cerb_custom_field_t; + +// Password entry structure +typedef struct { + char id[37]; // UUID string (36 chars + null terminator) + char website[MAX_WEBSITE_LEN]; + char username[MAX_USERNAME_LEN]; + char password[MAX_PASSWORD_LEN]; + char notes[MAX_NOTES_LEN]; + char url[1024]; + char tags[MAX_TAGS][MAX_TAG_LEN]; + size_t num_tags; + time_t created_at; + time_t updated_at; + time_t last_used; + bool favorite; + cerb_custom_field_t custom_fields[MAX_CUSTOM_FIELDS]; + size_t num_custom_fields; +} cerb_entry_t; + +// Vault structure +typedef struct cerb_vault_t cerb_vault_t; + +// Basic entry struct for FFI bindings (avoids nested arrays) +typedef struct { + char id[37]; + char website[MAX_WEBSITE_LEN]; + char username[MAX_USERNAME_LEN]; + char password[MAX_PASSWORD_LEN]; + char notes[MAX_NOTES_LEN]; + char url[1024]; + time_t created_at; + time_t updated_at; +} cerb_entry_basic_t; + +// Core API + +// Initialize the crypto subsystem +cerb_error_t cerb_crypto_init(void); + +// Cleanup the crypto subsystem +void cerb_crypto_cleanup(void); + +// Initialize a new vault +cerb_error_t cerb_vault_create(const char *master_password, cerb_vault_t **vault); + +// Open an existing vault +cerb_error_t cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault); + +// Save vault to file +cerb_error_t cerb_vault_save(cerb_vault_t *vault, const char *vault_path); + +// Close and free a vault +void cerb_vault_close(cerb_vault_t *vault); + +// Add a new entry to the vault +cerb_error_t cerb_vault_add_entry(cerb_vault_t *vault, const cerb_entry_t *entry); +cerb_error_t cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry); + +// Update an existing entry +cerb_error_t cerb_vault_update_entry(cerb_vault_t *vault, const cerb_entry_t *entry); +cerb_error_t cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry); + +// Delete an entry by ID +cerb_error_t cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id); + +// Get an entry by ID +cerb_error_t cerb_vault_get_entry(cerb_vault_t *vault, const char *entry_id, cerb_entry_t *entry); +cerb_error_t cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry); + +// Get all entries +cerb_error_t cerb_vault_get_entries(cerb_vault_t *vault, cerb_entry_t **entries, size_t *count); +cerb_error_t cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count); + +// Search entries by query string +cerb_error_t cerb_vault_search(cerb_vault_t *vault, const char *query, cerb_entry_t **results, size_t *count); +cerb_error_t cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count); + +// Generate a secure random password +cerb_error_t cerb_generate_password( + uint32_t length, + bool use_upper, + bool use_lower, + bool use_digits, + bool use_special, + char *buffer, + size_t buffer_size +); + +// Import from other password managers +cerb_error_t cerb_import_bitwarden_json(cerb_vault_t *vault, const char *json_path); +cerb_error_t cerb_import_lastpass_csv(cerb_vault_t *vault, const char *csv_path); +cerb_error_t cerb_import_chrome_csv(cerb_vault_t *vault, const char *csv_path); + +// Export to various formats +cerb_error_t cerb_export_json(cerb_vault_t *vault, const char *json_path); +cerb_error_t cerb_export_csv(cerb_vault_t *vault, const char *csv_path); + +// Utility functions +void cerb_generate_uuid(char *uuid); +time_t cerb_current_timestamp(void); + +#endif // CERBERUS_CORE_H diff --git a/src/cerberus/core/cerberus.so b/src/cerberus/core/cerberus.so new file mode 100755 index 0000000..2cdd65a Binary files /dev/null and b/src/cerberus/core/cerberus.so differ diff --git a/src/cerberus/core/models.py b/src/cerberus/core/models.py new file mode 100644 index 0000000..ecdcb3c --- /dev/null +++ b/src/cerberus/core/models.py @@ -0,0 +1,58 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Optional, List, Dict, Any +import json + +@dataclass +class PasswordEntry: + """Represents a single password entry in the password manager.""" + id: str + website: str + username: str + password: str + url: str = "" + notes: str = "" + tags: List[str] = field(default_factory=list) + created_at: datetime = field(default_factory=datetime.utcnow) + updated_at: datetime = field(default_factory=datetime.utcnow) + last_used: Optional[datetime] = None + password_strength: Optional[float] = None + compromised: bool = False + custom_fields: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert the password entry to a dictionary for serialization.""" + return { + 'id': self.id, + 'website': self.website, + 'username': self.username, + 'password': self.password, + 'url': self.url, + 'notes': self.notes, + 'tags': self.tags, + 'created_at': self.created_at.isoformat(), + 'updated_at': self.updated_at.isoformat(), + 'last_used': self.last_used.isoformat() if self.last_used else None, + 'password_strength': self.password_strength, + 'compromised': self.compromised, + 'custom_fields': self.custom_fields + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'PasswordEntry': + """Create a PasswordEntry from a dictionary.""" + return cls( + id=data.get('id', ''), + website=data['website'], + username=data['username'], + password=data['password'], + url=data.get('url', ''), + notes=data.get('notes', ''), + tags=data.get('tags', []), + created_at=datetime.fromisoformat(data['created_at']), + updated_at=datetime.fromisoformat(data['updated_at']), + last_used=datetime.fromisoformat(data['last_used']) if data.get('last_used') else None, + password_strength=data.get('password_strength'), + compromised=data.get('compromised', False), + custom_fields=data.get('custom_fields', {}) + ) diff --git a/src/cerberus/core/password_manager.py b/src/cerberus/core/password_manager.py new file mode 100644 index 0000000..018c727 --- /dev/null +++ b/src/cerberus/core/password_manager.py @@ -0,0 +1,462 @@ +import os +import json +import logging +from pathlib import Path +from typing import Dict, List, Optional, Union, Any +from datetime import datetime + +logger = logging.getLogger(__name__) + +# Import the C core from our package +try: + from . import ( + cerb_crypto_init, cerb_crypto_cleanup, + cerb_vault_create, cerb_vault_open, cerb_vault_save, cerb_vault_close, + cerb_vault_add_entry_basic, cerb_vault_update_entry_basic, cerb_vault_delete_entry, + cerb_vault_get_entry_basic, cerb_vault_get_entries_basic, cerb_vault_search_basic, + cerb_generate_password, cerb_generate_uuid, cerb_current_timestamp, + CERB_OK, CERB_ERROR, ffi + ) + CORE_AVAILABLE = True +except (ImportError, OSError) as e: + CORE_AVAILABLE = False + logger.warning("Cerberus C core not available. Using Python fallback.") + +from .models import PasswordEntry +from ..integrations import get_integration, IntegrationError + +class VaultError(Exception): + """Base exception for vault-related errors.""" + pass + +class CoreNotAvailableError(VaultError): + """Raised when the C core is not available.""" + pass + +class PasswordManager: + """Core password manager with C-based encryption and integration support.""" + + def __init__(self, data_dir: str = None, master_password: str = None): + """Initialize the password manager. + + Args: + data_dir: Directory to store password data + master_password: Master password for encryption + """ + if not CORE_AVAILABLE: + raise CoreNotAvailableError( + "Cerberus C core not available. Please compile it first." + ) + + self.data_dir = Path(data_dir or os.path.expanduser("~/.cerberus_pm")) + self.data_dir.mkdir(parents=True, exist_ok=True) + + self.master_password = master_password + self.vault_file = self.data_dir / "vault.cerb" + self._vault = ffi.NULL + + # Initialize crypto + if cerb_crypto_init() != CERB_OK: + raise VaultError("Failed to initialize crypto") + + # Create or load vault if master password is provided + if master_password: + if self.vault_file.exists(): + self._open_vault() + else: + self._create_vault() + + def _create_vault(self): + """Create a new vault.""" + if not self.master_password: + raise VaultError("Master password is required to create a vault") + + vault_ptr = ffi.new("cerb_vault_t**") + result = cerb_vault_create( + self.master_password.encode('utf-8'), + vault_ptr + ) + + if result != CERB_OK: + raise VaultError(f"Failed to create vault: error code {result}") + + self._vault = vault_ptr[0] + logger.info("Created new vault") + + def _open_vault(self): + """Open an existing vault.""" + if not self.vault_file.exists(): + raise VaultError(f"Vault file not found: {self.vault_file}") + + if not self.master_password: + raise VaultError("Master password is required to open the vault") + + # Open existing vault from file using C core + vault_ptr = ffi.new("cerb_vault_t**") + result = cerb_vault_open( + self.master_password.encode('utf-8'), + str(self.vault_file).encode('utf-8'), + vault_ptr + ) + if result != CERB_OK: + raise VaultError(f"Failed to open vault: error code {result}") + self._vault = vault_ptr[0] + + def save_vault(self): + """Save the vault to disk.""" + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + result = cerb_vault_save(self._vault, str(self.vault_file).encode('utf-8')) + if result != CERB_OK: + raise VaultError(f"Failed to save vault: error code {result}") + + def initialize(self, master_password: str) -> bool: + """Initialize the password manager with a master password. + + Args: + master_password: The master password + + Returns: + bool: True if initialization was successful + """ + self.master_password = master_password + self._create_vault() + return True + + def unlock(self, master_password: str) -> bool: + """Unlock the password manager with the master password. + + Args: + master_password: The master password + + Returns: + bool: True if unlock was successful + """ + try: + self.master_password = master_password + self._open_vault() + return True + except VaultError as e: + logger.error(f"Failed to unlock password manager: {e}") + return False + + def _find_entries(self, query: str = None) -> List[PasswordEntry]: + """Find entries matching the query. + + Args: + query: Search query (e.g., 'website:example.com', 'tag:work') + + Returns: + List of matching PasswordEntry objects + """ + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + entries_ptr = ffi.new("cerb_entry_basic_t**") + count_ptr = ffi.new("size_t*") + if query: + result = cerb_vault_search_basic(self._vault, query.encode('utf-8'), entries_ptr, count_ptr) + else: + result = cerb_vault_get_entries_basic(self._vault, entries_ptr, count_ptr) + if result != CERB_OK: + raise VaultError(f"Search failed: error code {result}") + + results: List[PasswordEntry] = [] + count = int(count_ptr[0]) + if count == 0: + return results + + entries_array = entries_ptr[0] + for i in range(count): + c_entry = entries_array[i] + results.append(PasswordEntry( + id=ffi.string(c_entry.id).decode('utf-8'), + website=ffi.string(c_entry.website).decode('utf-8'), + username=ffi.string(c_entry.username).decode('utf-8'), + password=ffi.string(c_entry.password).decode('utf-8'), + url=ffi.string(c_entry.url).decode('utf-8'), + notes=ffi.string(c_entry.notes).decode('utf-8'), + )) + + # Caller-owned memory: free via C if a free function exists; else rely on C API contract + return results + + def add_password(self, entry: PasswordEntry) -> str: + """Add a new password entry to the vault. + + Args: + entry: The password entry to add + + Returns: + str: The ID of the new entry + + Raises: + VaultError: If the vault is not open or an error occurs + """ + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + # Create a new C entry + c_entry = ffi.new("cerb_entry_basic_t*") + + # Generate a new UUID if not provided + if entry.id: + entry_id = entry.id + else: + uuid_buf = ffi.new("char[37]") + cerb_generate_uuid(uuid_buf) + entry_id = ffi.string(uuid_buf).decode('utf-8') + + # Set fields + ffi.memmove(c_entry.id, entry_id.encode('utf-8'), len(entry_id)) + c_entry.id[len(entry_id)] = b'\0' + for field_name in ["website", "username", "password", "notes", "url"]: + val = (getattr(entry, field_name) or '').encode('utf-8') + buf = getattr(c_entry, field_name) + ffi.memmove(buf, val, len(val)) + buf[len(val)] = b'\0' + c_entry.created_at = int(entry.created_at.timestamp()) if entry.created_at else cerb_current_timestamp() + c_entry.updated_at = int(entry.updated_at.timestamp()) if entry.updated_at else c_entry.created_at + + # Add to vault + result = cerb_vault_add_entry_basic(self._vault, c_entry) + if result != CERB_OK: + raise VaultError(f"Failed to add entry: error code {result}") + + # Save the vault + self.save_vault() + + return entry_id + + def update_password(self, entry_id: str, **updates) -> bool: + """Update an existing password entry. + + Args: + entry_id: The ID of the entry to update + **updates: Fields to update + + Returns: + bool: True if the update was successful + + Raises: + VaultError: If the vault is not open or an error occurs + ValueError: If the entry is not found + """ + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + # Fetch, modify, send to C update + c_existing = ffi.new("cerb_entry_basic_t*") + res = cerb_vault_get_entry_basic(self._vault, entry_id.encode('utf-8'), c_existing) + if res != CERB_OK: + raise ValueError(f"Entry with ID {entry_id} not found") + + # Apply updates + def _set_field(dst, value: str, max_len: int): + data = (value or '').encode('utf-8') + ln = min(len(data), max_len - 1) + ffi.memmove(dst, data, ln) + dst[ln] = b'\0' + + for key, value in updates.items(): + if key == 'website': _set_field(c_existing.website, value, 256) + elif key == 'username': _set_field(c_existing.username, value, 256) + elif key == 'password': _set_field(c_existing.password, value, 1024) + elif key == 'notes': _set_field(c_existing.notes, value, 4096) + elif key == 'url': _set_field(c_existing.url, value, 1024) + + c_existing.updated_at = int(datetime.utcnow().timestamp()) + + res = cerb_vault_update_entry_basic(self._vault, c_existing) + if res != CERB_OK: + raise VaultError(f"Failed to update entry: error code {res}") + self.save_vault() + return True + + def delete_password(self, entry_id: str) -> bool: + """Delete a password entry. + + Args: + entry_id: The ID of the entry to delete + + Returns: + bool: True if the deletion was successful + + Raises: + VaultError: If the vault is not open or an error occurs + """ + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + result = cerb_vault_delete_entry(self._vault, entry_id.encode('utf-8')) + if result != CERB_OK: + raise VaultError(f"Failed to delete entry: error code {result}") + + # Save the vault + self.save_vault() + return True + + def get_password(self, entry_id: str) -> Optional[PasswordEntry]: + """Get a password entry by ID. + + Args: + entry_id: The ID of the entry to retrieve + + Returns: + PasswordEntry if found, None otherwise + """ + c_entry = ffi.new("cerb_entry_basic_t*") + res = cerb_vault_get_entry_basic(self._vault, entry_id.encode('utf-8'), c_entry) + if res != CERB_OK: + return None + return PasswordEntry( + id=ffi.string(c_entry.id).decode('utf-8'), + website=ffi.string(c_entry.website).decode('utf-8'), + username=ffi.string(c_entry.username).decode('utf-8'), + password=ffi.string(c_entry.password).decode('utf-8'), + url=ffi.string(c_entry.url).decode('utf-8'), + notes=ffi.string(c_entry.notes).decode('utf-8'), + ) + + def list_passwords(self) -> List[PasswordEntry]: + """List all password entries. + + Returns: + List of all PasswordEntry objects + """ + return self._find_entries() + + def search_passwords(self, query: str) -> List[PasswordEntry]: + """Search password entries by website, username, or tags. + + Args: + query: Search query + + Returns: + List of matching PasswordEntry objects + """ + return self._find_entries(query) + + @staticmethod + def generate_password( + length: int = 16, + use_upper: bool = True, + use_lower: bool = True, + use_digits: bool = True, + use_special: bool = True + ) -> str: + """Generate a secure random password using the C core. + + Args: + length: Length of the password (8-1024) + use_upper: Include uppercase letters + use_lower: Include lowercase letters + use_digits: Include digits + use_special: Include special characters + + Returns: + The generated password + + Raises: + VaultError: If password generation fails + """ + if not CORE_AVAILABLE: + raise CoreNotAvailableError("C core not available for password generation") + + # Validate length + if length < 8 or length > 1024: + raise ValueError("Password length must be between 8 and 1024 characters") + + # At least one character set must be selected + if not (use_upper or use_lower or use_digits or use_special): + raise ValueError("At least one character set must be selected") + + # Allocate buffer for the password (+1 for null terminator) + buffer = ffi.new(f"char[{length + 1}]") # +1 for null terminator + + # Generate the password + result = cerb_generate_password( + length, + use_upper, + use_lower, + use_digits, + use_special, + buffer, + length + 1 # Include space for null terminator + ) + + if result != CERB_OK: + raise VaultError(f"Failed to generate password: error code {result}") + + # Convert from C string to Python string + password = ffi.string(buffer).decode('utf-8') + return password + + # ---- Convenience methods for CLI/TUI compatibility ---- + def generate_id(self) -> str: + """Generate a UUID string using the C core.""" + if not CORE_AVAILABLE: + raise CoreNotAvailableError("C core not available for ID generation") + uuid_buf = ffi.new("char[37]") + cerb_generate_uuid(uuid_buf) + return ffi.string(uuid_buf).decode("utf-8") + + def get_entries(self) -> List[PasswordEntry]: + """Alias for list_passwords().""" + return self.list_passwords() + + def add_entry(self, entry: PasswordEntry) -> str: + """Alias for add_password(entry).""" + return self.add_password(entry) + + def update_entry(self, entry: PasswordEntry) -> bool: + """Update an existing entry using the entry object.""" + fields = { + "website": entry.website, + "username": entry.username, + "password": entry.password, + "notes": entry.notes, + "url": entry.url, + } + return self.update_password(entry.id, **fields) + + def delete_entry(self, entry_id: str) -> bool: + """Alias for delete_password(entry_id).""" + return self.delete_password(entry_id) + + def get_entry(self, identifier: str) -> PasswordEntry: + """Get an entry by ID, or by exact website match as a fallback.""" + entry = self.get_password(identifier) + if entry is not None: + return entry + # Fallback: search by exact website name + matches = [e for e in self.list_passwords() if (e.website or "").lower() == (identifier or "").lower()] + if not matches: + raise ValueError(f"Entry not found: {identifier}") + if len(matches) > 1: + raise ValueError(f"Multiple entries found for website '{identifier}'. Use the entry ID instead.") + return matches[0] + + def generate_password_easy( + self, + length: int = 16, + special: bool = True, + upper: Optional[bool] = None, + lower: Optional[bool] = None, + digits: Optional[bool] = None, + ) -> str: + """Friendly wrapper for password generation used by new CLI/TUI. + + Args: + length: desired length + special: include special characters (maps to use_special) + upper, lower, digits: if None use defaults (True). If bool provided, override. + """ + return PasswordManager.generate_password( + length=length, + use_upper=True if upper is None else upper, + use_lower=True if lower is None else lower, + use_digits=True if digits is None else digits, + use_special=special, + ) diff --git a/src/cerberus/gui/__init__.py b/src/cerberus/gui/__init__.py new file mode 100644 index 0000000..5cc77e9 --- /dev/null +++ b/src/cerberus/gui/__init__.py @@ -0,0 +1,6 @@ +""" +Graphical User Interface (GUI) for Cerberus Password Manager. +""" +from .main_window import run_app + +__all__ = ["run_app"] diff --git a/src/cerberus/gui/main_window.py b/src/cerberus/gui/main_window.py new file mode 100644 index 0000000..a7cac07 --- /dev/null +++ b/src/cerberus/gui/main_window.py @@ -0,0 +1,269 @@ +""" +PyQt6-based GUI for Cerberus Password Manager. +""" +from __future__ import annotations + +import sys +from dataclasses import asdict +from typing import Optional, List + +from PyQt6.QtCore import Qt +from PyQt6.QtWidgets import ( + QApplication, + QMainWindow, + QWidget, + QVBoxLayout, + QHBoxLayout, + QListWidget, + QListWidgetItem, + QPushButton, + QLabel, + QLineEdit, + QTextEdit, + QMessageBox, + QInputDialog, + QFileDialog, +) + +from ..core.password_manager import PasswordManager, VaultError +from ..core.models import PasswordEntry + + +class MainWindow(QMainWindow): + def __init__(self, pm: PasswordManager): + super().__init__() + self.pm = pm + self.setWindowTitle("Cerberus Password Manager") + self.resize(900, 600) + + # Root container + root = QWidget() + layout = QHBoxLayout() + root.setLayout(layout) + self.setCentralWidget(root) + + # Left: list + left = QWidget() + left_layout = QVBoxLayout() + left.setLayout(left_layout) + layout.addWidget(left, 1) + + self.search_input = QLineEdit() + self.search_input.setPlaceholderText("Search website, username, tags...") + self.search_input.textChanged.connect(self.refresh_list) + left_layout.addWidget(self.search_input) + + self.list_widget = QListWidget() + self.list_widget.itemSelectionChanged.connect(self.on_selection_changed) + left_layout.addWidget(self.list_widget, 1) + + btn_bar = QHBoxLayout() + self.btn_add = QPushButton("Add") + self.btn_edit = QPushButton("Edit") + self.btn_rotate = QPushButton("Rotate") + self.btn_delete = QPushButton("Delete") + self.btn_export = QPushButton("Export") + self.btn_import = QPushButton("Import") + btn_bar.addWidget(self.btn_add) + btn_bar.addWidget(self.btn_edit) + btn_bar.addWidget(self.btn_rotate) + btn_bar.addWidget(self.btn_delete) + btn_bar.addWidget(self.btn_export) + btn_bar.addWidget(self.btn_import) + left_layout.addLayout(btn_bar) + + # Right: detail + right = QWidget() + right_layout = QVBoxLayout() + right.setLayout(right_layout) + layout.addWidget(right, 2) + + self.lbl_website = QLineEdit() + self.lbl_username = QLineEdit() + self.lbl_password = QLineEdit() + self.lbl_password.setEchoMode(QLineEdit.EchoMode.Password) + self.lbl_url = QLineEdit() + self.txt_notes = QTextEdit() + + right_layout.addWidget(QLabel("Website")) + right_layout.addWidget(self.lbl_website) + right_layout.addWidget(QLabel("Username")) + right_layout.addWidget(self.lbl_username) + right_layout.addWidget(QLabel("Password")) + right_layout.addWidget(self.lbl_password) + right_layout.addWidget(QLabel("URL")) + right_layout.addWidget(self.lbl_url) + right_layout.addWidget(QLabel("Notes")) + right_layout.addWidget(self.txt_notes, 1) + + act_bar = QHBoxLayout() + self.btn_save = QPushButton("Save") + self.btn_copy_user = QPushButton("Copy User") + self.btn_copy_pass = QPushButton("Copy Pass") + act_bar.addWidget(self.btn_save) + act_bar.addWidget(self.btn_copy_user) + act_bar.addWidget(self.btn_copy_pass) + right_layout.addLayout(act_bar) + + # Wire actions + self.btn_add.clicked.connect(self.on_add) + self.btn_edit.clicked.connect(self.on_edit) + self.btn_rotate.clicked.connect(self.on_rotate) + self.btn_delete.clicked.connect(self.on_delete) + self.btn_export.clicked.connect(self.on_export) + self.btn_import.clicked.connect(self.on_import) + self.btn_save.clicked.connect(self.on_save) + self.btn_copy_user.clicked.connect(self.on_copy_user) + self.btn_copy_pass.clicked.connect(self.on_copy_pass) + + # Data + self.entries: List[PasswordEntry] = [] + self.current: Optional[PasswordEntry] = None + self.refresh_list() + + def refresh_list(self) -> None: + query = self.search_input.text().lower().strip() + try: + self.entries = self.pm.get_entries() + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to load entries: {e}") + self.entries = [] + self.list_widget.clear() + for e in self.entries: + if query and not ( + query in (e.website or '').lower() + or query in (e.username or '').lower() + or query in (e.notes or '').lower() + or any(query in t.lower() for t in (e.tags or [])) + ): + continue + item = QListWidgetItem(f"{e.website} — {e.username}") + item.setData(Qt.ItemDataRole.UserRole, e.id) + self.list_widget.addItem(item) + + def on_selection_changed(self) -> None: + item = self.list_widget.currentItem() + if not item: + self.current = None + return + entry_id = item.data(Qt.ItemDataRole.UserRole) + try: + # get_entry may accept id or website; ensure id fetch + entry = self.pm.get_entry(entry_id) + except Exception: + # fallback: find in self.entries + entry = next((x for x in self.entries if x.id == entry_id), None) + self.current = entry + if entry: + self.lbl_website.setText(entry.website) + self.lbl_username.setText(entry.username) + self.lbl_password.setText(entry.password) + self.lbl_url.setText(entry.url) + self.txt_notes.setText(entry.notes) + + def on_add(self) -> None: + website, ok = QInputDialog.getText(self, "Add Entry", "Website:") + if not ok or not website: + return + username, ok = QInputDialog.getText(self, "Add Entry", "Username:") + if not ok or not username: + return + # Generate or prompt for password + choice = QMessageBox.question(self, "Password", "Generate a strong password?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No) + if choice == QMessageBox.StandardButton.Yes: + password = self.pm.generate_password() + else: + password, ok = QInputDialog.getText(self, "Add Entry", "Password:") + if not ok or not password: + return + try: + entry = PasswordEntry( + id=self.pm.generate_id(), + website=website, + username=username, + password=password, + ) + self.pm.add_entry(entry) + self.refresh_list() + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to add entry: {e}") + + def on_edit(self) -> None: + if not self.current: + return + self.lbl_website.setFocus() + + def on_save(self) -> None: + if not self.current: + return + try: + self.current.website = self.lbl_website.text() + self.current.username = self.lbl_username.text() + self.current.password = self.lbl_password.text() + self.current.url = self.lbl_url.text() + self.current.notes = self.txt_notes.toPlainText() + self.pm.update_entry(self.current) + self.refresh_list() + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to save entry: {e}") + + def on_rotate(self) -> None: + if not self.current: + return + try: + new_password = self.pm.generate_password(length=24) + self.current.password = new_password + self.pm.update_entry(self.current) + self.lbl_password.setText(new_password) + QMessageBox.information(self, "Rotated", "Password rotated and saved.") + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to rotate: {e}") + + def on_delete(self) -> None: + if not self.current: + return + if QMessageBox.question(self, "Delete", f"Delete entry for {self.current.website}?") == QMessageBox.StandardButton.Yes: + try: + self.pm.delete_entry(self.current.id) + self.current = None + self.refresh_list() + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to delete: {e}") + + def on_copy_user(self) -> None: + if not self.current: + return + try: + import pyperclip + pyperclip.copy(self.current.username) + QMessageBox.information(self, "Copied", "Username copied to clipboard.") + except Exception as e: + QMessageBox.critical(self, "Error", f"Clipboard failed: {e}") + + def on_copy_pass(self) -> None: + if not self.current: + return + try: + import pyperclip + pyperclip.copy(self.current.password) + QMessageBox.information(self, "Copied", "Password copied to clipboard.") + except Exception as e: + QMessageBox.critical(self, "Error", f"Clipboard failed: {e}") + + +def run_app() -> None: + # Prompt for master password + app = QApplication(sys.argv) + from PyQt6.QtWidgets import QInputDialog + + pw, ok = QInputDialog.getText(None, "Cerberus", "Master password:") + if not ok or not pw: + return + try: + pm = PasswordManager(master_password=pw) + except VaultError as e: + QMessageBox.critical(None, "Error", f"Failed to unlock vault: {e}") + return + win = MainWindow(pm) + win.show() + sys.exit(app.exec()) diff --git a/src/cerberus/integrations/__init__.py b/src/cerberus/integrations/__init__.py new file mode 100644 index 0000000..2d81122 --- /dev/null +++ b/src/cerberus/integrations/__init__.py @@ -0,0 +1,111 @@ +"""Password manager integrations for Cerberus.""" + +from typing import Dict, Type, List, Optional, Any +from pathlib import Path +import importlib +import json + +from ..core.models import PasswordEntry + +class IntegrationError(Exception): + """Base exception for integration errors.""" + pass + +class BaseIntegration: + """Base class for password manager integrations.""" + + def __init__(self, **kwargs): + """Initialize the integration with any required parameters.""" + self.connected = False + + def connect(self, **kwargs) -> bool: + """Connect to the password manager. + + Returns: + bool: True if connection was successful + """ + raise NotImplementedError + + def disconnect(self): + """Disconnect from the password manager.""" + self.connected = False + + def list_entries(self) -> List[PasswordEntry]: + """List all password entries. + + Returns: + List of PasswordEntry objects + """ + raise NotImplementedError + + def export_entries(self, output_path: Path) -> bool: + """Export entries to a file. + + Args: + output_path: Path to save the exported data + + Returns: + bool: True if export was successful + """ + raise NotImplementedError + + def import_entries(self, input_path: Path) -> List[PasswordEntry]: + """Import entries from a file. + + Args: + input_path: Path to the file to import from + + Returns: + List of imported PasswordEntry objects + """ + raise NotImplementedError + +# Dictionary of available integrations +INTEGRATIONS: Dict[str, Type[BaseIntegration]] = {} + +def register_integration(name: str): + """Decorator to register an integration class.""" + def decorator(cls: Type[BaseIntegration]) -> Type[BaseIntegration]: + INTEGRATIONS[name.lower()] = cls + return cls + return decorator + +def get_integration(name: str, **kwargs) -> BaseIntegration: + """Get an instance of the specified integration. + + Args: + name: Name of the integration + **kwargs: Additional arguments to pass to the integration + + Returns: + An instance of the specified integration + + Raises: + IntegrationError: If the integration is not found + """ + name = name.lower() + if name not in INTEGRATIONS: + raise IntegrationError(f"Integration '{name}' not found") + + return INTEGRATIONS[name](**kwargs) + +def list_available_integrations() -> List[str]: + """List all available integrations. + + Returns: + List of integration names + """ + return list(INTEGRATIONS.keys()) + +# Import all integration modules to register them +# This will be populated by the individual integration modules +# that use the @register_integration decorator + +try: + from . import bitwarden # noqa + from . import lastpass # noqa + from . import keepass # noqa + from . import chrome # noqa +except ImportError as e: + # Some integrations may have additional dependencies + pass diff --git a/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc b/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..fbceef5 Binary files /dev/null and b/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc b/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc new file mode 100644 index 0000000..cb25ecf Binary files /dev/null and b/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc differ diff --git a/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc b/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc new file mode 100644 index 0000000..02a7447 Binary files /dev/null and b/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc differ diff --git a/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc b/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc new file mode 100644 index 0000000..ca99f53 Binary files /dev/null and b/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc differ diff --git a/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc b/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc new file mode 100644 index 0000000..fac83f5 Binary files /dev/null and b/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc differ diff --git a/src/cerberus/integrations/bitwarden.py b/src/cerberus/integrations/bitwarden.py new file mode 100644 index 0000000..7a4cd73 --- /dev/null +++ b/src/cerberus/integrations/bitwarden.py @@ -0,0 +1,268 @@ +import subprocess +import json +import logging +from typing import List, Dict, Optional, Any +from pathlib import Path +import os + +from ..core.models import PasswordEntry + +logger = logging.getLogger(__name__) + +class BitwardenCLIError(Exception): + """Exception raised for errors in the Bitwarden CLI.""" + pass + +class BitwardenIntegration: + """Integration with Bitwarden password manager.""" + + def __init__(self, email: str = None, password: str = None, session: str = None): + """Initialize the Bitwarden integration. + + Args: + email: Bitwarden account email + password: Bitwarden master password + session: Existing Bitwarden session key + """ + self.email = email + self.password = password + self.session = session + self.bw_path = self._find_bw() + + def _run_command(self, command: List[str], input_data: str = None) -> Dict: + """Run a Bitwarden CLI command and return the result.""" + try: + env = os.environ.copy() + if self.session: + env['BW_SESSION'] = self.session + + result = subprocess.run( + [self.bw_path] + command, + input=input_data.encode() if input_data else None, + capture_output=True, + check=True, + env=env + ) + + if result.stdout: + try: + return json.loads(result.stdout) + except json.JSONDecodeError: + return result.stdout.decode().strip() + return {} + + except subprocess.CalledProcessError as e: + error_msg = e.stderr.decode().strip() if e.stderr else str(e) + logger.error(f"Bitwarden CLI error: {error_msg}") + raise BitwardenCLIError(f"Bitwarden command failed: {error_msg}") + + @staticmethod + def _find_bw() -> str: + """Find the Bitwarden CLI executable.""" + # Check common locations + possible_paths = [ + '/usr/local/bin/bw', + '/usr/bin/bw', + 'bw' # Try PATH + ] + + for path in possible_paths: + try: + result = subprocess.run( + [path, '--version'], + capture_output=True, + text=True + ) + if result.returncode == 0: + logger.info(f"Found Bitwarden CLI at {path}") + return path + except (FileNotFoundError, subprocess.CalledProcessError): + continue + + raise BitwardenCLIError( + "Bitwarden CLI not found. Please install it from " + "https://bitwarden.com/help/cli/" + ) + + def login(self) -> bool: + """Log in to Bitwarden and get a session key.""" + if not self.email or not self.password: + raise BitwardenCLIError("Email and password are required for login") + + try: + # Log in and get the session key + result = subprocess.run( + [self.bw_path, 'login', self.email, self.password, '--raw'], + capture_output=True, + check=True, + text=True + ) + + self.session = result.stdout.strip() + return True + + except subprocess.CalledProcessError as e: + error_msg = e.stderr.decode().strip() if e.stderr else str(e) + logger.error(f"Bitwarden login failed: {error_msg}") + return False + + def logout(self) -> bool: + """Log out of Bitwarden.""" + try: + self._run_command(['logout']) + self.session = None + return True + except BitwardenCLIError: + return False + + def sync(self) -> bool: + """Sync with Bitwarden server.""" + try: + self._run_command(['sync']) + return True + except BitwardenCLIError: + return False + + def export_vault(self, output_file: str, format: str = 'encrypted_json') -> bool: + """Export the Bitwarden vault. + + Args: + output_file: Path to save the exported file + format: Export format ('encrypted_json', 'json', 'csv', 'encrypted_json') + + Returns: + bool: True if export was successful + """ + try: + result = self._run_command(['export', '--format', format, '--output', output_file]) + return True + except BitwardenCLIError: + return False + + def get_items(self, search: str = None) -> List[Dict]: + """Get items from the vault, optionally filtered by search term.""" + try: + if search: + return self._run_command(['list', 'items', '--search', search]) + return self._run_command(['list', 'items']) + except BitwardenCLIError: + return [] + + def get_item(self, item_id: str) -> Optional[Dict]: + """Get a specific item by ID.""" + try: + return self._run_command(['get', 'item', item_id]) + except BitwardenCLIError: + return None + + def create_item(self, item_data: Dict) -> Optional[Dict]: + """Create a new item in the vault.""" + try: + return self._run_command( + ['create', 'item'], + input_data=json.dumps(item_data) + ) + except BitwardenCLIError: + return None + + def update_item(self, item_id: str, item_data: Dict) -> Optional[Dict]: + """Update an existing item.""" + try: + return self._run_command( + ['edit', 'item', item_id], + input_data=json.dumps(item_data) + ) + except BitwardenCLIError: + return None + + def delete_item(self, item_id: str) -> bool: + """Delete an item from the vault.""" + try: + self._run_command(['delete', 'item', item_id]) + return True + except BitwardenCLIError: + return False + + def import_from_bitwarden(self) -> List[PasswordEntry]: + """Import passwords from Bitwarden to Cerberus format.""" + try: + items = self.get_items() + entries = [] + + for item in items: + try: + entry = PasswordEntry( + id=item.get('id'), + website=item.get('name', ''), + username=next( + (field['value'] for field in item.get('login', {}).get('uris', [{}]) + if field.get('name', '').lower() == 'username'), + '' + ), + password=item.get('login', {}).get('password', ''), + url=next( + (uri.get('uri', '') for uri in item.get('login', {}).get('uris', []) + if uri.get('uri')), + '' + ), + notes=item.get('notes', ''), + tags=item.get('collectionIds', []), + custom_fields={ + 'folderId': item.get('folderId'), + 'organizationId': item.get('organizationId'), + 'favorite': item.get('favorite', False), + 'reprompt': item.get('reprompt', 0), + 'revisionDate': item.get('revisionDate') + } + ) + entries.append(entry) + except Exception as e: + logger.error(f"Error converting Bitwarden item to PasswordEntry: {e}") + + return entries + + except Exception as e: + logger.error(f"Error importing from Bitwarden: {e}") + return [] + + def export_to_bitwarden(self, entries: List[PasswordEntry], folder_id: str = None) -> List[str]: + """Export passwords from Cerberus format to Bitwarden. + + Args: + entries: List of PasswordEntry objects to export + folder_id: Optional Bitwarden folder ID to place items in + + Returns: + List of Bitwarden item IDs that were created + """ + created_ids = [] + + for entry in entries: + try: + item_data = { + 'type': 1, # Login type + 'name': entry.website, + 'notes': entry.notes, + 'favorite': entry.custom_fields.get('favorite', False), + 'login': { + 'username': entry.username, + 'password': entry.password, + 'uris': [ + { + 'match': None, + 'uri': entry.url + } + ] if entry.url else [] + }, + 'collectionIds': entry.tags, + 'folderId': folder_id or entry.custom_fields.get('folderId') + } + + result = self.create_item(item_data) + if result and 'id' in result: + created_ids.append(result['id']) + + except Exception as e: + logger.error(f"Error exporting entry to Bitwarden: {e}") + + return created_ids diff --git a/src/cerberus/integrations/chrome.py b/src/cerberus/integrations/chrome.py new file mode 100644 index 0000000..ddc6acc --- /dev/null +++ b/src/cerberus/integrations/chrome.py @@ -0,0 +1,135 @@ +"""Chrome/Chromium password export integration for Cerberus.""" + +import csv +import json +import sqlite3 +import shutil +import tempfile +from pathlib import Path +from typing import List, Optional, Dict, Any + +from ..core.models import PasswordEntry +from . import BaseIntegration, register_integration, IntegrationError + +class ChromeIntegration(BaseIntegration): + """Integration with Chrome/Chromium password exports.""" + + def __init__(self, export_path: Optional[Path] = None): + """Initialize the Chrome integration. + + Args: + export_path: Path to Chrome passwords CSV export + """ + super().__init__() + self.export_path = export_path + + def connect(self, export_path: Optional[Path] = None, **kwargs) -> bool: + """Load the Chrome passwords export file. + + Args: + export_path: Path to the Chrome passwords CSV export + + Returns: + bool: True if the export file exists and is accessible + """ + if export_path: + self.export_path = Path(export_path) + + if not self.export_path or not self.export_path.exists(): + raise IntegrationError("Chrome passwords export file not found") + + self.connected = True + return True + + def list_entries(self) -> List[PasswordEntry]: + """List all password entries from the Chrome export. + + Returns: + List of PasswordEntry objects + + Raises: + IntegrationError: If not connected or error reading the export file + """ + if not self.connected: + raise IntegrationError("Not connected to Chrome passwords") + + entries: List[PasswordEntry] = [] + + try: + with open(self.export_path, 'r', encoding='utf-8') as f: + # Chrome CSV format: name,url,username,password + reader = csv.reader(f) + + # Skip header if it exists + header = next(reader, None) + if not header or len(header) < 4: + # Try without skipping header + f.seek(0) + + for row in reader: + if len(row) < 4: # Ensure we have enough columns + continue + + name, url, username, password = row[:4] + + # Create a PasswordEntry + entry = PasswordEntry( + website=name or url or "Unknown", + username=username, + password=password, + url=url + ) + + entries.append(entry) + + except Exception as e: + raise IntegrationError(f"Error reading Chrome passwords export: {e}") + + return entries + + def import_entries(self, input_path: Optional[Path] = None) -> List[PasswordEntry]: + """Import entries from a Chrome passwords export. + + Args: + input_path: Path to the Chrome passwords export file + + Returns: + List of imported PasswordEntry objects + """ + if input_path: + self.export_path = Path(input_path) + + if not self.export_path or not self.export_path.exists(): + raise IntegrationError("Chrome passwords export file not found") + + return self.list_entries() + + def export_entries(self, output_path: Path) -> bool: + """Export entries to a Chrome-compatible CSV file. + + Args: + output_path: Path to save the exported data + + Returns: + bool: True if export was successful + """ + # This would require converting from our format to Chrome's format + # For now, we'll just raise a NotImplementedError + raise NotImplementedError("Export to Chrome format is not yet implemented") + + @classmethod + def export_help(cls) -> str: + """Get instructions for exporting from Chrome. + + Returns: + str: Instructions for exporting from Chrome + """ + return """To export passwords from Chrome: + 1. Open Chrome and go to: chrome://settings/passwords + 2. Click the three dots menu next to 'Saved Passwords' + 3. Select 'Export passwords...' + 4. Follow the prompts to save the passwords to a CSV file + """ + +# Register the integration with the name 'chrome' +register_integration("chrome")(ChromeIntegration) diff --git a/src/cerberus/integrations/keepass.py b/src/cerberus/integrations/keepass.py new file mode 100644 index 0000000..f445826 --- /dev/null +++ b/src/cerberus/integrations/keepass.py @@ -0,0 +1,208 @@ +"""KeePass integration for Cerberus.""" + +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import List, Optional, Dict, Any + +from ..core.models import PasswordEntry +from . import BaseIntegration, register_integration, IntegrationError + +try: + import pykeepass + KEEPASS_AVAILABLE = True +except ImportError: + KEEPASS_AVAILABLE = False + +@register_integration("keepass") +class KeePassIntegration(BaseIntegration): + """Integration with KeePass password manager.""" + + def __init__(self, database_path: Optional[Path] = None, keyfile: Optional[Path] = None): + """Initialize the KeePass integration. + + Args: + database_path: Path to the KeePass database file (.kdbx) + keyfile: Path to the keyfile (if used) + """ + super().__init__() + if not KEEPASS_AVAILABLE: + raise IntegrationError( + "pykeepass package is required for KeePass integration. " + "Install with: pip install pykeepass" + ) + + self.database_path = database_path + self.keyfile = keyfile + self.kp = None + + def connect(self, password: str, database_path: Optional[Path] = None, + keyfile: Optional[Path] = None, **kwargs) -> bool: + """Connect to a KeePass database. + + Args: + password: Database password + database_path: Path to the KeePass database file + keyfile: Path to the keyfile (if used) + + Returns: + bool: True if connection was successful + """ + if database_path: + self.database_path = Path(database_path) + if keyfile: + self.keyfile = Path(keyfile) + + if not self.database_path or not self.database_path.exists(): + raise IntegrationError("KeePass database file not found") + + try: + self.kp = pykeepass.PyKeePass( + self.database_path, + password=password, + keyfile=str(self.keyfile) if self.keyfile and self.keyfile.exists() else None + ) + self.connected = True + return True + except Exception as e: + raise IntegrationError(f"Failed to open KeePass database: {e}") + + def disconnect(self): + """Close the KeePass database.""" + self.kp = None + self.connected = False + + def list_entries(self) -> List[PasswordEntry]: + """List all password entries from the KeePass database. + + Returns: + List of PasswordEntry objects + + Raises: + IntegrationError: If not connected or error reading the database + """ + if not self.connected or not self.kp: + raise IntegrationError("Not connected to KeePass database") + + entries: List[PasswordEntry] = [] + + try: + for entry in self.kp.entries: + # Skip entries without URLs or usernames + if not (entry.url or entry.title) or not entry.username: + continue + + # Get entry notes and custom fields + notes = entry.notes or "" + custom_fields = {} + + # Add any custom fields + for key, value in entry.custom_properties.items(): + if key and value and key.lower() not in ['notes', 'password']: + custom_fields[key] = value + + # Create a PasswordEntry + entry_obj = PasswordEntry( + website=entry.url or entry.title, + username=entry.username, + password=entry.password, + notes=notes, + url=entry.url, + tags=[entry.group.name] if entry.group else [], + custom_fields=custom_fields if custom_fields else None + ) + + entries.append(entry_obj) + + except Exception as e: + raise IntegrationError(f"Error reading KeePass database: {e}") + + return entries + + def import_entries(self, input_path: Optional[Path] = None, password: str = None, + keyfile: Optional[Path] = None) -> List[PasswordEntry]: + """Import entries from a KeePass database. + + Args: + input_path: Path to the KeePass database file + password: Database password + keyfile: Path to the keyfile (if used) + + Returns: + List of imported PasswordEntry objects + """ + if input_path: + self.database_path = Path(input_path) + if keyfile: + self.keyfile = Path(keyfile) + + if not password: + raise IntegrationError("Password is required to open KeePass database") + + self.connect(password=password, database_path=self.database_path, keyfile=self.keyfile) + return self.list_entries() + + def export_entries(self, output_path: Path, entries: List[PasswordEntry] = None) -> bool: + """Export entries to a new KeePass database. + + Args: + output_path: Path to save the new KeePass database + entries: List of PasswordEntry objects to export + + Returns: + bool: True if export was successful + """ + if not self.connected or not self.kp: + raise IntegrationError("Not connected to KeePass database") + + try: + # Create a new KeePass database + new_kp = pykeepass.create_database(str(output_path)) + + # Add a group for the imported entries + imported_group = new_kp.add_group(new_kp.root_group, 'Imported') + + # Add each entry to the database + for entry in (entries or self.list_entries()): + # Skip entries without URLs or usernames + if not (entry.website or entry.url) or not entry.username: + continue + + # Add the entry to the database + new_entry = new_kp.add_entry( + imported_group, + title=entry.website or entry.url, + username=entry.username, + password=entry.password, + url=entry.url or entry.website, + notes=entry.notes, + tags=','.join(entry.tags) if entry.tags else None + ) + + # Add custom fields + if entry.custom_fields: + for key, value in entry.custom_fields.items(): + if key and value and key.lower() not in ['notes', 'password']: + new_entry.set_custom_property(key, str(value)) + + # Save the new database + new_kp.save() + return True + + except Exception as e: + raise IntegrationError(f"Error exporting to KeePass database: {e}") + + @classmethod + def export_help(cls) -> str: + """Get instructions for exporting from KeePass. + + Returns: + str: Instructions for exporting from KeePass + """ + return """To export from KeePass: + 1. Open your KeePass database + 2. Go to 'File' > 'Export' + 3. Choose a format (e.g., XML) + 4. Save the exported file + + Note: For better security, use the direct database import method. + """ diff --git a/src/cerberus/integrations/lastpass.py b/src/cerberus/integrations/lastpass.py new file mode 100644 index 0000000..9c587d2 --- /dev/null +++ b/src/cerberus/integrations/lastpass.py @@ -0,0 +1,126 @@ +"""LastPass integration for Cerberus.""" + +import csv +from pathlib import Path +from typing import List, Optional, Dict, Any + +from ..core.models import PasswordEntry +from . import BaseIntegration, register_integration, IntegrationError + +@register_integration("lastpass") +class LastPassIntegration(BaseIntegration): + """Integration with LastPass password manager.""" + + def __init__(self, export_path: Optional[Path] = None): + """Initialize the LastPass integration. + + Args: + export_path: Path to LastPass export file + """ + super().__init__() + self.export_path = export_path + + def connect(self, export_path: Optional[Path] = None, **kwargs) -> bool: + """Connect to LastPass (loads the export file). + + Args: + export_path: Path to LastPass export file + + Returns: + bool: True if the export file exists and is accessible + """ + if export_path: + self.export_path = Path(export_path) + + if not self.export_path or not self.export_path.exists(): + raise IntegrationError("LastPass export file not found") + + self.connected = True + return True + + def list_entries(self) -> List[PasswordEntry]: + """List all password entries from the LastPass export. + + Returns: + List of PasswordEntry objects + + Raises: + IntegrationError: If not connected or error reading the export file + """ + if not self.connected: + raise IntegrationError("Not connected to LastPass") + + entries: List[PasswordEntry] = [] + + try: + with open(self.export_path, 'r', encoding='utf-8') as f: + # Skip the first line (header) + next(f) + + reader = csv.reader(f) + for row in reader: + if len(row) < 7: # Ensure we have enough columns + continue + + url, username, password, extra, name, grouping, fav = row[:7] + + # Create a PasswordEntry + entry = PasswordEntry( + website=url or name or "Unknown", + username=username, + password=password, + notes=extra, + tags=[grouping] if grouping else [] + ) + + entries.append(entry) + + except Exception as e: + raise IntegrationError(f"Error reading LastPass export: {e}") + + return entries + + def import_entries(self, input_path: Optional[Path] = None) -> List[PasswordEntry]: + """Import entries from a LastPass export file. + + Args: + input_path: Path to the LastPass export file + + Returns: + List of imported PasswordEntry objects + """ + if input_path: + self.export_path = Path(input_path) + + if not self.export_path or not self.export_path.exists(): + raise IntegrationError("LastPass export file not found") + + return self.list_entries() + + def export_entries(self, output_path: Path) -> bool: + """Export entries to a LastPass-compatible CSV file. + + Args: + output_path: Path to save the exported data + + Returns: + bool: True if export was successful + """ + # This would require converting from our format to LastPass format + # For now, we'll just raise a NotImplementedError + raise NotImplementedError("Export to LastPass format is not yet implemented") + + @classmethod + def export_help(cls) -> str: + """Get instructions for exporting from LastPass. + + Returns: + str: Instructions for exporting from LastPass + """ + return """To export from LastPass: + 1. Log in to your LastPass account + 2. Click on your email in the bottom left + 3. Select 'Advanced' > 'Export' + 4. Enter your master password + 5. Save the exported file + """ diff --git a/src/cerberus/native/host.py b/src/cerberus/native/host.py new file mode 100644 index 0000000..75459b1 --- /dev/null +++ b/src/cerberus/native/host.py @@ -0,0 +1,138 @@ +""" +Native Messaging host for Cerberus Password Manager (development scaffold). + +WARNING: Development-only. For testing with the Firefox/Chrome extension. +Authentication: expects master password via CERB_MASTER environment variable. + +Protocol: newline-free JSON messages with 4-byte little-endian length prefix. +Messages: + {"type":"ping"} + {"type":"list_entries"} + {"type":"get_for_origin", "origin":"https://example.com", "include_password": false} + +Responses: + {"ok": true, "result": ...} or {"ok": false, "error": "..."} +""" +from __future__ import annotations + +import json +import os +import struct +import sys +from typing import Any, Dict, List +from urllib.parse import urlparse + +from cerberus.core.password_manager import PasswordManager, VaultError + + +def _read_msg() -> Dict[str, Any] | None: + raw_len = sys.stdin.buffer.read(4) + if not raw_len: + return None + msg_len = struct.unpack(" None: + data = json.dumps(obj, default=str, separators=(",", ":")).encode("utf-8") + sys.stdout.buffer.write(struct.pack(" str: + try: + u = urlparse(origin) + return u.hostname or origin + except Exception: + return origin + + +def main() -> None: + master = os.environ.get("CERB_MASTER") + data_dir = os.environ.get("CERB_DATA_DIR") + pm: PasswordManager | None = None + + if master: + try: + pm = PasswordManager(data_dir=data_dir, master_password=master) + except VaultError as e: + _write_msg({"ok": False, "error": f"unlock_failed:{e}"}) + return + else: + # Defer unlock until first request arrives + pm = None + + allow_remote_unlock = os.environ.get("CERB_ALLOW_REMOTE_UNLOCK") == "1" + + while True: + msg = _read_msg() + if msg is None: + break + try: + mtype = msg.get("type") + if mtype == "ping": + _write_msg({"ok": True, "result": "pong"}) + continue + if mtype == "unlock": + if not allow_remote_unlock: + _write_msg({"ok": False, "error": "remote_unlock_disabled"}) + else: + master = msg.get("master") + if not master: + _write_msg({"ok": False, "error": "missing_master"}) + continue + try: + pm = PasswordManager(data_dir=data_dir, master_password=master) + _write_msg({"ok": True, "result": "unlocked"}) + except VaultError as e: + _write_msg({"ok": False, "error": f"unlock_failed:{e}"}) + continue + if pm is None: + _write_msg({"ok": False, "error": "locked"}) + continue + if mtype == "list_entries": + entries = pm.get_entries() + slim = [ + { + "id": e.id, + "website": e.website, + "username": e.username, + "url": e.url, + } + for e in entries + ] + _write_msg({"ok": True, "result": slim}) + continue + if mtype == "get_for_origin": + origin = msg.get("origin") or "" + include_password = bool(msg.get("include_password", False)) + host = _origin_host(origin).lower() + matches = [] + for e in pm.get_entries(): + target = (e.url or e.website or "").lower() + if host and host in target: + item = { + "id": e.id, + "website": e.website, + "username": e.username, + "url": e.url, + } + if include_password: + item["password"] = e.password + matches.append(item) + _write_msg({"ok": True, "result": matches}) + continue + _write_msg({"ok": False, "error": f"unknown_type:{mtype}"}) + except Exception as e: + _write_msg({"ok": False, "error": f"exception:{e}"}) + + +if __name__ == "__main__": + main() diff --git a/src/cerberus/native/manifests/chrome_com.cerberus.pm.json b/src/cerberus/native/manifests/chrome_com.cerberus.pm.json new file mode 100644 index 0000000..7eec080 --- /dev/null +++ b/src/cerberus/native/manifests/chrome_com.cerberus.pm.json @@ -0,0 +1,9 @@ +{ + "name": "com.cerberus.pm", + "description": "Cerberus Password Manager Native Messaging Host (dev)", + "path": "/usr/local/bin/cerberus-native-host", + "type": "stdio", + "allowed_origins": [ + "chrome-extension://REPLACE_WITH_EXTENSION_ID/" + ] +} diff --git a/src/cerberus/native/manifests/firefox_com.cerberus.pm.json b/src/cerberus/native/manifests/firefox_com.cerberus.pm.json new file mode 100644 index 0000000..fda6d6e --- /dev/null +++ b/src/cerberus/native/manifests/firefox_com.cerberus.pm.json @@ -0,0 +1,9 @@ +{ + "name": "com.cerberus.pm", + "description": "Cerberus Password Manager Native Messaging Host (dev)", + "path": "/usr/local/bin/cerberus-native-host", + "type": "stdio", + "allowed_extensions": [ + "cerberus@example.com" + ] +} diff --git a/src/cerberus/tui/__init__.py b/src/cerberus/tui/__init__.py new file mode 100644 index 0000000..75461d6 --- /dev/null +++ b/src/cerberus/tui/__init__.py @@ -0,0 +1,16 @@ +""" +Terminal User Interface (TUI) for Cerberus Password Manager. + +This module provides a rich, interactive terminal interface for managing passwords. +""" + +__all__ = ["main"] + +def main(): + """Launch the Cerberus TUI.""" + from .app import CerberusTUI + app = CerberusTUI() + app.run() + +if __name__ == "__main__": + main() diff --git a/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc b/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..bb0f474 Binary files /dev/null and b/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/cerberus/tui/app.py b/src/cerberus/tui/app.py new file mode 100644 index 0000000..6e3d513 --- /dev/null +++ b/src/cerberus/tui/app.py @@ -0,0 +1,243 @@ +"" +Main TUI application for Cerberus Password Manager. +""" +from typing import Optional, List, Dict, Any +from pathlib import Path +import logging + +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.prompt import Prompt, Confirm +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.traceback import install as install_rich_traceback +from textual.app import App, ComposeResult +from textual.containers import Container, VerticalScroll +from textual.widgets import ( + Header, Footer, Button, Static, Input, Label, + DataTable, Select, Switch, LoadingIndicator +) + +from ..core.password_manager import PasswordManager, VaultError +from ..core.models import PasswordEntry + +# Set up rich traceback for better error messages +install_rich_traceback(show_locals=True) +logger = logging.getLogger(__name__) + +class CerberusTUI(App): + """Main TUI application for Cerberus Password Manager.""" + + CSS = """ + Screen { + layout: vertical; + } + + #login-screen { + width: 100%; + height: 100%; + align: center middle; + } + + #main-screen { + width: 100%; + height: 100%; + layout: horizontal; + } + + #sidebar { + width: 30%; + height: 100%; + border: solid $accent; + } + + #content { + width: 70%; + height: 100%; + border: solid $accent; + } + + .entry-list { + width: 100%; + height: 100%; + } + + .entry-detail { + width: 100%; + height: 100%; + padding: 1; + } + """ + + BINDINGS = [ + ("q", "quit", "Quit"), + ("n", "new_password", "New Password"), + ("f", "find", "Find Password"), + ("r", "refresh", "Refresh"), + ("c", "copy_password", "Copy Password"), + ("u", "copy_username", "Copy Username"), + ("d", "delete", "Delete Entry"), + ] + + def __init__(self, data_dir: Optional[str] = None, **kwargs): + super().__init__(**kwargs) + self.console = Console() + self.pm: Optional[PasswordManager] = None + self.data_dir = data_dir + self.current_entry: Optional[PasswordEntry] = None + self.entries: List[PasswordEntry] = [] + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + if not self.pm: + yield Container( + VerticalScroll( + Label("🔒 Cerberus Password Manager", id="login-title"), + Input(placeholder="Master Password", password=True, id="master-password"), + Button("Unlock", variant="primary", id="unlock-button"), + id="login-screen" + ) + ) + else: + with Container(id="main-screen"): + with Container(id="sidebar"): + yield DataTable(id="entry-list", cursor_type="row") + with Container(id="content"): + yield Static("Select an entry to view details", id="entry-detail") + + def on_mount(self) -> None: + """Initialize the UI after mounting.""" + if not self.pm: + self.query_one("#master-password", Input).focus() + else: + self.load_entries() + + def on_button_pressed(self, event: Button.Pressed) -> None: + """Handle button press events.""" + if event.button.id == "unlock-button": + self.unlock_vault() + + def on_input_submitted(self, event: Input.Submitted) -> None: + """Handle input submission.""" + if event.input.id == "master-password": + self.unlock_vault() + + def unlock_vault(self) -> None: + """Attempt to unlock the password vault.""" + password_input = self.query_one("#master-password", Input) + password = password_input.value + + if not password: + self.notify("Please enter a master password", severity="error") + return + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + transient=True, + console=self.console + ) as progress: + task = progress.add_task("Unlocking vault...", total=None) + self.pm = PasswordManager(data_dir=self.data_dir, master_password=password) + progress.update(task, completed=1) + + # Clear the login screen and show the main interface + self.query("#login-screen").remove() + self.compose() + self.load_entries() + + except VaultError as e: + self.notify(f"Failed to unlock vault: {e}", severity="error") + except Exception as e: + logger.exception("Error unlocking vault") + self.notify(f"An error occurred: {e}", severity="error") + + def load_entries(self) -> None: + """Load password entries into the UI.""" + if not self.pm: + return + + try: + table = self.query_one("#entry-list", DataTable) + table.clear() + table.add_columns("Website", "Username", "Last Used") + + self.entries = self.pm.get_entries() + for entry in self.entries: + table.add_row( + entry.website, + entry.username, + entry.last_used.strftime("%Y-%m-%d") if entry.last_used else "Never" + ) + + except Exception as e: + logger.exception("Error loading entries") + self.notify(f"Failed to load entries: {e}", severity="error") + + def action_new_password(self) -> None: + """Create a new password entry.""" + self.notify("New password functionality coming soon!", severity="information") + + def action_find(self) -> None: + """Find a password entry.""" + self.notify("Find functionality coming soon!", severity="information") + + def action_refresh(self) -> None: + """Refresh the entry list.""" + self.load_entries() + self.notify("Entries refreshed", severity="information") + + def action_copy_password(self) -> None: + """Copy the current entry's password to clipboard.""" + if not self.current_entry: + self.notify("No entry selected", severity="warning") + return + + try: + # Use platform-specific clipboard handling + import pyperclip + pyperclip.copy(self.current_entry.password) + self.notify("Password copied to clipboard", severity="information") + except Exception as e: + logger.exception("Error copying to clipboard") + self.notify(f"Failed to copy to clipboard: {e}", severity="error") + + def action_copy_username(self) -> None: + """Copy the current entry's username to clipboard.""" + if not self.current_entry: + self.notify("No entry selected", severity="warning") + return + + try: + import pyperclip + pyperclip.copy(self.current_entry.username) + self.notify("Username copied to clipboard", severity="information") + except Exception as e: + logger.exception("Error copying username") + self.notify(f"Failed to copy username: {e}", severity="error") + + def action_delete(self) -> None: + """Delete the current entry.""" + if not self.current_entry: + self.notify("No entry selected", severity="warning") + return + + if Confirm.ask(f"Delete entry for {self.current_entry.website}?"): + try: + self.pm.delete_entry(self.current_entry.id) + self.load_entries() + self.notify("Entry deleted", severity="information") + self.current_entry = None + self.query_one("#entry-detail", Static).update("Select an entry to view details") + except Exception as e: + logger.exception("Error deleting entry") + self.notify(f"Failed to delete entry: {e}", severity="error") + +def main(): + """Run the Cerberus TUI.""" + app = CerberusTUI() + app.run() + +if __name__ == "__main__": + main() -- cgit v1.2.3