aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cerberus/__init__.py11
-rw-r--r--src/cerberus/__pycache__/__init__.cpython-313.pycbin0 -> 533 bytes
-rw-r--r--src/cerberus/automation/__init__.py19
-rw-r--r--src/cerberus/automation/__pycache__/__init__.cpython-313.pycbin0 -> 635 bytes
-rw-r--r--src/cerberus/automation/__pycache__/discovery.cpython-313.pycbin0 -> 12850 bytes
-rw-r--r--src/cerberus/automation/__pycache__/engine.cpython-313.pycbin0 -> 1864 bytes
-rw-r--r--src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pycbin0 -> 4284 bytes
-rw-r--r--src/cerberus/automation/__pycache__/policy.cpython-313.pycbin0 -> 1146 bytes
-rw-r--r--src/cerberus/automation/__pycache__/runner.cpython-313.pycbin0 -> 3943 bytes
-rw-r--r--src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pycbin0 -> 4337 bytes
-rw-r--r--src/cerberus/automation/__pycache__/types.cpython-313.pycbin0 -> 1243 bytes
-rw-r--r--src/cerberus/automation/discovery.py252
-rw-r--r--src/cerberus/automation/engine.py26
-rw-r--r--src/cerberus/automation/playwright_engine.py60
-rw-r--r--src/cerberus/automation/policy.py25
-rw-r--r--src/cerberus/automation/runner.py59
-rw-r--r--src/cerberus/automation/selenium_engine.py64
-rw-r--r--src/cerberus/automation/sites/__pycache__/apple.cpython-313.pycbin0 -> 4023 bytes
-rw-r--r--src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pycbin0 -> 1073 bytes
-rw-r--r--src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pycbin0 -> 3733 bytes
-rw-r--r--src/cerberus/automation/sites/__pycache__/github.cpython-313.pycbin0 -> 3278 bytes
-rw-r--r--src/cerberus/automation/sites/__pycache__/google.cpython-313.pycbin0 -> 3377 bytes
-rw-r--r--src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pycbin0 -> 3656 bytes
-rw-r--r--src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pycbin0 -> 3329 bytes
-rw-r--r--src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pycbin0 -> 3425 bytes
-rw-r--r--src/cerberus/automation/sites/apple.py64
-rw-r--r--src/cerberus/automation/sites/base_site.py14
-rw-r--r--src/cerberus/automation/sites/facebook.py56
-rw-r--r--src/cerberus/automation/sites/github.py53
-rw-r--r--src/cerberus/automation/sites/google.py45
-rw-r--r--src/cerberus/automation/sites/linkedin.py56
-rw-r--r--src/cerberus/automation/sites/microsoft.py40
-rw-r--r--src/cerberus/automation/sites/twitter.py42
-rw-r--r--src/cerberus/automation/types.py22
-rw-r--r--src/cerberus/cli.py53
-rw-r--r--src/cerberus/cli/__init__.py305
-rw-r--r--src/cerberus/cli/__pycache__/__init__.cpython-313.pycbin0 -> 17653 bytes
-rw-r--r--src/cerberus/cli/__pycache__/main.cpython-313.pycbin0 -> 27863 bytes
-rw-r--r--src/cerberus/cli/main.py611
-rw-r--r--src/cerberus/core/Makefile31
-rw-r--r--src/cerberus/core/__init__.py127
-rw-r--r--src/cerberus/core/__pycache__/__init__.cpython-313.pycbin0 -> 4536 bytes
-rw-r--r--src/cerberus/core/__pycache__/models.cpython-313.pycbin0 -> 3340 bytes
-rw-r--r--src/cerberus/core/__pycache__/password_manager.cpython-313.pycbin0 -> 21450 bytes
-rw-r--r--src/cerberus/core/cerberus.c530
-rw-r--r--src/cerberus/core/cerberus.h144
-rwxr-xr-xsrc/cerberus/core/cerberus.sobin0 -> 30688 bytes
-rw-r--r--src/cerberus/core/models.py58
-rw-r--r--src/cerberus/core/password_manager.py462
-rw-r--r--src/cerberus/gui/__init__.py6
-rw-r--r--src/cerberus/gui/main_window.py269
-rw-r--r--src/cerberus/integrations/__init__.py111
-rw-r--r--src/cerberus/integrations/__pycache__/__init__.cpython-313.pycbin0 -> 4411 bytes
-rw-r--r--src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pycbin0 -> 12752 bytes
-rw-r--r--src/cerberus/integrations/__pycache__/chrome.cpython-313.pycbin0 -> 5258 bytes
-rw-r--r--src/cerberus/integrations/__pycache__/keepass.cpython-313.pycbin0 -> 8901 bytes
-rw-r--r--src/cerberus/integrations/__pycache__/lastpass.cpython-313.pycbin0 -> 5034 bytes
-rw-r--r--src/cerberus/integrations/bitwarden.py268
-rw-r--r--src/cerberus/integrations/chrome.py135
-rw-r--r--src/cerberus/integrations/keepass.py208
-rw-r--r--src/cerberus/integrations/lastpass.py126
-rw-r--r--src/cerberus/native/host.py138
-rw-r--r--src/cerberus/native/manifests/chrome_com.cerberus.pm.json9
-rw-r--r--src/cerberus/native/manifests/firefox_com.cerberus.pm.json9
-rw-r--r--src/cerberus/tui/__init__.py16
-rw-r--r--src/cerberus/tui/__pycache__/__init__.cpython-313.pycbin0 -> 649 bytes
-rw-r--r--src/cerberus/tui/app.py243
67 files changed, 4767 insertions, 0 deletions
diff --git a/src/cerberus/__init__.py b/src/cerberus/__init__.py
new file mode 100644
index 0000000..2064933
--- /dev/null
+++ b/src/cerberus/__init__.py
@@ -0,0 +1,11 @@
+# Avoid importing heavy submodules at top-level to prevent side effects
+__all__ = ["PasswordManager", "PasswordEntry"]
+
+def __getattr__(name):
+ if name == "PasswordManager":
+ from .core.password_manager import PasswordManager
+ return PasswordManager
+ if name == "PasswordEntry":
+ from .core.models import PasswordEntry
+ return PasswordEntry
+ raise AttributeError(name)
diff --git a/src/cerberus/__pycache__/__init__.cpython-313.pyc b/src/cerberus/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..d06ce4e
--- /dev/null
+++ b/src/cerberus/__pycache__/__init__.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/__init__.py b/src/cerberus/automation/__init__.py
new file mode 100644
index 0000000..9defa53
--- /dev/null
+++ b/src/cerberus/automation/__init__.py
@@ -0,0 +1,19 @@
+"""Automation layer for password rotation via the web.
+
+This package provides abstractions and engines (Playwright/Selenium) to automate
+site-specific password change flows, along with runners and policy helpers.
+"""
+
+from .types import AutomationResult, AutomationStatus
+from .engine import AutomationEngine
+from .runner import RotationRunner, RotationSelector
+
+__all__ = [
+ 'AutomationEngine',
+ 'AutomationResult',
+ 'AutomationStatus',
+ 'RotationRunner',
+ 'RotationSelector',
+]
+
+
diff --git a/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc b/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..81e9bb0
--- /dev/null
+++ b/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc b/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc
new file mode 100644
index 0000000..a7be35d
--- /dev/null
+++ b/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/__pycache__/engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/engine.cpython-313.pyc
new file mode 100644
index 0000000..7d6272e
--- /dev/null
+++ b/src/cerberus/automation/__pycache__/engine.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc
new file mode 100644
index 0000000..82a546b
--- /dev/null
+++ b/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/__pycache__/policy.cpython-313.pyc b/src/cerberus/automation/__pycache__/policy.cpython-313.pyc
new file mode 100644
index 0000000..5887ac4
--- /dev/null
+++ b/src/cerberus/automation/__pycache__/policy.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/__pycache__/runner.cpython-313.pyc b/src/cerberus/automation/__pycache__/runner.cpython-313.pyc
new file mode 100644
index 0000000..c8e90b2
--- /dev/null
+++ b/src/cerberus/automation/__pycache__/runner.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc
new file mode 100644
index 0000000..d14e6d7
--- /dev/null
+++ b/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/__pycache__/types.cpython-313.pyc b/src/cerberus/automation/__pycache__/types.cpython-313.pyc
new file mode 100644
index 0000000..f2e48d4
--- /dev/null
+++ b/src/cerberus/automation/__pycache__/types.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/discovery.py b/src/cerberus/automation/discovery.py
new file mode 100644
index 0000000..d03c3af
--- /dev/null
+++ b/src/cerberus/automation/discovery.py
@@ -0,0 +1,252 @@
+"""
+Heuristic discovery for password change and reset flows.
+
+This module attempts to dynamically locate "Change password" or "Forgot/Reset password"
+paths and, when possible, automatically submit a password rotation using best-effort
+selectors. It works with any engine that implements the AutomationEngine Protocol.
+"""
+from __future__ import annotations
+
+from dataclasses import dataclass
+import logging
+from typing import List, Optional, Dict, Any, Protocol
+
+from .types import AutomationResult, AutomationStatus
+from ..core.models import PasswordEntry
+
+logger = logging.getLogger("cerberus")
+
+
+DISCOVERY_KEYWORDS = [
+ # English
+ "change password",
+ "password change",
+ "update password",
+ "reset password",
+ "forgot password",
+ "forgot your password",
+ "security",
+ "account settings",
+ # Common non-English hints (basic)
+ "contraseña", "senha", "mot de passe",
+ "passwort", "lozinka", "hasło",
+]
+
+# Common input name/id candidates
+OLD_PW_CANDIDATES = [
+ "current_password",
+ "old_password",
+ "password_current",
+ "passwordOld",
+ "password_old",
+ "existing_password",
+]
+NEW_PW_CANDIDATES = [
+ "new_password",
+ "password_new",
+ "password1",
+ "password",
+ "newPassword",
+]
+CONFIRM_PW_CANDIDATES = [
+ "confirm_password",
+ "password_confirm",
+ "password2",
+ "confirmNewPassword",
+]
+
+SUBMIT_CANDIDATES = [
+ 'button[type="submit"]',
+ 'input[type="submit"]',
+ 'button.primary',
+ 'button.save',
+ 'button.update',
+]
+
+
+@dataclass
+class DiscoveredEndpoint:
+ label: str
+ href: str
+
+
+def _js_find_links_script() -> str:
+ # Returns JSON array of {text, href}
+ return (
+ "(() => {"
+ " const matches = [];"
+ " const anchors = Array.from(document.querySelectorAll('a, button'));"
+ " const kws = new Set([" + ",".join([f"'{' '.join(k.split())}'" for k in DISCOVERY_KEYWORDS]) + "]);"
+ " for (const el of anchors) {"
+ " const text = (el.textContent || '').trim().toLowerCase();"
+ " const aria = (el.getAttribute('aria-label') || '').trim().toLowerCase();"
+ " const title = (el.getAttribute('title') || '').trim().toLowerCase();"
+ " for (const kw of kws) {"
+ " if (text.includes(kw) || aria.includes(kw) || title.includes(kw)) {"
+ " const href = el.getAttribute('href') || '';"
+ " matches.push({text, href});"
+ " break;"
+ " }"
+ " }"
+ " return matches;"
+ "})()"
+ )
+
+
+def discover_password_change(engine, base_url: Optional[str]) -> List[DiscoveredEndpoint]:
+ """Attempt to discover password change or reset endpoints from a base URL.
+
+ Heuristic approach: scan DOM for anchors/buttons whose text matches discovery keywords.
+ """
+ try:
+ # Ensure we're on the base URL first
+ if base_url:
+ engine.goto(base_url)
+ logger.debug("[discovery] scanning links/buttons for keywords")
+ except Exception:
+ logger.debug("[discovery] failed to navigate to base URL")
+ matches = engine.evaluate(_js_find_links_script())
+ endpoints: List[DiscoveredEndpoint] = []
+ if isinstance(matches, list):
+ for m in matches:
+ text = (m.get("text") or "").strip()
+ href = (m.get("href") or "").strip()
+ if href:
+ endpoints.append(DiscoveredEndpoint(text=text, href=href))
+ # Deduplicate by href
+ unique: Dict[str, DiscoveredEndpoint] = {}
+ for e in endpoints:
+ unique[e.href] = e
+ logger.debug(f"[discovery] found {len(unique)} unique endpoints")
+ return list(unique.values())
+
+
+def _try_type(engine, selector: str, value: str) -> bool:
+ try:
+ engine.wait_for(selector, timeout_ms=1000)
+ engine.type(selector, value)
+ logger.debug(f"[discovery] typed into {selector}")
+ return True
+ except Exception:
+ logger.debug(f"[discovery] could not type into {selector}")
+ return False
+
+
+def _try_click(engine, selector: str) -> bool:
+ try:
+ engine.wait_for(selector, timeout_ms=1000)
+ engine.click(selector)
+ logger.debug(f"[discovery] clicked {selector}")
+ return True
+ except Exception:
+ logger.debug(f"[discovery] could not click {selector}")
+ return False
+
+
+def _try_login_if_present(engine, entry: PasswordEntry) -> bool:
+ """Best-effort login if a login form is present on the current page."""
+ try:
+ has_form = engine.evaluate(
+ "(() => { const p = document.querySelector('form input[type=\\'password\\']'); return !!p; })()"
+ )
+ except Exception:
+ has_form = False
+ if not has_form:
+ logger.debug("[discovery] no login form present on page")
+ return False
+ candidates_user = [
+ "input[type='email']",
+ "input[type='text']",
+ "input[name*='user']",
+ "input[id*='user']",
+ "input[name*='email']",
+ "input[id*='email']",
+ "input[name*='login']",
+ "input[id*='login']",
+ ]
+ _ = any(_try_type(engine, sel, entry.username) for sel in candidates_user)
+ typed_pass = _try_type(engine, "input[type='password']", entry.password)
+ submitted = any(_try_click(engine, sel) for sel in SUBMIT_CANDIDATES)
+ if not submitted:
+ try:
+ engine.evaluate("(() => { const p = document.querySelector('input[type=\\'password\\']'); if (p && p.form) { p.form.submit(); return true;} return false; })()")
+ submitted = True
+ except Exception:
+ pass
+ ok = typed_pass and submitted
+ logger.debug(f"[discovery] login attempt result ok={ok}")
+ return ok
+
+
+def try_submit_password_change(engine, entry: PasswordEntry, new_password: str) -> AutomationResult:
+ """Attempt to submit a password change on the current page using common selectors."""
+ def candidates(names: List[str]) -> List[str]:
+ sels: List[str] = []
+ for n in names:
+ sels.append(f"input[name='{n}']")
+ sels.append(f"input[id='{n}']")
+ sels.append(f"input[type='password'][name='{n}']")
+ sels.append(f"input[type='password'][id='{n}']")
+ sels.append(f"input[placeholder*='{n}']")
+ sels.append(f"input[aria-label*='{n}']")
+ return sels
+
+ success_old = any(_try_type(engine, sel, entry.password) for sel in candidates(OLD_PW_CANDIDATES))
+ success_new = any(_try_type(engine, sel, new_password) for sel in candidates(NEW_PW_CANDIDATES))
+ success_confirm = any(_try_type(engine, sel, new_password) for sel in candidates(CONFIRM_PW_CANDIDATES)) or success_new
+
+ submitted = any(_try_click(engine, sel) for sel in SUBMIT_CANDIDATES)
+ if not submitted:
+ try:
+ engine.evaluate("(() => { const el = document.querySelector('input[type=\\'password\\']'); if (el) { el.dispatchEvent(new KeyboardEvent('keydown', {key: 'Enter', bubbles: true})); el.form && el.form.submit && el.form.submit(); return true;} return false; })()")
+ submitted = True
+ except Exception:
+ pass
+
+ if (success_new and success_confirm) and (submitted or success_old):
+ return AutomationResult(status=AutomationStatus.SUCCESS, message="Submitted change attempt")
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Could not auto-submit on page")
+
+
+def auto_change_flow(engine, entry: PasswordEntry, new_password: str) -> AutomationResult:
+ """End-to-end best-effort flow: optional login, discover change endpoint, attempt update."""
+ base = entry.url or (("https://" + entry.website) if entry.website and not entry.website.startswith("http") else entry.website) or ""
+ if base:
+ try:
+ logger.debug(f"[discovery] navigating to base URL: {base}")
+ engine.goto(base)
+ _try_login_if_present(engine, entry)
+ except Exception:
+ logger.debug("[discovery] failed to navigate or login at base URL")
+
+ endpoints = discover_password_change(engine, base if base else None)
+ logger.debug(f"[discovery] candidate endpoints: {len(endpoints)}")
+
+ # Also try common guesses relative to base
+ guesses = [
+ "/account/security",
+ "/settings/security",
+ "/settings/password",
+ "/profile/security",
+ "/user/security",
+ ]
+ for g in guesses:
+ endpoints.append(DiscoveredEndpoint(label=f"guess:{g}", href=(base + g) if base else g))
+
+ for ep in endpoints:
+ try:
+ target = ep.href
+ if not target.startswith("http") and base:
+ target = base + ep.href
+ if not target:
+ continue
+ logger.debug(f"[discovery] trying endpoint: {target}")
+ engine.goto(target)
+ res = try_submit_password_change(engine, entry, new_password)
+ if res.status == AutomationStatus.SUCCESS:
+ return res
+ except Exception:
+ logger.debug("[discovery] endpoint attempt failed, trying next")
+ continue
+
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="No automated flow succeeded")
diff --git a/src/cerberus/automation/engine.py b/src/cerberus/automation/engine.py
new file mode 100644
index 0000000..ade3e53
--- /dev/null
+++ b/src/cerberus/automation/engine.py
@@ -0,0 +1,26 @@
+from typing import Protocol, Optional, Dict, Any
+
+
+class AutomationEngine(Protocol):
+ def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None:
+ ...
+
+ def stop(self) -> None:
+ ...
+
+ def goto(self, url: str, wait_until: str = "networkidle") -> None:
+ ...
+
+ def type(self, selector: str, value: str, clear: bool = True) -> None:
+ ...
+
+ def click(self, selector: str) -> None:
+ ...
+
+ def wait_for(self, selector: str, timeout_ms: int = 15000) -> None:
+ ...
+
+ def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any:
+ ...
+
+
diff --git a/src/cerberus/automation/playwright_engine.py b/src/cerberus/automation/playwright_engine.py
new file mode 100644
index 0000000..a61920b
--- /dev/null
+++ b/src/cerberus/automation/playwright_engine.py
@@ -0,0 +1,60 @@
+from typing import Optional, Dict, Any
+
+from playwright.sync_api import sync_playwright, Page, Browser, BrowserContext
+
+
+class PlaywrightEngine:
+ def __init__(self):
+ self._pw = None
+ self._browser: Optional[Browser] = None
+ self._context: Optional[BrowserContext] = None
+ self._page: Optional[Page] = None
+
+ def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None:
+ self._pw = sync_playwright().start()
+ launch_args = {"headless": headless}
+ if user_data_dir:
+ self._context = self._pw.chromium.launch_persistent_context(user_data_dir, **launch_args)
+ pages = self._context.pages
+ self._page = pages[0] if pages else self._context.new_page()
+ else:
+ self._browser = self._pw.chromium.launch(**launch_args)
+ self._context = self._browser.new_context()
+ self._page = self._context.new_page()
+
+ def stop(self) -> None:
+ if self._context:
+ self._context.close()
+ if self._browser:
+ self._browser.close()
+ if self._pw:
+ self._pw.stop()
+ self._page = None
+ self._context = None
+ self._browser = None
+ self._pw = None
+
+ def goto(self, url: str, wait_until: str = "networkidle") -> None:
+ assert self._page is not None
+ self._page.goto(url, wait_until=wait_until)
+
+ def type(self, selector: str, value: str, clear: bool = True) -> None:
+ assert self._page is not None
+ locator = self._page.locator(selector)
+ if clear:
+ locator.fill("")
+ locator.type(value)
+
+ def click(self, selector: str) -> None:
+ assert self._page is not None
+ self._page.click(selector)
+
+ def wait_for(self, selector: str, timeout_ms: int = 15000) -> None:
+ assert self._page is not None
+ self._page.wait_for_selector(selector, timeout=timeout_ms)
+
+ def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any:
+ assert self._page is not None
+ return self._page.evaluate(script, arg)
+
+
diff --git a/src/cerberus/automation/policy.py b/src/cerberus/automation/policy.py
new file mode 100644
index 0000000..e4bab80
--- /dev/null
+++ b/src/cerberus/automation/policy.py
@@ -0,0 +1,25 @@
+from typing import Dict
+
+from ..core.password_manager import PasswordManager
+from ..core.models import PasswordEntry
+
+
+SITE_POLICY: Dict[str, Dict] = {
+ # domain_substring: policy
+ "github.com": {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": True},
+ "google.com": {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": False},
+}
+
+
+def generate_for_entry(pm: PasswordManager, entry: PasswordEntry) -> str:
+ url = entry.url or entry.website or ""
+ selected = None
+ for key, policy in SITE_POLICY.items():
+ if key in url:
+ selected = policy
+ break
+ if not selected:
+ selected = {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": True}
+ return pm.generate_password(**selected)
+
+
diff --git a/src/cerberus/automation/runner.py b/src/cerberus/automation/runner.py
new file mode 100644
index 0000000..841e89e
--- /dev/null
+++ b/src/cerberus/automation/runner.py
@@ -0,0 +1,59 @@
+from dataclasses import dataclass
+from typing import List, Optional, Callable
+from datetime import datetime
+
+from ..core.password_manager import PasswordManager
+from ..core.models import PasswordEntry
+from .types import AutomationResult, AutomationStatus
+from .discovery import auto_change_flow
+
+
+@dataclass
+class RotationSelector:
+ all: bool = False
+ compromised_only: bool = False
+ tag: Optional[str] = None
+ domain: Optional[str] = None
+
+
+class RotationRunner:
+ def __init__(self, engine, site_flows: List, password_manager: PasswordManager):
+ self.engine = engine
+ self.site_flows = site_flows
+ self.pm = password_manager
+
+ def _filter_entries(self, selector: RotationSelector) -> List[PasswordEntry]:
+ entries = self.pm.list_passwords()
+ filtered: List[PasswordEntry] = []
+ for e in entries:
+ if selector.compromised_only and not e.compromised:
+ continue
+ if selector.tag and selector.tag not in (e.tags or []):
+ continue
+ if selector.domain and selector.domain not in (e.url or e.website or ""):
+ continue
+ filtered.append(e)
+ return filtered
+
+ def rotate(self, selector: RotationSelector, generate_password: Callable[[PasswordEntry], str], dry_run: bool = True) -> List[AutomationResult]:
+ results: List[AutomationResult] = []
+ targets = self._filter_entries(selector)
+ for entry in targets:
+ new_password = generate_password(entry)
+ if dry_run:
+ results.append(AutomationResult(status=AutomationStatus.SUCCESS, message="dry-run", changed_at=datetime.utcnow()))
+ continue
+
+ flow = next((f for f in self.site_flows if f.match(entry)), None)
+ if flow:
+ res = flow.perform_change(self.engine, entry, new_password)
+ else:
+ # Fallback to heuristic auto discovery/change
+ res = auto_change_flow(self.engine, entry, new_password)
+ if res.status == AutomationStatus.SUCCESS:
+ self.pm.update_password(entry.id, password=new_password)
+ results.append(res)
+
+ return results
+
+
diff --git a/src/cerberus/automation/selenium_engine.py b/src/cerberus/automation/selenium_engine.py
new file mode 100644
index 0000000..3fbdaac
--- /dev/null
+++ b/src/cerberus/automation/selenium_engine.py
@@ -0,0 +1,64 @@
+from typing import Optional, Dict, Any
+
+try:
+ from selenium import webdriver
+ from selenium.webdriver.common.by import By
+ from selenium.webdriver.chrome.options import Options as ChromeOptions
+ SELENIUM_AVAILABLE = True
+except Exception: # pragma: no cover - optional dependency
+ SELENIUM_AVAILABLE = False
+
+
+class SeleniumEngine:
+ def __init__(self):
+ if not SELENIUM_AVAILABLE:
+ raise RuntimeError("Selenium is not installed. Install with: pip install selenium")
+ self._driver: Optional[webdriver.Chrome] = None
+
+ def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None:
+ options = ChromeOptions()
+ if headless:
+ options.add_argument("--headless=new")
+ if user_data_dir:
+ options.add_argument(f"--user-data-dir={user_data_dir}")
+ self._driver = webdriver.Chrome(options=options)
+
+ def stop(self) -> None:
+ if self._driver:
+ self._driver.quit()
+ self._driver = None
+
+ def goto(self, url: str, wait_until: str = "networkidle") -> None:
+ assert self._driver is not None
+ self._driver.get(url)
+
+ def type(self, selector: str, value: str, clear: bool = True) -> None:
+ assert self._driver is not None
+ elem = self._driver.find_element(By.CSS_SELECTOR, selector)
+ if clear:
+ elem.clear()
+ elem.send_keys(value)
+
+ def click(self, selector: str) -> None:
+ assert self._driver is not None
+ elem = self._driver.find_element(By.CSS_SELECTOR, selector)
+ elem.click()
+
+ def wait_for(self, selector: str, timeout_ms: int = 15000) -> None:
+ assert self._driver is not None
+ # Simple polling; users can replace with WebDriverWait if desired
+ import time
+ end = time.time() + timeout_ms / 1000.0
+ while time.time() < end:
+ try:
+ self._driver.find_element(By.CSS_SELECTOR, selector)
+ return
+ except Exception:
+ time.sleep(0.1)
+ raise TimeoutError(f"Timeout waiting for selector: {selector}")
+
+ def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any:
+ assert self._driver is not None
+ return self._driver.execute_script(script, arg)
+
+
diff --git a/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc
new file mode 100644
index 0000000..65c1943
--- /dev/null
+++ b/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc
new file mode 100644
index 0000000..28a72e3
--- /dev/null
+++ b/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc
new file mode 100644
index 0000000..fabaf7c
--- /dev/null
+++ b/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc
new file mode 100644
index 0000000..776dc1c
--- /dev/null
+++ b/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc
new file mode 100644
index 0000000..ca05800
--- /dev/null
+++ b/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc
new file mode 100644
index 0000000..56f9f97
--- /dev/null
+++ b/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc
new file mode 100644
index 0000000..4b579a5
--- /dev/null
+++ b/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc
new file mode 100644
index 0000000..976b44f
--- /dev/null
+++ b/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/automation/sites/apple.py b/src/cerberus/automation/sites/apple.py
new file mode 100644
index 0000000..1f32946
--- /dev/null
+++ b/src/cerberus/automation/sites/apple.py
@@ -0,0 +1,64 @@
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+from ...core.models import PasswordEntry
+from ..types import AutomationResult, AutomationStatus
+from .base_site import SiteFlow
+
+
+class AppleFlow(SiteFlow):
+ def match(self, entry: PasswordEntry) -> bool:
+ target = (entry.url or entry.website or "").lower()
+ return "apple.com" in target or "appleid.apple.com" in target
+
+ def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult:
+ try:
+ # Apple frequently enforces MFA; treat as best-effort stub
+ engine.goto("https://appleid.apple.com/")
+ engine.wait_for("input[type=email], input[name=email], input[id=email]")
+ try:
+ engine.type("input[type=email], input[name=email], input[id=email]", entry.username)
+ except Exception:
+ pass
+ try:
+ engine.wait_for("input[type=password]", timeout_ms=8000)
+ engine.type("input[type=password]", entry.password)
+ except Exception:
+ return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="Apple ID requires MFA or device approval")
+
+ # Direct to password change if possible (likely gated by MFA)
+ engine.goto("https://appleid.apple.com/account/manage/password")
+ try:
+ engine.wait_for("input[type=password]", timeout_ms=8000)
+ except Exception:
+ return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="Password settings gated by MFA")
+
+ # Attempt to fill current/new/confirm
+ for sel in ["input[name='currentPassword']", "input[id='currentPassword']", "input[type='password']"]:
+ try:
+ engine.type(sel, entry.password)
+ break
+ except Exception:
+ continue
+ for sel in ["input[name='newPassword']", "input[id='newPassword']"]:
+ try:
+ engine.type(sel, new_password)
+ break
+ except Exception:
+ continue
+ for sel in ["input[name='confirmPassword']", "input[id='confirmPassword']"]:
+ try:
+ engine.type(sel, new_password)
+ break
+ except Exception:
+ continue
+ for sel in ["button[type=submit]", "button"]:
+ try:
+ engine.click(sel)
+ break
+ except Exception:
+ continue
+
+ return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow())
+ except Exception as e:
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e))
diff --git a/src/cerberus/automation/sites/base_site.py b/src/cerberus/automation/sites/base_site.py
new file mode 100644
index 0000000..a33dc5b
--- /dev/null
+++ b/src/cerberus/automation/sites/base_site.py
@@ -0,0 +1,14 @@
+from typing import Protocol, Optional, Dict, Any
+
+from ...core.models import PasswordEntry
+from ..types import AutomationResult
+
+
+class SiteFlow(Protocol):
+ def match(self, entry: PasswordEntry) -> bool:
+ ...
+
+ def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult:
+ ...
+
+
diff --git a/src/cerberus/automation/sites/facebook.py b/src/cerberus/automation/sites/facebook.py
new file mode 100644
index 0000000..f7b0535
--- /dev/null
+++ b/src/cerberus/automation/sites/facebook.py
@@ -0,0 +1,56 @@
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+from ...core.models import PasswordEntry
+from ..types import AutomationResult, AutomationStatus
+from .base_site import SiteFlow
+
+
+class FacebookFlow(SiteFlow):
+ def match(self, entry: PasswordEntry) -> bool:
+ target = (entry.url or entry.website or "").lower()
+ return "facebook.com" in target or "fb.com" in target
+
+ def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult:
+ try:
+ engine.goto("https://www.facebook.com/login/")
+ engine.wait_for("input[name=email]")
+ engine.type("input[name=email]", entry.username)
+ engine.type("input[name=pass]", entry.password)
+ engine.click("button[name=login]")
+
+ # Password settings
+ engine.goto("https://www.facebook.com/settings?tab=security")
+ # New UI is dynamic; try common selectors
+ try:
+ engine.wait_for("input[type=password]", timeout_ms=8000)
+ except Exception:
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Security page requires interaction or MFA")
+
+ # Attempt common current/new/confirm fields
+ try:
+ engine.type("input[name='current_password']", entry.password)
+ except Exception:
+ pass
+ for sel in ["input[name='new_password']", "input[name='password_new']", "input[name='new']"]:
+ try:
+ engine.type(sel, new_password)
+ break
+ except Exception:
+ continue
+ for sel in ["input[name='confirm_password']", "input[name='password_confirm']", "input[name='confirm']"]:
+ try:
+ engine.type(sel, new_password)
+ break
+ except Exception:
+ continue
+ for sel in ["button[type=submit]", "[data-testid='sec_settings_save']", "button"]:
+ try:
+ engine.click(sel)
+ break
+ except Exception:
+ continue
+
+ return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow())
+ except Exception as e:
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e))
diff --git a/src/cerberus/automation/sites/github.py b/src/cerberus/automation/sites/github.py
new file mode 100644
index 0000000..36e25f2
--- /dev/null
+++ b/src/cerberus/automation/sites/github.py
@@ -0,0 +1,53 @@
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+from ...core.models import PasswordEntry
+from ..types import AutomationResult, AutomationStatus
+from .base_site import SiteFlow
+
+
+class GithubFlow(SiteFlow):
+ def match(self, entry: PasswordEntry) -> bool:
+ target = (entry.url or entry.website or "").lower()
+ return "github.com" in target
+
+ def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult:
+ try:
+ # Login
+ engine.goto("https://github.com/login")
+ engine.wait_for("input#login_field")
+ engine.type("input#login_field", entry.username)
+ engine.type("input#password", entry.password)
+ engine.click("input[type=submit]")
+
+ # Detect 2FA requirement
+ try:
+ engine.wait_for("input#otp", timeout_ms=3000)
+ return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="2FA required")
+ except Exception:
+ pass
+
+ # Navigate to password change
+ engine.goto("https://github.com/settings/security")
+ engine.wait_for("a[href='/settings/password']")
+ engine.click("a[href='/settings/password']")
+
+ engine.wait_for("input#old_password")
+ engine.type("input#old_password", entry.password)
+ engine.type("input#new_password", new_password)
+ engine.type("input#confirm_new_password", new_password)
+ engine.click("button[type=submit]")
+
+ # Verify success: check for flash notice
+ try:
+ engine.wait_for(".flash-notice, .flash-success", timeout_ms=5000)
+ except Exception:
+ # As fallback, assume success if no error shown
+ pass
+
+ return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow())
+
+ except Exception as e:
+ return AutomationResult(status=AutomationStatus.FAILED, message="Error during change", error=str(e))
+
+
diff --git a/src/cerberus/automation/sites/google.py b/src/cerberus/automation/sites/google.py
new file mode 100644
index 0000000..4d651b7
--- /dev/null
+++ b/src/cerberus/automation/sites/google.py
@@ -0,0 +1,45 @@
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+from ...core.models import PasswordEntry
+from ..types import AutomationResult, AutomationStatus
+from .base_site import SiteFlow
+
+
+class GoogleFlow(SiteFlow):
+ def match(self, entry: PasswordEntry) -> bool:
+ target = (entry.url or entry.website or "").lower()
+ return "google.com" in target
+
+ def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult:
+ try:
+ # Login flow
+ engine.goto("https://accounts.google.com/signin/v2/identifier")
+ engine.wait_for("input[type=email], input#identifierId")
+ try:
+ engine.type("input[type=email], input#identifierId", entry.username)
+ except Exception:
+ pass
+ engine.click("#identifierNext, button[type=submit]")
+ try:
+ engine.wait_for("input[type=password]", timeout_ms=7000)
+ except Exception:
+ return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present")
+ engine.type("input[type=password]", entry.password)
+ engine.click("#passwordNext, button[type=submit]")
+
+ # Security settings - direct link (subject to change)
+ engine.goto("https://myaccount.google.com/signinoptions/password")
+ engine.wait_for("input[type=password]")
+ engine.type("input[type=password]", entry.password)
+ engine.click("button[type=submit], #passwordNext")
+
+ # New password fields
+ engine.wait_for("input[name=password]")
+ engine.type("input[name=password]", new_password)
+ engine.type("input[name=confirmation_password]", new_password)
+ engine.click("button[type=submit]")
+
+ return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow())
+ except Exception as e:
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e))
diff --git a/src/cerberus/automation/sites/linkedin.py b/src/cerberus/automation/sites/linkedin.py
new file mode 100644
index 0000000..cb8ecea
--- /dev/null
+++ b/src/cerberus/automation/sites/linkedin.py
@@ -0,0 +1,56 @@
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+from ...core.models import PasswordEntry
+from ..types import AutomationResult, AutomationStatus
+from .base_site import SiteFlow
+
+
+class LinkedInFlow(SiteFlow):
+ def match(self, entry: PasswordEntry) -> bool:
+ target = (entry.url or entry.website or "").lower()
+ return "linkedin.com" in target
+
+ def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult:
+ try:
+ engine.goto("https://www.linkedin.com/login")
+ engine.wait_for("input#username")
+ engine.type("input#username", entry.username)
+ engine.type("input#password", entry.password)
+ engine.click("button[type=submit]")
+
+ engine.goto("https://www.linkedin.com/psettings/change-password")
+ try:
+ engine.wait_for("input[type=password]", timeout_ms=8000)
+ except Exception:
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Password settings gated or MFA required")
+
+ # Fill current/new/confirm
+ for sel in ["input[name='currentPassword']", "input#current-password", "input[type='password']"]:
+ try:
+ engine.type(sel, entry.password)
+ break
+ except Exception:
+ continue
+ for sel in ["input[name='newPassword']", "input#new-password"]:
+ try:
+ engine.type(sel, new_password)
+ break
+ except Exception:
+ continue
+ for sel in ["input[name='confirmPassword']", "input#confirm-password"]:
+ try:
+ engine.type(sel, new_password)
+ break
+ except Exception:
+ continue
+ for sel in ["button[type=submit]", "button"]:
+ try:
+ engine.click(sel)
+ break
+ except Exception:
+ continue
+
+ return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow())
+ except Exception as e:
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e))
diff --git a/src/cerberus/automation/sites/microsoft.py b/src/cerberus/automation/sites/microsoft.py
new file mode 100644
index 0000000..8ddee76
--- /dev/null
+++ b/src/cerberus/automation/sites/microsoft.py
@@ -0,0 +1,40 @@
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+from ...core.models import PasswordEntry
+from ..types import AutomationResult, AutomationStatus
+from .base_site import SiteFlow
+
+
+class MicrosoftFlow(SiteFlow):
+ def match(self, entry: PasswordEntry) -> bool:
+ target = (entry.url or entry.website or "").lower()
+ return "microsoft.com" in target or "live.com" in target or "office.com" in target
+
+ def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult:
+ try:
+ # Login page
+ engine.goto("https://login.live.com/")
+ engine.wait_for("input[type=email], input[name=loginfmt]")
+ engine.type("input[type=email], input[name=loginfmt]", entry.username)
+ engine.click("input[type=submit], button[type=submit]")
+
+ # Password step
+ try:
+ engine.wait_for("input[type=password]", timeout_ms=8000)
+ except Exception:
+ return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present")
+ engine.type("input[type=password]", entry.password)
+ engine.click("input[type=submit], button[type=submit]")
+
+ # Navigate to security/password change
+ engine.goto("https://account.live.com/password/change")
+ engine.wait_for("input[name=OldPassword], input[name=CurrentPassword]")
+ engine.type("input[name=OldPassword], input[name=CurrentPassword]", entry.password)
+ engine.type("input[name=NewPassword], input[name=NewPasswordBox]", new_password)
+ engine.type("input[name=ConfirmPassword], input[name=ConfirmNewPasswordBox]", new_password)
+ engine.click("button[type=submit], input[type=submit]")
+
+ return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow())
+ except Exception as e:
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e))
diff --git a/src/cerberus/automation/sites/twitter.py b/src/cerberus/automation/sites/twitter.py
new file mode 100644
index 0000000..609eda7
--- /dev/null
+++ b/src/cerberus/automation/sites/twitter.py
@@ -0,0 +1,42 @@
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+from ...core.models import PasswordEntry
+from ..types import AutomationResult, AutomationStatus
+from .base_site import SiteFlow
+
+
+class TwitterFlow(SiteFlow):
+ def match(self, entry: PasswordEntry) -> bool:
+ target = (entry.url or entry.website or "").lower()
+ return "twitter.com" in target or "x.com" in target
+
+ def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult:
+ try:
+ # Login flow
+ engine.goto("https://twitter.com/i/flow/login")
+ engine.wait_for("input[name='text']")
+ engine.type("input[name='text']", entry.username)
+ engine.click("div[role='button'][data-testid='LoginForm_Login_Button'], div[role='button']")
+ try:
+ engine.wait_for("input[name='password']", timeout_ms=8000)
+ except Exception:
+ return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present")
+ engine.type("input[name='password']", entry.password)
+ engine.click("div[role='button'][data-testid='LoginForm_Login_Button']")
+
+ # Settings (paths change often; best-effort direct link)
+ engine.goto("https://twitter.com/settings/password")
+ engine.wait_for("input[name='current_password'], input[type='password']")
+ # Try to fill typical current/new/confirm fields
+ try:
+ engine.type("input[name='current_password']", entry.password)
+ except Exception:
+ pass
+ engine.type("input[name='new_password']", new_password)
+ engine.type("input[name='password_confirmation']", new_password)
+ engine.click("div[role='button'], button[type='submit']")
+
+ return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow())
+ except Exception as e:
+ return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e))
diff --git a/src/cerberus/automation/types.py b/src/cerberus/automation/types.py
new file mode 100644
index 0000000..f236a28
--- /dev/null
+++ b/src/cerberus/automation/types.py
@@ -0,0 +1,22 @@
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+
+class AutomationStatus(str, Enum):
+ SUCCESS = "success"
+ NEEDS_MFA = "needs_mfa"
+ NEEDS_MANUAL = "needs_manual"
+ FAILED = "failed"
+
+
+@dataclass
+class AutomationResult:
+ status: AutomationStatus
+ message: str = ""
+ changed_at: Optional[datetime] = None
+ error: Optional[str] = None
+ details: Dict[str, Any] = field(default_factory=dict)
+
+
diff --git a/src/cerberus/cli.py b/src/cerberus/cli.py
new file mode 100644
index 0000000..902d4be
--- /dev/null
+++ b/src/cerberus/cli.py
@@ -0,0 +1,53 @@
+import argparse
+from typing import List
+
+from cerberus.core.password_manager import PasswordManager
+from cerberus.automation.playwright_engine import PlaywrightEngine
+from cerberus.automation.selenium_engine import SeleniumEngine, SELENIUM_AVAILABLE
+from cerberus.automation.runner import RotationRunner, RotationSelector
+from cerberus.automation.policy import generate_for_entry
+from cerberus.automation.sites.github import GithubFlow
+
+
+def cli():
+ parser = argparse.ArgumentParser(prog="cerberus")
+ sub = parser.add_subparsers(dest="command")
+
+ rotate = sub.add_parser("rotate", help="Rotate passwords via web automation")
+ rotate.add_argument("--engine", choices=["playwright", "selenium"], default="playwright")
+ rotate.add_argument("--data-dir", default=None, help="Password manager data dir")
+ rotate.add_argument("--master", required=True, help="Master password")
+ rotate.add_argument("--all", action="store_true", help="Rotate all entries")
+ rotate.add_argument("--compromised", action="store_true", help="Rotate only compromised entries")
+ rotate.add_argument("--tag", default=None)
+ rotate.add_argument("--domain", default=None)
+ rotate.add_argument("--dry-run", action="store_true")
+
+ args = parser.parse_args()
+
+ if args.command == "rotate":
+ pm = PasswordManager(data_dir=args.data_dir, master_password=args.master)
+
+ engine = PlaywrightEngine() if args.engine == "playwright" else None
+ if args.engine == "selenium":
+ if not SELENIUM_AVAILABLE:
+ raise SystemExit("Selenium not installed. Install extra: pip install .[automation-selenium]")
+ engine = SeleniumEngine()
+
+ engine.start(headless=True)
+ try:
+ flows = [GithubFlow()]
+ runner = RotationRunner(engine, flows, pm)
+ selector = RotationSelector(
+ all=args.all,
+ compromised_only=args.compromised,
+ tag=args.tag,
+ domain=args.domain,
+ )
+ results = runner.rotate(selector, lambda e: generate_for_entry(pm, e), dry_run=args.dry_run)
+ for r in results:
+ print(f"{r.status}: {r.message}")
+ finally:
+ engine.stop()
+ else:
+ parser.print_help()
diff --git a/src/cerberus/cli/__init__.py b/src/cerberus/cli/__init__.py
new file mode 100644
index 0000000..d3e17ef
--- /dev/null
+++ b/src/cerberus/cli/__init__.py
@@ -0,0 +1,305 @@
+"""
+Cerberus CLI - Command Line Interface for Cerberus Password Manager.
+"""
+from typing import Optional, List, Dict, Any
+import logging
+import sys
+import json
+from pathlib import Path
+from datetime import datetime
+import getpass
+
+import click
+from rich.console import Console
+from rich.table import Table
+from rich.progress import Progress, SpinnerColumn, TextColumn
+
+from ..core.password_manager import PasswordManager, VaultError
+from ..core.models import PasswordEntry
+from ..tui import main as tui_main
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(message)s')
+logger = logging.getLogger(__name__)
+
+# Create console for rich output
+console = Console()
+
+class CerberusCLI:
+ """Main CLI application for Cerberus Password Manager."""
+
+ def __init__(self, data_dir: Optional[str] = None, debug: bool = False):
+ """Initialize the CLI."""
+ self.data_dir = data_dir
+ self.debug = debug
+ self.pm: Optional[PasswordManager] = None
+
+ if debug:
+ logging.getLogger().setLevel(logging.DEBUG)
+ logger.debug("Debug mode enabled")
+
+ def ensure_initialized(self) -> None:
+ """Ensure the password manager is initialized."""
+ if not self.pm:
+ raise click.UsageError("Vault is not unlocked. Use 'cerberus unlock' first.")
+
+ def unlock_vault(self, master_password: Optional[str] = None) -> None:
+ """Unlock the password vault."""
+ try:
+ if not master_password:
+ master_password = getpass.getpass("Master password: ")
+
+ with self._progress_spinner("Unlocking vault..."):
+ self.pm = PasswordManager(data_dir=self.data_dir, master_password=master_password)
+
+ console.print("[green]✓[/] Vault unlocked successfully!")
+
+ except VaultError as e:
+ raise click.ClickException(f"Failed to unlock vault: {e}")
+ except Exception as e:
+ if self.debug:
+ logger.exception("Error unlocking vault")
+ raise click.ClickException(f"An error occurred: {e}")
+
+ def _progress_spinner(self, description: str):
+ """Create a progress spinner context manager."""
+ return Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ console=console,
+ transient=True
+ )
+
+ def list_entries(self, search: Optional[str] = None) -> List[PasswordEntry]:
+ """List password entries, optionally filtered by search term."""
+ self.ensure_initialized()
+
+ try:
+ entries = self.pm.get_entries()
+
+ if search:
+ search = search.lower()
+ entries = [
+ e for e in entries
+ if search in e.website.lower() or
+ search in (e.username or "").lower() or
+ search in (e.notes or "").lower() or
+ any(search in tag.lower() for tag in (e.tags or []))
+ ]
+
+ return entries
+
+ except Exception as e:
+ if self.debug:
+ logger.exception("Error listing entries")
+ raise click.ClickException(f"Failed to list entries: {e}")
+
+ def get_entry(self, identifier: str) -> PasswordEntry:
+ """Get a specific password entry by ID or website."""
+ self.ensure_initialized()
+
+ try:
+ # Try to get by ID first
+ try:
+ return self.pm.get_entry(identifier)
+ except (ValueError, KeyError):
+ # If not found by ID, try by website
+ entries = [e for e in self.pm.get_entries() if e.website.lower() == identifier.lower()]
+ if not entries:
+ raise ValueError(f"No entry found with ID or website: {identifier}")
+ if len(entries) > 1:
+ raise ValueError(f"Multiple entries found for website: {identifier}. Please use the entry ID instead.")
+ return entries[0]
+
+ except Exception as e:
+ if self.debug:
+ logger.exception(f"Error getting entry: {identifier}")
+ raise click.ClickException(str(e))
+
+ def add_entry(
+ self,
+ website: str,
+ username: str,
+ password: Optional[str] = None,
+ url: str = "",
+ notes: str = "",
+ tags: Optional[List[str]] = None,
+ generate: bool = False,
+ length: int = 16,
+ special_chars: bool = True
+ ) -> PasswordEntry:
+ """Add a new password entry."""
+ self.ensure_initialized()
+
+ try:
+ if generate:
+ with self._progress_spinner("Generating strong password..."):
+ password = self.pm.generate_password_easy(length=length, special=special_chars)
+ elif not password:
+ password = click.prompt("Password", hide_input=True, confirmation_prompt=True)
+
+ entry = PasswordEntry(
+ id=self.pm.generate_id(),
+ website=website,
+ username=username,
+ password=password,
+ url=url,
+ notes=notes,
+ tags=tags or [],
+ created_at=datetime.utcnow(),
+ updated_at=datetime.utcnow()
+ )
+
+ with self._progress_spinner("Saving entry..."):
+ self.pm.add_entry(entry)
+
+ return entry
+
+ except Exception as e:
+ if self.debug:
+ logger.exception("Error adding entry")
+ raise click.ClickException(f"Failed to add entry: {e}")
+
+ def update_entry(self, entry_id: str, **updates) -> PasswordEntry:
+ """Update an existing password entry."""
+ self.ensure_initialized()
+
+ try:
+ entry = self.get_entry(entry_id)
+
+ # Apply updates
+ for key, value in updates.items():
+ if value is not None and hasattr(entry, key):
+ setattr(entry, key, value)
+
+ entry.updated_at = datetime.utcnow()
+
+ with self._progress_spinner("Updating entry..."):
+ self.pm.update_entry(entry)
+
+ return entry
+
+ except Exception as e:
+ if self.debug:
+ logger.exception(f"Error updating entry: {entry_id}")
+ raise click.ClickException(f"Failed to update entry: {e}")
+
+ def delete_entry(self, entry_id: str) -> None:
+ """Delete a password entry."""
+ self.ensure_initialized()
+
+ try:
+ entry = self.get_entry(entry_id)
+
+ if click.confirm(f"Are you sure you want to delete the entry for {entry.website}?"):
+ with self._progress_spinner("Deleting entry..."):
+ self.pm.delete_entry(entry.id)
+
+ console.print(f"[green]✓[/] Deleted entry for {entry.website}")
+
+ except Exception as e:
+ if self.debug:
+ logger.exception(f"Error deleting entry: {entry_id}")
+ raise click.ClickException(f"Failed to delete entry: {e}")
+
+ def rotate_password(
+ self,
+ entry_id: str,
+ length: int = 32,
+ special_chars: bool = True
+ ) -> PasswordEntry:
+ """Generate a new password for an entry."""
+ self.ensure_initialized()
+
+ try:
+ entry = self.get_entry(entry_id)
+
+ with self._progress_spinner("Generating new password..."):
+ new_password = self.pm.generate_password_easy(length=length, special=special_chars)
+
+ entry.password = new_password
+ entry.updated_at = datetime.utcnow()
+
+ with self._progress_spinner("Saving updated entry..."):
+ self.pm.update_entry(entry)
+
+ return entry
+
+ except Exception as e:
+ if self.debug:
+ logger.exception(f"Error rotating password for entry: {entry_id}")
+ raise click.ClickException(f"Failed to rotate password: {e}")
+
+ def export_entries(self, output_file: str, format: str = "json") -> None:
+ """Export password entries to a file."""
+ self.ensure_initialized()
+
+ try:
+ entries = self.pm.get_entries()
+ output_path = Path(output_file).expanduser().resolve()
+
+ with self._progress_spinner(f"Exporting entries to {output_path}..."):
+ if format.lower() == "json":
+ data = [e.to_dict() for e in entries]
+ output_path.write_text(json.dumps(data, indent=2, default=str))
+ else:
+ raise ValueError(f"Unsupported export format: {format}")
+
+ console.print(f"[green]✓[/] Exported {len(entries)} entries to {output_path}")
+
+ except Exception as e:
+ if self.debug:
+ logger.exception("Error exporting entries")
+ raise click.ClickException(f"Failed to export entries: {e}")
+
+ def import_entries(self, input_file: str, format: str = "json") -> None:
+ """Import password entries from a file."""
+ self.ensure_initialized()
+
+ try:
+ input_path = Path(input_file).expanduser().resolve()
+
+ if not input_path.exists():
+ raise FileNotFoundError(f"Input file not found: {input_path}")
+
+ with self._progress_spinner(f"Importing entries from {input_path}..."):
+ if format.lower() == "json":
+ data = json.loads(input_path.read_text())
+ for item in data:
+ entry = PasswordEntry.from_dict(item)
+ # Ensure we don't overwrite existing entries
+ entry.id = self.pm.generate_id()
+ self.pm.add_entry(entry)
+ else:
+ raise ValueError(f"Unsupported import format: {format}")
+
+ console.print(f"[green]✓[/] Imported {len(data)} entries from {input_path}")
+
+ except Exception as e:
+ if self.debug:
+ logger.exception("Error importing entries")
+ raise click.ClickException(f"Failed to import entries: {e}")
+
+def print_entry_table(entries: List[PasswordEntry]) -> None:
+ """Print a table of password entries."""
+ if not entries:
+ console.print("[yellow]No entries found.[/]")
+ return
+
+ table = Table(show_header=True, header_style="bold magenta")
+ table.add_column("ID", style="dim", width=8)
+ table.add_column("Website")
+ table.add_column("Username")
+ table.add_column("Last Used", style="dim")
+ table.add_column("Updated", style="dim")
+
+ for entry in entries:
+ table.add_row(
+ entry.id[:8],
+ entry.website,
+ entry.username,
+ entry.last_used.strftime("%Y-%m-%d") if entry.last_used else "Never",
+ entry.updated_at.strftime("%Y-%m-%d") if entry.updated_at else ""
+ )
+
+ console.print(table)
diff --git a/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc b/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..093e1af
--- /dev/null
+++ b/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/cli/__pycache__/main.cpython-313.pyc b/src/cerberus/cli/__pycache__/main.cpython-313.pyc
new file mode 100644
index 0000000..3663a5a
--- /dev/null
+++ b/src/cerberus/cli/__pycache__/main.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/cli/main.py b/src/cerberus/cli/main.py
new file mode 100644
index 0000000..c40c411
--- /dev/null
+++ b/src/cerberus/cli/main.py
@@ -0,0 +1,611 @@
+"""
+Cerberus CLI - Command Line Interface for Cerberus Password Manager.
+"""
+import os
+import sys
+import logging
+from typing import Optional
+
+import click
+from rich.console import Console
+from rich.logging import RichHandler
+
+from . import CerberusCLI, print_entry_table
+from ..automation.playwright_engine import PlaywrightEngine
+from ..automation.selenium_engine import SeleniumEngine, SELENIUM_AVAILABLE
+from ..automation.runner import RotationRunner, RotationSelector
+from ..automation.sites.github import GithubFlow
+from ..automation.sites.google import GoogleFlow
+from ..automation.sites.microsoft import MicrosoftFlow
+from ..automation.sites.twitter import TwitterFlow
+from ..automation.sites.facebook import FacebookFlow
+from ..automation.sites.linkedin import LinkedInFlow
+from ..automation.sites.apple import AppleFlow
+from ..automation.policy import generate_for_entry
+from ..automation.types import AutomationStatus
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(message)s",
+ datefmt="[%X]",
+ handlers=[RichHandler(rich_tracebacks=True)]
+)
+logger = logging.getLogger("cerberus")
+
+# Create console for rich output
+console = Console()
+
+# Default data directory
+DEFAULT_DATA_DIR = os.path.expanduser("~/.cerberus")
+
+@click.group(invoke_without_command=True)
+@click.option(
+ "--data-dir",
+ type=click.Path(file_okay=False, dir_okay=True, writable=True),
+ default=DEFAULT_DATA_DIR,
+ help="Directory to store password data",
+ show_default=True
+)
+@click.option(
+ "--debug/--no-debug",
+ default=False,
+ help="Enable debug output",
+ show_default=True
+)
+@click.pass_context
+def cli(ctx: click.Context, data_dir: str, debug: bool) -> None:
+ """Cerberus Password Manager - Secure and user-friendly password management."""
+ # Set debug logging if enabled
+ if debug:
+ logging.getLogger().setLevel(logging.DEBUG)
+ logger.debug("Debug mode enabled")
+
+ # Create data directory if it doesn't exist
+ os.makedirs(data_dir, exist_ok=True)
+
+ # Store the CLI instance in the context
+ ctx.obj = CerberusCLI(data_dir=data_dir, debug=debug)
+
+ # If no command is provided, show help
+ if ctx.invoked_subcommand is None:
+ click.echo(ctx.get_help())
+
+@cli.command()
+@click.pass_obj
+def init(cli: CerberusCLI) -> None:
+ """Initialize a new password vault."""
+ master_password = click.prompt(
+ "Enter a master password",
+ hide_input=True,
+ confirmation_prompt=True
+ )
+
+ try:
+ cli.unlock_vault(master_password)
+ console.print("[green]✓[/] Vault initialized and unlocked!")
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to initialize vault: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.option("--engine", type=click.Choice(["playwright", "selenium"]), default="playwright", show_default=True)
+@click.option("--all", "all_", is_flag=True, default=False, help="Include all entries")
+@click.option("--compromised", is_flag=True, default=False, help="Only compromised entries")
+@click.option("--tag", default=None, help="Filter by tag")
+@click.option("--domain", default=None, help="Filter by domain")
+@click.pass_obj
+def reliability_report(
+ cli: CerberusCLI,
+ engine: str,
+ all_: bool,
+ compromised: bool,
+ tag: Optional[str],
+ domain: Optional[str],
+) -> None:
+ """Run a dry-run rotate across selected entries and report SUCCESS/NEEDS_MANUAL/FAILED counts."""
+ try:
+ try:
+ cli.ensure_initialized()
+ except Exception:
+ # Attempt interactive unlock
+ password = click.prompt("Master password", hide_input=True)
+ cli.unlock_vault(password)
+ if engine == "playwright":
+ eng = PlaywrightEngine()
+ else:
+ if not SELENIUM_AVAILABLE:
+ raise click.ClickException("Selenium not installed. Install extra: pip install .[automation-selenium]")
+ eng = SeleniumEngine()
+
+ eng.start(headless=True)
+ try:
+ flows = [
+ GithubFlow(), GoogleFlow(), MicrosoftFlow(), TwitterFlow(),
+ FacebookFlow(), LinkedInFlow(), AppleFlow()
+ ]
+ runner = RotationRunner(eng, flows, cli.pm) # type: ignore[arg-type]
+ selector = RotationSelector(
+ all=all_,
+ compromised_only=compromised,
+ tag=tag,
+ domain=domain,
+ )
+ results = runner.rotate(selector, lambda e: generate_for_entry(cli.pm, e), dry_run=True) # type: ignore[arg-type]
+ counts = {AutomationStatus.SUCCESS: 0, AutomationStatus.NEEDS_MANUAL: 0, AutomationStatus.FAILED: 0}
+ for r in results:
+ counts[r.status] = counts.get(r.status, 0) + 1
+ console.print("[bold]Reliability Report[/bold]")
+ console.print(f"SUCCESS: {counts.get(AutomationStatus.SUCCESS, 0)}")
+ console.print(f"NEEDS_MANUAL: {counts.get(AutomationStatus.NEEDS_MANUAL, 0)}")
+ console.print(f"FAILED: {counts.get(AutomationStatus.FAILED, 0)}")
+ finally:
+ eng.stop()
+ except Exception as e:
+ console.print(f"[red]✗[/] Reliability report failed: {e}")
+ if cli.debug:
+ import traceback
+ console.print(traceback.format_exc())
+ sys.exit(1)
+
+@cli.command()
+@click.option(
+ "--password",
+ help="Master password (prompt if not provided)",
+ default=None
+)
+@click.pass_obj
+def unlock(cli: CerberusCLI, password: Optional[str]) -> None:
+ """Unlock the password vault."""
+ try:
+ cli.unlock_vault(password)
+ console.print("[green]✓[/] Vault unlocked!")
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to unlock vault: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.option(
+ "--search",
+ "-s",
+ help="Filter entries by search term"
+)
+@click.pass_obj
+def list(cli: CerberusCLI, search: Optional[str]) -> None:
+ """List all password entries."""
+ try:
+ entries = cli.list_entries(search)
+ print_entry_table(entries)
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to list entries: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.argument("identifier")
+@click.pass_obj
+def show(cli: CerberusCLI, identifier: str) -> None:
+ """Show details for a specific password entry."""
+ try:
+ entry = cli.get_entry(identifier)
+
+ console.print(f"[bold]Entry:[/bold] {entry.id}")
+ console.print(f"[bold]Website:[/bold] {entry.website}")
+ console.print(f"[bold]Username:[/bold] {entry.username}")
+ console.print(f"[bold]Password:[/bold] {'*' * 12} (use 'cerberus copy-password {entry.id}' to copy)")
+
+ if entry.url:
+ console.print(f"[bold]URL:[/bold] {entry.url}")
+ if entry.notes:
+ console.print("[bold]Notes:[/bold]")
+ console.print(entry.notes)
+ if entry.tags:
+ console.print(f"[bold]Tags:[/bold] {', '.join(entry.tags)}")
+
+ console.print(f"[dim]Created: {entry.created_at}")
+ console.print(f"[dim]Updated: {entry.updated_at}")
+ if entry.last_used:
+ console.print(f"[dim]Last Used: {entry.last_used}")
+
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to get entry: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.option(
+ "--website",
+ "-w",
+ required=True,
+ help="Website or service name"
+)
+@click.option(
+ "--username",
+ "-u",
+ required=True,
+ help="Username or email"
+)
+@click.option(
+ "--password",
+ "-p",
+ help="Password (prompt if not provided)"
+)
+@click.option(
+ "--url",
+ help="Website URL"
+)
+@click.option(
+ "--notes",
+ "-n",
+ help="Additional notes"
+)
+@click.option(
+ "--tag",
+ "-t",
+ "tags",
+ multiple=True,
+ help="Tags for organization (can be used multiple times)"
+)
+@click.option(
+ "--generate/--no-generate",
+ "-g",
+ default=False,
+ help="Generate a strong password"
+)
+@click.option(
+ "--length",
+ "-l",
+ type=int,
+ default=16,
+ help="Length of generated password"
+)
+@click.option(
+ "--no-special-chars",
+ is_flag=True,
+ default=False,
+ help="Exclude special characters from generated password"
+)
+@click.pass_obj
+def add(
+ cli: CerberusCLI,
+ website: str,
+ username: str,
+ password: Optional[str],
+ url: str,
+ notes: str,
+ tags: list,
+ generate: bool,
+ length: int,
+ no_special_chars: bool
+) -> None:
+ """Add a new password entry."""
+ try:
+ entry = cli.add_entry(
+ website=website,
+ username=username,
+ password=password,
+ url=url,
+ notes=notes,
+ tags=list(tags) if tags else None,
+ generate=generate,
+ length=length,
+ special_chars=not no_special_chars
+ )
+
+ console.print(f"[green]✓[/] Added entry for [bold]{entry.website}[/bold]")
+ if generate:
+ console.print(f"Generated password: [yellow]{entry.password}[/]")
+
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to add entry: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.argument("identifier")
+@click.option(
+ "--website",
+ "-w",
+ help="Update website name"
+)
+@click.option(
+ "--username",
+ "-u",
+ help="Update username"
+)
+@click.option(
+ "--password",
+ "-p",
+ help="Update password (prompt if not provided)"
+)
+@click.option(
+ "--url",
+ help="Update website URL"
+)
+@click.option(
+ "--notes",
+ "-n",
+ help="Update notes"
+)
+@click.option(
+ "--tag",
+ "-t",
+ "tags",
+ multiple=True,
+ help="Update tags (use --tag=clear to remove all tags)"
+)
+@click.pass_obj
+def edit(
+ cli: CerberusCLI,
+ identifier: str,
+ website: str,
+ username: str,
+ password: str,
+ url: str,
+ notes: str,
+ tags: list
+) -> None:
+ """Edit an existing password entry."""
+ try:
+ # Get the current entry
+ entry = cli.get_entry(identifier)
+
+ # Prepare updates
+ updates = {}
+
+ if website is not None:
+ updates["website"] = website
+ if username is not None:
+ updates["username"] = username
+ if password is not None:
+ if password == "":
+ password = click.prompt("New password", hide_input=True, confirmation_prompt=True)
+ updates["password"] = password
+ if url is not None:
+ updates["url"] = url
+ if notes is not None:
+ updates["notes"] = notes
+ if tags:
+ if tags == ("clear",):
+ updates["tags"] = []
+ else:
+ updates["tags"] = list(tags)
+
+ if not updates:
+ console.print("[yellow]No changes specified.[/]")
+ return
+
+ # Apply updates
+ updated_entry = cli.update_entry(entry.id, **updates)
+ console.print(f"[green]✓[/] Updated entry for [bold]{updated_entry.website}[/]")
+
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to update entry: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.argument("identifier")
+@click.option(
+ "--length",
+ "-l",
+ type=int,
+ default=32,
+ help="Length of the new password"
+)
+@click.option(
+ "--no-special-chars",
+ is_flag=True,
+ default=False,
+ help="Exclude special characters from the new password"
+)
+@click.pass_obj
+def rotate(
+ cli: CerberusCLI,
+ identifier: str,
+ length: int,
+ no_special_chars: bool
+) -> None:
+ """Generate a new password for an entry."""
+ try:
+ entry = cli.rotate_password(
+ identifier,
+ length=length,
+ special_chars=not no_special_chars
+ )
+
+ console.print(f"[green]✓[/] Rotated password for [bold]{entry.website}[/]")
+ console.print(f"New password: [yellow]{entry.password}[/]")
+
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to rotate password: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.argument("identifier")
+@click.pass_obj
+def delete(cli: CerberusCLI, identifier: str) -> None:
+ """Delete a password entry."""
+ try:
+ cli.delete_entry(identifier)
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to delete entry: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.argument("identifier")
+@click.pass_obj
+def copy_username(cli: CerberusCLI, identifier: str) -> None:
+ """Copy username to clipboard."""
+ try:
+ entry = cli.get_entry(identifier)
+
+ # Use platform-specific clipboard handling
+ import pyperclip
+ pyperclip.copy(entry.username)
+
+ console.print(f"[green]✓[/] Copied username for [bold]{entry.website}[/] to clipboard")
+
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to copy username: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.argument("identifier")
+@click.pass_obj
+def copy_password(cli: CerberusCLI, identifier: str) -> None:
+ """Copy password to clipboard."""
+ try:
+ entry = cli.get_entry(identifier)
+
+ # Use platform-specific clipboard handling
+ import pyperclip
+ pyperclip.copy(entry.password)
+
+ console.print(f"[green]✓[/] Copied password for [bold]{entry.website}[/] to clipboard")
+
+ # Clear clipboard after 30 seconds
+ import threading
+ import time
+
+ def clear_clipboard():
+ time.sleep(30)
+ if pyperclip.paste() == entry.password:
+ pyperclip.copy("")
+ console.print("[yellow]✓[/] Clipboard cleared")
+
+ threading.Thread(target=clear_clipboard, daemon=True).start()
+
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to copy password: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.argument("output_file", type=click.Path())
+@click.option(
+ "--format",
+ "-f",
+ type=click.Choice(["json"], case_sensitive=False),
+ default="json",
+ help="Export format"
+)
+@click.pass_obj
+def export(cli: CerberusCLI, output_file: str, format: str) -> None:
+ """Export password entries to a file."""
+ try:
+ cli.export_entries(output_file, format=format)
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to export entries: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.argument("input_file", type=click.Path(exists=True))
+@click.option(
+ "--format",
+ "-f",
+ type=click.Choice(["json"], case_sensitive=False),
+ default="json",
+ help="Import format"
+)
+@click.pass_obj
+def import_entries(cli: CerberusCLI, input_file: str, format: str) -> None:
+ """Import password entries from a file."""
+ try:
+ cli.import_entries(input_file, format=format)
+ except Exception as e:
+ console.print(f"[red]✗[/] Failed to import entries: {e}")
+ sys.exit(1)
+
+@cli.command()
+@click.pass_obj
+def tui(cli: CerberusCLI) -> None:
+ """Launch the Terminal User Interface."""
+ from ..tui import main as tui_main
+ tui_main()
+
+@cli.command()
+@click.pass_obj
+def gui(cli: CerberusCLI) -> None:
+ """Launch the Graphical User Interface."""
+ from ..gui import run_app
+ run_app()
+
+@cli.command()
+@click.argument("identifier", required=False)
+@click.option("--engine", type=click.Choice(["playwright", "selenium"]), default="playwright", show_default=True)
+@click.option("--all", "all_", is_flag=True, default=False, help="Rotate all entries")
+@click.option("--compromised", is_flag=True, default=False, help="Only compromised entries")
+@click.option("--tag", default=None, help="Filter by tag")
+@click.option("--domain", default=None, help="Filter by domain")
+@click.option("--dry-run", is_flag=True, default=False, help="Do not perform changes, simulate only")
+@click.option("--user-data-dir", type=str, default=None, help="Browser user data dir for persistent sessions")
+@click.option("--no-headless", is_flag=True, default=False, help="Run browser with a visible window")
+@click.pass_obj
+def web_rotate(
+ cli: CerberusCLI,
+ identifier: Optional[str],
+ engine: str,
+ all_: bool,
+ compromised: bool,
+ tag: Optional[str],
+ domain: Optional[str],
+ dry_run: bool,
+ user_data_dir: Optional[str],
+ no_headless: bool,
+) -> None:
+ """Rotate password(s) on websites via web automation with dynamic discovery.
+
+ If IDENTIFIER is provided, attempts to rotate only that entry (by id or website).
+ Otherwise uses filters (--all/--tag/--domain/--compromised).
+ """
+ try:
+ cli.ensure_initialized()
+ # Create automation engine
+ if engine == "playwright":
+ eng = PlaywrightEngine()
+ else:
+ if not SELENIUM_AVAILABLE:
+ raise click.ClickException("Selenium not installed. Install extra: pip install .[automation-selenium]")
+ eng = SeleniumEngine()
+
+ eng.start(headless=not no_headless, user_data_dir=user_data_dir)
+ try:
+ flows = [
+ GithubFlow(), GoogleFlow(), MicrosoftFlow(), TwitterFlow(),
+ FacebookFlow(), LinkedInFlow(), AppleFlow()
+ ]
+ runner = RotationRunner(eng, flows, cli.pm) # type: ignore[arg-type]
+ if identifier:
+ # Build selector to target specific entry
+ selector = RotationSelector(all=False)
+ # Temporarily filter entries by overriding internals using domain/website matching
+ # We'll rely on runner._filter_entries via domain filter
+ # Best-effort: put identifier into domain filter
+ domain = identifier
+ selector = RotationSelector(
+ all=all_,
+ compromised_only=compromised,
+ tag=tag,
+ domain=domain,
+ )
+ results = runner.rotate(selector, lambda e: generate_for_entry(cli.pm, e), dry_run=dry_run) # type: ignore[arg-type]
+ for r in results:
+ console.print(f"[bold]{r.status.value}[/]: {r.message}")
+ finally:
+ eng.stop()
+ except Exception as e:
+ console.print(f"[red]✗[/] Web rotate failed: {e}")
+ if cli.debug:
+ import traceback
+ console.print(traceback.format_exc())
+ sys.exit(1)
+
+def main() -> None:
+ """Entry point for the Cerberus CLI."""
+ try:
+ cli()
+ except Exception as e:
+ console.print(f"[red]Error:[/red] {e}")
+ if cli.obj and cli.obj.debug:
+ import traceback
+ console.print(traceback.format_exc())
+ sys.exit(1)
+
+if __name__ == "__main__":
+ main()
diff --git a/src/cerberus/core/Makefile b/src/cerberus/core/Makefile
new file mode 100644
index 0000000..14569a9
--- /dev/null
+++ b/src/cerberus/core/Makefile
@@ -0,0 +1,31 @@
+# Compiler and flags
+CC = gcc
+CFLAGS = -fPIC -O2 -Wall -Wextra
+LDFLAGS = -shared -fPIC -lssl -lcrypto -luuid
+TARGET = libcerberus.so
+SOURCES = cerberus.c
+OBJECTS = $(SOURCES:.c=.o)
+
+.PHONY: all clean
+
+all: $(TARGET)
+
+$(TARGET): $(OBJECTS)
+ $(CC) $(LDFLAGS) -o $@ $^
+
+%.o: %.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+
+clean:
+ rm -f $(TARGET) $(OBJECTS)
+
+install: $(TARGET)
+ cp $(TARGET) /usr/local/lib/
+ ldconfig
+
+uninstall:
+ rm -f /usr/local/lib/$(TARGET)
+ ldconfig
+
+test: $(TARGET)
+ python3 -m pytest tests/
diff --git a/src/cerberus/core/__init__.py b/src/cerberus/core/__init__.py
new file mode 100644
index 0000000..5541b05
--- /dev/null
+++ b/src/cerberus/core/__init__.py
@@ -0,0 +1,127 @@
+"""Cerberus Core - Core functionality for the Cerberus password manager.
+
+This module provides the core functionality for the Cerberus password manager,
+including the C core bindings and high-level password management interfaces.
+"""
+
+import os
+import cffi
+from pathlib import Path
+from typing import Optional, Any
+
+# Initialize CFFI (exported for callers that need to manage buffers)
+ffi = cffi.FFI()
+
+def _load_header():
+ cdef_src = '''
+ typedef unsigned int uint32_t;
+ typedef unsigned long size_t;
+ typedef long time_t;
+ typedef int bool;
+
+ typedef struct cerb_vault_t cerb_vault_t;
+ typedef struct cerb_entry_t cerb_entry_t;
+
+ typedef struct {
+ char id[37];
+ char website[256];
+ char username[256];
+ char password[1024];
+ char notes[4096];
+ char url[1024];
+ time_t created_at;
+ time_t updated_at;
+ } cerb_entry_basic_t;
+
+ int cerb_crypto_init(void);
+ void cerb_crypto_cleanup(void);
+
+ int cerb_vault_create(const char *master_password, cerb_vault_t **vault);
+ int cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault);
+ int cerb_vault_save(cerb_vault_t *vault, const char *vault_path);
+ void cerb_vault_close(cerb_vault_t *vault);
+
+ int cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry);
+ int cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry);
+ int cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id);
+ int cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry);
+ int cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count);
+ int cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count);
+
+ int cerb_generate_password(uint32_t length, bool use_upper, bool use_lower, bool use_digits, bool use_special, char *buffer, size_t buffer_size);
+ void cerb_generate_uuid(char *uuid);
+ time_t cerb_current_timestamp(void);
+ '''
+ ffi.cdef(cdef_src)
+
+# Load the header
+_load_header()
+
+# Try to load the compiled library
+_lib = None
+
+def init() -> bool:
+ """Initialize the Cerberus C core.
+
+ Returns:
+ bool: True if initialization was successful, False otherwise
+ """
+ global _lib
+
+ if _lib is not None:
+ return True
+
+ # Try multiple candidate names
+ candidates = [
+ Path(__file__).parent / 'libcerberus.so',
+ Path(__file__).parent / 'cerberus.so'
+ ]
+ for lib_path in candidates:
+ try:
+ _lib = ffi.dlopen(str(lib_path))
+ return True
+ except OSError:
+ continue
+ _lib = None
+ return False
+
+# Initialize on import
+if not init():
+ class _DummyLib:
+ def __getattribute__(self, name: str) -> Any:
+ raise RuntimeError(
+ "Cerberus C core not initialized. "
+ "Please ensure the core is compiled and in your library path."
+ )
+ _lib = _DummyLib()
+ CORE_AVAILABLE = False
+else:
+ CORE_AVAILABLE = True
+
+# Re-export the C functions with proper typing
+for name in dir(_lib):
+ if name.startswith('cerb_'):
+ globals()[name] = getattr(_lib, name)
+
+# Error code constants (must match cerberus.h enum)
+CERB_OK = 0
+CERB_ERROR = -1
+
+# Clean up the namespace (keep ffi exported)
+try:
+ del _DummyLib
+except NameError:
+ pass
+del os, Path, _load_header, init
+
+# Export high-level interfaces
+from .password_manager import PasswordManager
+from .models import PasswordEntry
+
+__all__ = [
+ 'PasswordManager',
+ 'PasswordEntry',
+ 'VaultError',
+ 'CoreNotAvailableError',
+ 'ffi'
+]
diff --git a/src/cerberus/core/__pycache__/__init__.cpython-313.pyc b/src/cerberus/core/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..1807bd2
--- /dev/null
+++ b/src/cerberus/core/__pycache__/__init__.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/core/__pycache__/models.cpython-313.pyc b/src/cerberus/core/__pycache__/models.cpython-313.pyc
new file mode 100644
index 0000000..ffc2dd6
--- /dev/null
+++ b/src/cerberus/core/__pycache__/models.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc b/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc
new file mode 100644
index 0000000..813cf2b
--- /dev/null
+++ b/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/core/cerberus.c b/src/cerberus/core/cerberus.c
new file mode 100644
index 0000000..8511e3a
--- /dev/null
+++ b/src/cerberus/core/cerberus.c
@@ -0,0 +1,530 @@
+#include "cerberus.h"
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <openssl/evp.h>
+#include <openssl/rand.h>
+#include <openssl/err.h>
+// uuid/uuid.h not required; implement UUID v4 using RAND_bytes
+
+// Vault structure
+typedef struct {
+ uint8_t salt[SALT_LEN];
+ uint8_t key[KEY_LEN];
+ bool key_initialized;
+ cerb_entry_t *entries;
+ size_t num_entries;
+ size_t capacity;
+} cerb_vault_internal_t;
+
+// Initialize crypto
+cerb_error_t cerb_crypto_init(void) {
+ OpenSSL_add_all_algorithms();
+ ERR_load_crypto_strings();
+ return RAND_poll() ? CERB_OK : CERB_CRYPTO_ERROR;
+}
+
+// Cleanup crypto
+void cerb_crypto_cleanup(void) {
+ EVP_cleanup();
+ ERR_free_strings();
+}
+
+// Create new vault
+cerb_error_t cerb_vault_create(const char *master_password, cerb_vault_t **vault) {
+ if (!master_password || !vault) return CERB_INVALID_ARG;
+
+ cerb_vault_internal_t *v = calloc(1, sizeof(cerb_vault_internal_t));
+ if (!v) return CERB_MEMORY_ERROR;
+
+ if (RAND_bytes(v->salt, SALT_LEN) != 1) {
+ free(v);
+ return CERB_CRYPTO_ERROR;
+ }
+
+ // Derive key from password and salt
+ if (!PKCS5_PBKDF2_HMAC(master_password, (int)strlen(master_password),
+ v->salt, SALT_LEN, PBKDF2_ITERATIONS,
+ EVP_sha256(), KEY_LEN, v->key)) {
+ free(v);
+ return CERB_CRYPTO_ERROR;
+ }
+ v->key_initialized = true;
+
+ v->capacity = 32;
+ v->entries = calloc(v->capacity, sizeof(cerb_entry_t));
+ if (!v->entries) {
+ free(v);
+ return CERB_MEMORY_ERROR;
+ }
+
+ *vault = (cerb_vault_t *)v;
+ return CERB_OK;
+}
+
+// Save vault to file (AES-256-GCM encrypted blob)
+cerb_error_t cerb_vault_save(cerb_vault_t *vault, const char *vault_path) {
+ if (!vault || !vault_path) return CERB_INVALID_ARG;
+ cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault;
+
+ FILE *fp = fopen(vault_path, "wb");
+ if (!fp) return CERB_STORAGE_ERROR;
+
+ // Serialize entries: [num_entries][entries...]
+ size_t plain_len = sizeof(uint32_t) + v->num_entries * sizeof(cerb_entry_t);
+ unsigned char *plaintext = malloc(plain_len);
+ if (!plaintext) { fclose(fp); return CERB_MEMORY_ERROR; }
+
+ uint32_t n = (uint32_t)v->num_entries;
+ memcpy(plaintext, &n, sizeof(uint32_t));
+ if (v->num_entries > 0) {
+ memcpy(plaintext + sizeof(uint32_t), v->entries, v->num_entries * sizeof(cerb_entry_t));
+ }
+
+ // Prepare AES-GCM
+ unsigned char iv[IV_LEN];
+ if (RAND_bytes(iv, IV_LEN) != 1) { free(plaintext); fclose(fp); return CERB_CRYPTO_ERROR; }
+ unsigned char *ciphertext = malloc(plain_len);
+ if (!ciphertext) { free(plaintext); fclose(fp); return CERB_MEMORY_ERROR; }
+ int len = 0, ciphertext_len = 0;
+ unsigned char tag[16];
+
+ EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new();
+ if (!ctx) { free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; }
+
+ if (EVP_EncryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, NULL, NULL) != 1) {
+ EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR;
+ }
+ if (EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, IV_LEN, NULL) != 1) {
+ EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR;
+ }
+ if (EVP_EncryptInit_ex(ctx, NULL, NULL, v->key, iv) != 1) {
+ EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR;
+ }
+
+ if (EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, (int)plain_len) != 1) {
+ EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR;
+ }
+ ciphertext_len = len;
+ if (EVP_EncryptFinal_ex(ctx, ciphertext + len, &len) != 1) {
+ EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR;
+ }
+ ciphertext_len += len;
+ if (EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_GET_TAG, 16, tag) != 1) {
+ EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR;
+ }
+ EVP_CIPHER_CTX_free(ctx);
+
+ // Write file: MAGIC, VERSION, SALT, IV, TAG, CIPHERTEXT_LEN, CIPHERTEXT
+ const char magic[8] = { 'C','E','R','B','E','R','U','S' };
+ uint32_t version = 1;
+ uint32_t clen = (uint32_t)ciphertext_len;
+
+ if (fwrite(magic, 1, sizeof(magic), fp) != sizeof(magic) ||
+ fwrite(&version, 1, sizeof(version), fp) != sizeof(version) ||
+ fwrite(v->salt, 1, SALT_LEN, fp) != SALT_LEN ||
+ fwrite(iv, 1, IV_LEN, fp) != IV_LEN ||
+ fwrite(tag, 1, sizeof(tag), fp) != sizeof(tag) ||
+ fwrite(&clen, 1, sizeof(clen), fp) != sizeof(clen) ||
+ fwrite(ciphertext, 1, ciphertext_len, fp) != (size_t)ciphertext_len) {
+ free(plaintext); free(ciphertext); fclose(fp); return CERB_STORAGE_ERROR;
+ }
+
+ free(plaintext);
+ free(ciphertext);
+ fclose(fp);
+ return CERB_OK;
+}
+
+// Open vault from file
+cerb_error_t cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault) {
+ if (!master_password || !vault_path || !vault) return CERB_INVALID_ARG;
+ FILE *fp = fopen(vault_path, "rb");
+ if (!fp) return CERB_STORAGE_ERROR;
+
+ const char expected_magic[8] = { 'C','E','R','B','E','R','U','S' };
+ char magic[8];
+ uint32_t version = 0;
+ unsigned char salt[SALT_LEN];
+ unsigned char iv[IV_LEN];
+ unsigned char tag[16];
+ uint32_t clen = 0;
+
+ if (fread(magic, 1, sizeof(magic), fp) != sizeof(magic) ||
+ memcmp(magic, expected_magic, sizeof(magic)) != 0 ||
+ fread(&version, 1, sizeof(version), fp) != sizeof(version) ||
+ fread(salt, 1, SALT_LEN, fp) != SALT_LEN ||
+ fread(iv, 1, IV_LEN, fp) != IV_LEN ||
+ fread(tag, 1, sizeof(tag), fp) != sizeof(tag) ||
+ fread(&clen, 1, sizeof(clen), fp) != sizeof(clen)) {
+ fclose(fp);
+ return CERB_STORAGE_ERROR;
+ }
+
+ unsigned char *ciphertext = malloc(clen);
+ if (!ciphertext) { fclose(fp); return CERB_MEMORY_ERROR; }
+ if (fread(ciphertext, 1, clen, fp) != clen) { free(ciphertext); fclose(fp); return CERB_STORAGE_ERROR; }
+ fclose(fp);
+
+ // Derive key
+ unsigned char key[KEY_LEN];
+ if (!PKCS5_PBKDF2_HMAC(master_password, (int)strlen(master_password),
+ salt, SALT_LEN, PBKDF2_ITERATIONS,
+ EVP_sha256(), KEY_LEN, key)) {
+ free(ciphertext);
+ return CERB_CRYPTO_ERROR;
+ }
+
+ // Decrypt
+ unsigned char *plaintext = malloc(clen); // ciphertext_len >= plaintext_len
+ if (!plaintext) { free(ciphertext); return CERB_MEMORY_ERROR; }
+ int len = 0, plain_len = 0;
+ cerb_error_t status = CERB_OK;
+ EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new();
+ if (!ctx) { free(ciphertext); free(plaintext); return CERB_CRYPTO_ERROR; }
+ if (EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, NULL, NULL) != 1) status = CERB_CRYPTO_ERROR;
+ if (status == CERB_OK && EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, IV_LEN, NULL) != 1) status = CERB_CRYPTO_ERROR;
+ if (status == CERB_OK && EVP_DecryptInit_ex(ctx, NULL, NULL, key, iv) != 1) status = CERB_CRYPTO_ERROR;
+ if (status == CERB_OK && EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, (int)clen) != 1) status = CERB_CRYPTO_ERROR;
+ plain_len = len;
+ if (status == CERB_OK && EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, 16, tag) != 1) status = CERB_CRYPTO_ERROR;
+ if (status == CERB_OK && EVP_DecryptFinal_ex(ctx, plaintext + len, &len) != 1) status = CERB_CRYPTO_ERROR;
+ plain_len += len;
+ EVP_CIPHER_CTX_free(ctx);
+ if (status != CERB_OK) { free(ciphertext); free(plaintext); return CERB_CRYPTO_ERROR; }
+
+ // Deserialize
+ if ((size_t)plain_len < sizeof(uint32_t)) { free(ciphertext); free(plaintext); return CERB_STORAGE_ERROR; }
+ uint32_t n = 0; memcpy(&n, plaintext, sizeof(uint32_t));
+ size_t expected = sizeof(uint32_t) + (size_t)n * sizeof(cerb_entry_t);
+ if ((size_t)plain_len != expected) { free(ciphertext); free(plaintext); return CERB_STORAGE_ERROR; }
+
+ cerb_vault_internal_t *v = calloc(1, sizeof(cerb_vault_internal_t));
+ if (!v) { free(ciphertext); free(plaintext); return CERB_MEMORY_ERROR; }
+ memcpy(v->salt, salt, SALT_LEN);
+ memcpy(v->key, key, KEY_LEN);
+ v->key_initialized = true;
+ v->capacity = n > 0 ? n : 32;
+ v->entries = calloc(v->capacity, sizeof(cerb_entry_t));
+ if (!v->entries) { free(v); free(ciphertext); free(plaintext); return CERB_MEMORY_ERROR; }
+ v->num_entries = n;
+ if (n > 0) {
+ memcpy(v->entries, plaintext + sizeof(uint32_t), (size_t)n * sizeof(cerb_entry_t));
+ }
+
+ *vault = (cerb_vault_t *)v;
+ free(ciphertext);
+ free(plaintext);
+ return CERB_OK;
+}
+
+// Add entry to vault
+cerb_error_t cerb_vault_add_entry(cerb_vault_t *vault, const cerb_entry_t *entry) {
+ if (!vault || !entry) return CERB_INVALID_ARG;
+
+ cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault;
+
+ // Check for duplicates
+ for (size_t i = 0; i < v->num_entries; i++) {
+ if (strcmp(v->entries[i].id, entry->id) == 0) {
+ return CERB_DUPLICATE;
+ }
+ }
+
+ // Resize if needed
+ if (v->num_entries >= v->capacity) {
+ size_t new_capacity = v->capacity * 2;
+ cerb_entry_t *new_entries = realloc(v->entries, new_capacity * sizeof(cerb_entry_t));
+ if (!new_entries) return CERB_MEMORY_ERROR;
+ v->entries = new_entries;
+ v->capacity = new_capacity;
+ }
+
+ // Add entry
+ v->entries[v->num_entries++] = *entry;
+ return CERB_OK;
+}
+
+// Update existing entry
+cerb_error_t cerb_vault_update_entry(cerb_vault_t *vault, const cerb_entry_t *entry) {
+ if (!vault || !entry) return CERB_INVALID_ARG;
+
+ cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault;
+
+ for (size_t i = 0; i < v->num_entries; i++) {
+ if (strcmp(v->entries[i].id, entry->id) == 0) {
+ v->entries[i] = *entry;
+ return CERB_OK;
+ }
+ }
+
+ return CERB_NOT_FOUND;
+}
+
+// Delete entry by ID
+cerb_error_t cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id) {
+ if (!vault || !entry_id) return CERB_INVALID_ARG;
+
+ cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault;
+
+ for (size_t i = 0; i < v->num_entries; i++) {
+ if (strcmp(v->entries[i].id, entry_id) == 0) {
+ // Move last entry into this slot to keep array compact
+ if (i != v->num_entries - 1) {
+ v->entries[i] = v->entries[v->num_entries - 1];
+ }
+ memset(&v->entries[v->num_entries - 1], 0, sizeof(cerb_entry_t));
+ v->num_entries--;
+ return CERB_OK;
+ }
+ }
+
+ return CERB_NOT_FOUND;
+}
+
+// Get entry by ID
+cerb_error_t cerb_vault_get_entry(cerb_vault_t *vault, const char *entry_id, cerb_entry_t *entry) {
+ if (!vault || !entry_id || !entry) return CERB_INVALID_ARG;
+
+ cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault;
+
+ for (size_t i = 0; i < v->num_entries; i++) {
+ if (strcmp(v->entries[i].id, entry_id) == 0) {
+ *entry = v->entries[i];
+ return CERB_OK;
+ }
+ }
+
+ return CERB_NOT_FOUND;
+}
+
+// Get all entries (returns a newly allocated array the caller must free)
+cerb_error_t cerb_vault_get_entries(cerb_vault_t *vault, cerb_entry_t **entries, size_t *count) {
+ if (!vault || !entries || !count) return CERB_INVALID_ARG;
+
+ cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault;
+
+ if (v->num_entries == 0) {
+ *entries = NULL;
+ *count = 0;
+ return CERB_OK;
+ }
+
+ cerb_entry_t *out = calloc(v->num_entries, sizeof(cerb_entry_t));
+ if (!out) return CERB_MEMORY_ERROR;
+
+ memcpy(out, v->entries, v->num_entries * sizeof(cerb_entry_t));
+ *entries = out;
+ *count = v->num_entries;
+ return CERB_OK;
+}
+
+// Basic substring search across website, username, and url
+cerb_error_t cerb_vault_search(cerb_vault_t *vault, const char *query, cerb_entry_t **results, size_t *count) {
+ if (!vault || !results || !count) return CERB_INVALID_ARG;
+
+ cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault;
+
+ if (!query || *query == '\0') {
+ return cerb_vault_get_entries(vault, results, count);
+ }
+
+ size_t matched = 0;
+ // First pass: count
+ for (size_t i = 0; i < v->num_entries; i++) {
+ if ((strstr(v->entries[i].website, query) != NULL) ||
+ (strstr(v->entries[i].username, query) != NULL) ||
+ (strstr(v->entries[i].url, query) != NULL)) {
+ matched++;
+ }
+ }
+
+ if (matched == 0) {
+ *results = NULL;
+ *count = 0;
+ return CERB_OK;
+ }
+
+ cerb_entry_t *out = calloc(matched, sizeof(cerb_entry_t));
+ if (!out) return CERB_MEMORY_ERROR;
+
+ size_t idx = 0;
+ for (size_t i = 0; i < v->num_entries; i++) {
+ if ((strstr(v->entries[i].website, query) != NULL) ||
+ (strstr(v->entries[i].username, query) != NULL) ||
+ (strstr(v->entries[i].url, query) != NULL)) {
+ out[idx++] = v->entries[i];
+ }
+ }
+
+ *results = out;
+ *count = matched;
+ return CERB_OK;
+}
+
+// Basic variants for FFI (flattened struct)
+static void entry_to_basic(const cerb_entry_t *in, cerb_entry_basic_t *out) {
+ memset(out, 0, sizeof(*out));
+ strncpy(out->id, in->id, sizeof(out->id)-1);
+ strncpy(out->website, in->website, sizeof(out->website)-1);
+ strncpy(out->username, in->username, sizeof(out->username)-1);
+ strncpy(out->password, in->password, sizeof(out->password)-1);
+ strncpy(out->notes, in->notes, sizeof(out->notes)-1);
+ strncpy(out->url, in->url, sizeof(out->url)-1);
+ out->created_at = in->created_at;
+ out->updated_at = in->updated_at;
+}
+
+static void basic_to_entry(const cerb_entry_basic_t *in, cerb_entry_t *out) {
+ memset(out, 0, sizeof(*out));
+ strncpy(out->id, in->id, sizeof(out->id)-1);
+ strncpy(out->website, in->website, sizeof(out->website)-1);
+ strncpy(out->username, in->username, sizeof(out->username)-1);
+ strncpy(out->password, in->password, sizeof(out->password)-1);
+ strncpy(out->notes, in->notes, sizeof(out->notes)-1);
+ strncpy(out->url, in->url, sizeof(out->url)-1);
+ out->created_at = in->created_at;
+ out->updated_at = in->updated_at;
+}
+
+cerb_error_t cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry) {
+ cerb_entry_t full;
+ basic_to_entry(entry, &full);
+ return cerb_vault_add_entry(vault, &full);
+}
+
+cerb_error_t cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry) {
+ cerb_entry_t full;
+ basic_to_entry(entry, &full);
+ return cerb_vault_update_entry(vault, &full);
+}
+
+cerb_error_t cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry) {
+ cerb_entry_t full;
+ cerb_error_t r = cerb_vault_get_entry(vault, entry_id, &full);
+ if (r != CERB_OK) return r;
+ entry_to_basic(&full, entry);
+ return CERB_OK;
+}
+
+cerb_error_t cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count) {
+ cerb_entry_t *full = NULL;
+ cerb_error_t r = cerb_vault_get_entries(vault, &full, count);
+ if (r != CERB_OK) return r;
+ if (*count == 0) { *entries = NULL; return CERB_OK; }
+ cerb_entry_basic_t *out = calloc(*count, sizeof(cerb_entry_basic_t));
+ if (!out) return CERB_MEMORY_ERROR;
+ for (size_t i = 0; i < *count; i++) entry_to_basic(&full[i], &out[i]);
+ *entries = out;
+ free(full);
+ return CERB_OK;
+}
+
+cerb_error_t cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count) {
+ cerb_entry_t *full = NULL;
+ cerb_error_t r = cerb_vault_search(vault, query, &full, count);
+ if (r != CERB_OK) return r;
+ if (*count == 0) { *results = NULL; return CERB_OK; }
+ cerb_entry_basic_t *out = calloc(*count, sizeof(cerb_entry_basic_t));
+ if (!out) return CERB_MEMORY_ERROR;
+ for (size_t i = 0; i < *count; i++) entry_to_basic(&full[i], &out[i]);
+ *results = out;
+ free(full);
+ return CERB_OK;
+}
+
+// Generate password
+cerb_error_t cerb_generate_password(
+ uint32_t length,
+ bool use_upper,
+ bool use_lower,
+ bool use_digits,
+ bool use_special,
+ char *buffer,
+ size_t buffer_size
+) {
+ if (!buffer || length < 8 || length > MAX_PASSWORD_LEN || buffer_size < length + 1) {
+ return CERB_INVALID_ARG;
+ }
+
+ const char *lower = "abcdefghijklmnopqrstuvwxyz";
+ const char *upper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ const char *digits = "0123456789";
+ const char *special = "!@#$%^&*()-_=+[]{}|;:,.<>?";
+
+ char charset[256] = {0};
+ size_t pos = 0;
+
+ if (use_lower) { strcpy(charset + pos, lower); pos += strlen(lower); }
+ if (use_upper) { strcpy(charset + pos, upper); pos += strlen(upper); }
+ if (use_digits) { strcpy(charset + pos, digits); pos += strlen(digits); }
+ if (use_special) { strcpy(charset + pos, special); pos += strlen(special); }
+
+ if (pos == 0) return CERB_INVALID_ARG;
+
+ // Generate random password
+ for (size_t i = 0; i < length; i++) {
+ unsigned char byte;
+ do {
+ if (RAND_bytes(&byte, 1) != 1) {
+ return CERB_CRYPTO_ERROR;
+ }
+ } while (byte >= (256 / pos) * pos);
+
+ buffer[i] = charset[byte % pos];
+ }
+
+ buffer[length] = '\0';
+ return CERB_OK;
+}
+
+// Generate UUID v4 (xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx)
+void cerb_generate_uuid(char *uuid) {
+ unsigned char bytes[16];
+ if (RAND_bytes(bytes, sizeof(bytes)) != 1) {
+ // Fallback to zeroed UUID on failure
+ memset(uuid, '0', 36);
+ uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-';
+ uuid[36] = '\0';
+ return;
+ }
+ // Set version (4)
+ bytes[6] = (bytes[6] & 0x0F) | 0x40;
+ // Set variant (10xx)
+ bytes[8] = (bytes[8] & 0x3F) | 0x80;
+
+ static const char *hex = "0123456789abcdef";
+ int p = 0;
+ for (int i = 0; i < 16; i++) {
+ if (i == 4 || i == 6 || i == 8 || i == 10) {
+ uuid[p++] = '-';
+ }
+ uuid[p++] = hex[(bytes[i] >> 4) & 0x0F];
+ uuid[p++] = hex[bytes[i] & 0x0F];
+ }
+ uuid[p] = '\0';
+}
+
+// Get current timestamp
+time_t cerb_current_timestamp(void) {
+ return time(NULL);
+}
+
+// Cleanup vault
+void cerb_vault_close(cerb_vault_t *vault) {
+ if (!vault) return;
+
+ cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault;
+
+ // Securely wipe sensitive data
+ memset(v->key, 0, KEY_LEN);
+ memset(v->salt, 0, SALT_LEN);
+
+ // Wipe entries
+ for (size_t i = 0; i < v->num_entries; i++) {
+ memset(&v->entries[i], 0, sizeof(cerb_entry_t));
+ }
+
+ free(v->entries);
+ free(v);
+}
diff --git a/src/cerberus/core/cerberus.h b/src/cerberus/core/cerberus.h
new file mode 100644
index 0000000..06c736a
--- /dev/null
+++ b/src/cerberus/core/cerberus.h
@@ -0,0 +1,144 @@
+#ifndef CERBERUS_CORE_H
+#define CERBERUS_CORE_H
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <time.h>
+
+// Maximum lengths for fields
+#define MAX_WEBSITE_LEN 256
+#define MAX_USERNAME_LEN 256
+#define MAX_PASSWORD_LEN 1024
+#define MAX_NOTES_LEN 4096
+#define MAX_TAGS 32
+#define MAX_TAG_LEN 64
+#define MAX_CUSTOM_FIELDS 32
+#define MAX_CUSTOM_KEY_LEN 64
+#define MAX_CUSTOM_VALUE_LEN 1024
+#define MAX_ENTRIES 65536
+#define SALT_LEN 32
+#define KEY_LEN 32 // 256 bits for AES-256
+#define IV_LEN 16 // 128 bits for AES block size
+#define PBKDF2_ITERATIONS 100000
+
+// Error codes
+typedef enum {
+ CERB_OK = 0,
+ CERB_ERROR = -1,
+ CERB_INVALID_ARG = -2,
+ CERB_NOT_FOUND = -3,
+ CERB_DUPLICATE = -4,
+ CERB_STORAGE_ERROR = -5,
+ CERB_CRYPTO_ERROR = -6,
+ CERB_MEMORY_ERROR = -7,
+ CERB_INVALID_STATE = -8,
+ CERB_NOT_IMPLEMENTED = -9
+} cerb_error_t;
+
+// Custom field structure
+typedef struct {
+ char key[MAX_CUSTOM_KEY_LEN];
+ char value[MAX_CUSTOM_VALUE_LEN];
+} cerb_custom_field_t;
+
+// Password entry structure
+typedef struct {
+ char id[37]; // UUID string (36 chars + null terminator)
+ char website[MAX_WEBSITE_LEN];
+ char username[MAX_USERNAME_LEN];
+ char password[MAX_PASSWORD_LEN];
+ char notes[MAX_NOTES_LEN];
+ char url[1024];
+ char tags[MAX_TAGS][MAX_TAG_LEN];
+ size_t num_tags;
+ time_t created_at;
+ time_t updated_at;
+ time_t last_used;
+ bool favorite;
+ cerb_custom_field_t custom_fields[MAX_CUSTOM_FIELDS];
+ size_t num_custom_fields;
+} cerb_entry_t;
+
+// Vault structure
+typedef struct cerb_vault_t cerb_vault_t;
+
+// Basic entry struct for FFI bindings (avoids nested arrays)
+typedef struct {
+ char id[37];
+ char website[MAX_WEBSITE_LEN];
+ char username[MAX_USERNAME_LEN];
+ char password[MAX_PASSWORD_LEN];
+ char notes[MAX_NOTES_LEN];
+ char url[1024];
+ time_t created_at;
+ time_t updated_at;
+} cerb_entry_basic_t;
+
+// Core API
+
+// Initialize the crypto subsystem
+cerb_error_t cerb_crypto_init(void);
+
+// Cleanup the crypto subsystem
+void cerb_crypto_cleanup(void);
+
+// Initialize a new vault
+cerb_error_t cerb_vault_create(const char *master_password, cerb_vault_t **vault);
+
+// Open an existing vault
+cerb_error_t cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault);
+
+// Save vault to file
+cerb_error_t cerb_vault_save(cerb_vault_t *vault, const char *vault_path);
+
+// Close and free a vault
+void cerb_vault_close(cerb_vault_t *vault);
+
+// Add a new entry to the vault
+cerb_error_t cerb_vault_add_entry(cerb_vault_t *vault, const cerb_entry_t *entry);
+cerb_error_t cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry);
+
+// Update an existing entry
+cerb_error_t cerb_vault_update_entry(cerb_vault_t *vault, const cerb_entry_t *entry);
+cerb_error_t cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry);
+
+// Delete an entry by ID
+cerb_error_t cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id);
+
+// Get an entry by ID
+cerb_error_t cerb_vault_get_entry(cerb_vault_t *vault, const char *entry_id, cerb_entry_t *entry);
+cerb_error_t cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry);
+
+// Get all entries
+cerb_error_t cerb_vault_get_entries(cerb_vault_t *vault, cerb_entry_t **entries, size_t *count);
+cerb_error_t cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count);
+
+// Search entries by query string
+cerb_error_t cerb_vault_search(cerb_vault_t *vault, const char *query, cerb_entry_t **results, size_t *count);
+cerb_error_t cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count);
+
+// Generate a secure random password
+cerb_error_t cerb_generate_password(
+ uint32_t length,
+ bool use_upper,
+ bool use_lower,
+ bool use_digits,
+ bool use_special,
+ char *buffer,
+ size_t buffer_size
+);
+
+// Import from other password managers
+cerb_error_t cerb_import_bitwarden_json(cerb_vault_t *vault, const char *json_path);
+cerb_error_t cerb_import_lastpass_csv(cerb_vault_t *vault, const char *csv_path);
+cerb_error_t cerb_import_chrome_csv(cerb_vault_t *vault, const char *csv_path);
+
+// Export to various formats
+cerb_error_t cerb_export_json(cerb_vault_t *vault, const char *json_path);
+cerb_error_t cerb_export_csv(cerb_vault_t *vault, const char *csv_path);
+
+// Utility functions
+void cerb_generate_uuid(char *uuid);
+time_t cerb_current_timestamp(void);
+
+#endif // CERBERUS_CORE_H
diff --git a/src/cerberus/core/cerberus.so b/src/cerberus/core/cerberus.so
new file mode 100755
index 0000000..2cdd65a
--- /dev/null
+++ b/src/cerberus/core/cerberus.so
Binary files differ
diff --git a/src/cerberus/core/models.py b/src/cerberus/core/models.py
new file mode 100644
index 0000000..ecdcb3c
--- /dev/null
+++ b/src/cerberus/core/models.py
@@ -0,0 +1,58 @@
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Optional, List, Dict, Any
+import json
+
+@dataclass
+class PasswordEntry:
+ """Represents a single password entry in the password manager."""
+ id: str
+ website: str
+ username: str
+ password: str
+ url: str = ""
+ notes: str = ""
+ tags: List[str] = field(default_factory=list)
+ created_at: datetime = field(default_factory=datetime.utcnow)
+ updated_at: datetime = field(default_factory=datetime.utcnow)
+ last_used: Optional[datetime] = None
+ password_strength: Optional[float] = None
+ compromised: bool = False
+ custom_fields: Dict[str, Any] = field(default_factory=dict)
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Convert the password entry to a dictionary for serialization."""
+ return {
+ 'id': self.id,
+ 'website': self.website,
+ 'username': self.username,
+ 'password': self.password,
+ 'url': self.url,
+ 'notes': self.notes,
+ 'tags': self.tags,
+ 'created_at': self.created_at.isoformat(),
+ 'updated_at': self.updated_at.isoformat(),
+ 'last_used': self.last_used.isoformat() if self.last_used else None,
+ 'password_strength': self.password_strength,
+ 'compromised': self.compromised,
+ 'custom_fields': self.custom_fields
+ }
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'PasswordEntry':
+ """Create a PasswordEntry from a dictionary."""
+ return cls(
+ id=data.get('id', ''),
+ website=data['website'],
+ username=data['username'],
+ password=data['password'],
+ url=data.get('url', ''),
+ notes=data.get('notes', ''),
+ tags=data.get('tags', []),
+ created_at=datetime.fromisoformat(data['created_at']),
+ updated_at=datetime.fromisoformat(data['updated_at']),
+ last_used=datetime.fromisoformat(data['last_used']) if data.get('last_used') else None,
+ password_strength=data.get('password_strength'),
+ compromised=data.get('compromised', False),
+ custom_fields=data.get('custom_fields', {})
+ )
diff --git a/src/cerberus/core/password_manager.py b/src/cerberus/core/password_manager.py
new file mode 100644
index 0000000..018c727
--- /dev/null
+++ b/src/cerberus/core/password_manager.py
@@ -0,0 +1,462 @@
+import os
+import json
+import logging
+from pathlib import Path
+from typing import Dict, List, Optional, Union, Any
+from datetime import datetime
+
+logger = logging.getLogger(__name__)
+
+# Import the C core from our package
+try:
+ from . import (
+ cerb_crypto_init, cerb_crypto_cleanup,
+ cerb_vault_create, cerb_vault_open, cerb_vault_save, cerb_vault_close,
+ cerb_vault_add_entry_basic, cerb_vault_update_entry_basic, cerb_vault_delete_entry,
+ cerb_vault_get_entry_basic, cerb_vault_get_entries_basic, cerb_vault_search_basic,
+ cerb_generate_password, cerb_generate_uuid, cerb_current_timestamp,
+ CERB_OK, CERB_ERROR, ffi
+ )
+ CORE_AVAILABLE = True
+except (ImportError, OSError) as e:
+ CORE_AVAILABLE = False
+ logger.warning("Cerberus C core not available. Using Python fallback.")
+
+from .models import PasswordEntry
+from ..integrations import get_integration, IntegrationError
+
+class VaultError(Exception):
+ """Base exception for vault-related errors."""
+ pass
+
+class CoreNotAvailableError(VaultError):
+ """Raised when the C core is not available."""
+ pass
+
+class PasswordManager:
+ """Core password manager with C-based encryption and integration support."""
+
+ def __init__(self, data_dir: str = None, master_password: str = None):
+ """Initialize the password manager.
+
+ Args:
+ data_dir: Directory to store password data
+ master_password: Master password for encryption
+ """
+ if not CORE_AVAILABLE:
+ raise CoreNotAvailableError(
+ "Cerberus C core not available. Please compile it first."
+ )
+
+ self.data_dir = Path(data_dir or os.path.expanduser("~/.cerberus_pm"))
+ self.data_dir.mkdir(parents=True, exist_ok=True)
+
+ self.master_password = master_password
+ self.vault_file = self.data_dir / "vault.cerb"
+ self._vault = ffi.NULL
+
+ # Initialize crypto
+ if cerb_crypto_init() != CERB_OK:
+ raise VaultError("Failed to initialize crypto")
+
+ # Create or load vault if master password is provided
+ if master_password:
+ if self.vault_file.exists():
+ self._open_vault()
+ else:
+ self._create_vault()
+
+ def _create_vault(self):
+ """Create a new vault."""
+ if not self.master_password:
+ raise VaultError("Master password is required to create a vault")
+
+ vault_ptr = ffi.new("cerb_vault_t**")
+ result = cerb_vault_create(
+ self.master_password.encode('utf-8'),
+ vault_ptr
+ )
+
+ if result != CERB_OK:
+ raise VaultError(f"Failed to create vault: error code {result}")
+
+ self._vault = vault_ptr[0]
+ logger.info("Created new vault")
+
+ def _open_vault(self):
+ """Open an existing vault."""
+ if not self.vault_file.exists():
+ raise VaultError(f"Vault file not found: {self.vault_file}")
+
+ if not self.master_password:
+ raise VaultError("Master password is required to open the vault")
+
+ # Open existing vault from file using C core
+ vault_ptr = ffi.new("cerb_vault_t**")
+ result = cerb_vault_open(
+ self.master_password.encode('utf-8'),
+ str(self.vault_file).encode('utf-8'),
+ vault_ptr
+ )
+ if result != CERB_OK:
+ raise VaultError(f"Failed to open vault: error code {result}")
+ self._vault = vault_ptr[0]
+
+ def save_vault(self):
+ """Save the vault to disk."""
+ if self._vault == ffi.NULL:
+ raise VaultError("No vault is open")
+
+ result = cerb_vault_save(self._vault, str(self.vault_file).encode('utf-8'))
+ if result != CERB_OK:
+ raise VaultError(f"Failed to save vault: error code {result}")
+
+ def initialize(self, master_password: str) -> bool:
+ """Initialize the password manager with a master password.
+
+ Args:
+ master_password: The master password
+
+ Returns:
+ bool: True if initialization was successful
+ """
+ self.master_password = master_password
+ self._create_vault()
+ return True
+
+ def unlock(self, master_password: str) -> bool:
+ """Unlock the password manager with the master password.
+
+ Args:
+ master_password: The master password
+
+ Returns:
+ bool: True if unlock was successful
+ """
+ try:
+ self.master_password = master_password
+ self._open_vault()
+ return True
+ except VaultError as e:
+ logger.error(f"Failed to unlock password manager: {e}")
+ return False
+
+ def _find_entries(self, query: str = None) -> List[PasswordEntry]:
+ """Find entries matching the query.
+
+ Args:
+ query: Search query (e.g., 'website:example.com', 'tag:work')
+
+ Returns:
+ List of matching PasswordEntry objects
+ """
+ if self._vault == ffi.NULL:
+ raise VaultError("No vault is open")
+
+ entries_ptr = ffi.new("cerb_entry_basic_t**")
+ count_ptr = ffi.new("size_t*")
+ if query:
+ result = cerb_vault_search_basic(self._vault, query.encode('utf-8'), entries_ptr, count_ptr)
+ else:
+ result = cerb_vault_get_entries_basic(self._vault, entries_ptr, count_ptr)
+ if result != CERB_OK:
+ raise VaultError(f"Search failed: error code {result}")
+
+ results: List[PasswordEntry] = []
+ count = int(count_ptr[0])
+ if count == 0:
+ return results
+
+ entries_array = entries_ptr[0]
+ for i in range(count):
+ c_entry = entries_array[i]
+ results.append(PasswordEntry(
+ id=ffi.string(c_entry.id).decode('utf-8'),
+ website=ffi.string(c_entry.website).decode('utf-8'),
+ username=ffi.string(c_entry.username).decode('utf-8'),
+ password=ffi.string(c_entry.password).decode('utf-8'),
+ url=ffi.string(c_entry.url).decode('utf-8'),
+ notes=ffi.string(c_entry.notes).decode('utf-8'),
+ ))
+
+ # Caller-owned memory: free via C if a free function exists; else rely on C API contract
+ return results
+
+ def add_password(self, entry: PasswordEntry) -> str:
+ """Add a new password entry to the vault.
+
+ Args:
+ entry: The password entry to add
+
+ Returns:
+ str: The ID of the new entry
+
+ Raises:
+ VaultError: If the vault is not open or an error occurs
+ """
+ if self._vault == ffi.NULL:
+ raise VaultError("No vault is open")
+
+ # Create a new C entry
+ c_entry = ffi.new("cerb_entry_basic_t*")
+
+ # Generate a new UUID if not provided
+ if entry.id:
+ entry_id = entry.id
+ else:
+ uuid_buf = ffi.new("char[37]")
+ cerb_generate_uuid(uuid_buf)
+ entry_id = ffi.string(uuid_buf).decode('utf-8')
+
+ # Set fields
+ ffi.memmove(c_entry.id, entry_id.encode('utf-8'), len(entry_id))
+ c_entry.id[len(entry_id)] = b'\0'
+ for field_name in ["website", "username", "password", "notes", "url"]:
+ val = (getattr(entry, field_name) or '').encode('utf-8')
+ buf = getattr(c_entry, field_name)
+ ffi.memmove(buf, val, len(val))
+ buf[len(val)] = b'\0'
+ c_entry.created_at = int(entry.created_at.timestamp()) if entry.created_at else cerb_current_timestamp()
+ c_entry.updated_at = int(entry.updated_at.timestamp()) if entry.updated_at else c_entry.created_at
+
+ # Add to vault
+ result = cerb_vault_add_entry_basic(self._vault, c_entry)
+ if result != CERB_OK:
+ raise VaultError(f"Failed to add entry: error code {result}")
+
+ # Save the vault
+ self.save_vault()
+
+ return entry_id
+
+ def update_password(self, entry_id: str, **updates) -> bool:
+ """Update an existing password entry.
+
+ Args:
+ entry_id: The ID of the entry to update
+ **updates: Fields to update
+
+ Returns:
+ bool: True if the update was successful
+
+ Raises:
+ VaultError: If the vault is not open or an error occurs
+ ValueError: If the entry is not found
+ """
+ if self._vault == ffi.NULL:
+ raise VaultError("No vault is open")
+
+ # Fetch, modify, send to C update
+ c_existing = ffi.new("cerb_entry_basic_t*")
+ res = cerb_vault_get_entry_basic(self._vault, entry_id.encode('utf-8'), c_existing)
+ if res != CERB_OK:
+ raise ValueError(f"Entry with ID {entry_id} not found")
+
+ # Apply updates
+ def _set_field(dst, value: str, max_len: int):
+ data = (value or '').encode('utf-8')
+ ln = min(len(data), max_len - 1)
+ ffi.memmove(dst, data, ln)
+ dst[ln] = b'\0'
+
+ for key, value in updates.items():
+ if key == 'website': _set_field(c_existing.website, value, 256)
+ elif key == 'username': _set_field(c_existing.username, value, 256)
+ elif key == 'password': _set_field(c_existing.password, value, 1024)
+ elif key == 'notes': _set_field(c_existing.notes, value, 4096)
+ elif key == 'url': _set_field(c_existing.url, value, 1024)
+
+ c_existing.updated_at = int(datetime.utcnow().timestamp())
+
+ res = cerb_vault_update_entry_basic(self._vault, c_existing)
+ if res != CERB_OK:
+ raise VaultError(f"Failed to update entry: error code {res}")
+ self.save_vault()
+ return True
+
+ def delete_password(self, entry_id: str) -> bool:
+ """Delete a password entry.
+
+ Args:
+ entry_id: The ID of the entry to delete
+
+ Returns:
+ bool: True if the deletion was successful
+
+ Raises:
+ VaultError: If the vault is not open or an error occurs
+ """
+ if self._vault == ffi.NULL:
+ raise VaultError("No vault is open")
+
+ result = cerb_vault_delete_entry(self._vault, entry_id.encode('utf-8'))
+ if result != CERB_OK:
+ raise VaultError(f"Failed to delete entry: error code {result}")
+
+ # Save the vault
+ self.save_vault()
+ return True
+
+ def get_password(self, entry_id: str) -> Optional[PasswordEntry]:
+ """Get a password entry by ID.
+
+ Args:
+ entry_id: The ID of the entry to retrieve
+
+ Returns:
+ PasswordEntry if found, None otherwise
+ """
+ c_entry = ffi.new("cerb_entry_basic_t*")
+ res = cerb_vault_get_entry_basic(self._vault, entry_id.encode('utf-8'), c_entry)
+ if res != CERB_OK:
+ return None
+ return PasswordEntry(
+ id=ffi.string(c_entry.id).decode('utf-8'),
+ website=ffi.string(c_entry.website).decode('utf-8'),
+ username=ffi.string(c_entry.username).decode('utf-8'),
+ password=ffi.string(c_entry.password).decode('utf-8'),
+ url=ffi.string(c_entry.url).decode('utf-8'),
+ notes=ffi.string(c_entry.notes).decode('utf-8'),
+ )
+
+ def list_passwords(self) -> List[PasswordEntry]:
+ """List all password entries.
+
+ Returns:
+ List of all PasswordEntry objects
+ """
+ return self._find_entries()
+
+ def search_passwords(self, query: str) -> List[PasswordEntry]:
+ """Search password entries by website, username, or tags.
+
+ Args:
+ query: Search query
+
+ Returns:
+ List of matching PasswordEntry objects
+ """
+ return self._find_entries(query)
+
+ @staticmethod
+ def generate_password(
+ length: int = 16,
+ use_upper: bool = True,
+ use_lower: bool = True,
+ use_digits: bool = True,
+ use_special: bool = True
+ ) -> str:
+ """Generate a secure random password using the C core.
+
+ Args:
+ length: Length of the password (8-1024)
+ use_upper: Include uppercase letters
+ use_lower: Include lowercase letters
+ use_digits: Include digits
+ use_special: Include special characters
+
+ Returns:
+ The generated password
+
+ Raises:
+ VaultError: If password generation fails
+ """
+ if not CORE_AVAILABLE:
+ raise CoreNotAvailableError("C core not available for password generation")
+
+ # Validate length
+ if length < 8 or length > 1024:
+ raise ValueError("Password length must be between 8 and 1024 characters")
+
+ # At least one character set must be selected
+ if not (use_upper or use_lower or use_digits or use_special):
+ raise ValueError("At least one character set must be selected")
+
+ # Allocate buffer for the password (+1 for null terminator)
+ buffer = ffi.new(f"char[{length + 1}]") # +1 for null terminator
+
+ # Generate the password
+ result = cerb_generate_password(
+ length,
+ use_upper,
+ use_lower,
+ use_digits,
+ use_special,
+ buffer,
+ length + 1 # Include space for null terminator
+ )
+
+ if result != CERB_OK:
+ raise VaultError(f"Failed to generate password: error code {result}")
+
+ # Convert from C string to Python string
+ password = ffi.string(buffer).decode('utf-8')
+ return password
+
+ # ---- Convenience methods for CLI/TUI compatibility ----
+ def generate_id(self) -> str:
+ """Generate a UUID string using the C core."""
+ if not CORE_AVAILABLE:
+ raise CoreNotAvailableError("C core not available for ID generation")
+ uuid_buf = ffi.new("char[37]")
+ cerb_generate_uuid(uuid_buf)
+ return ffi.string(uuid_buf).decode("utf-8")
+
+ def get_entries(self) -> List[PasswordEntry]:
+ """Alias for list_passwords()."""
+ return self.list_passwords()
+
+ def add_entry(self, entry: PasswordEntry) -> str:
+ """Alias for add_password(entry)."""
+ return self.add_password(entry)
+
+ def update_entry(self, entry: PasswordEntry) -> bool:
+ """Update an existing entry using the entry object."""
+ fields = {
+ "website": entry.website,
+ "username": entry.username,
+ "password": entry.password,
+ "notes": entry.notes,
+ "url": entry.url,
+ }
+ return self.update_password(entry.id, **fields)
+
+ def delete_entry(self, entry_id: str) -> bool:
+ """Alias for delete_password(entry_id)."""
+ return self.delete_password(entry_id)
+
+ def get_entry(self, identifier: str) -> PasswordEntry:
+ """Get an entry by ID, or by exact website match as a fallback."""
+ entry = self.get_password(identifier)
+ if entry is not None:
+ return entry
+ # Fallback: search by exact website name
+ matches = [e for e in self.list_passwords() if (e.website or "").lower() == (identifier or "").lower()]
+ if not matches:
+ raise ValueError(f"Entry not found: {identifier}")
+ if len(matches) > 1:
+ raise ValueError(f"Multiple entries found for website '{identifier}'. Use the entry ID instead.")
+ return matches[0]
+
+ def generate_password_easy(
+ self,
+ length: int = 16,
+ special: bool = True,
+ upper: Optional[bool] = None,
+ lower: Optional[bool] = None,
+ digits: Optional[bool] = None,
+ ) -> str:
+ """Friendly wrapper for password generation used by new CLI/TUI.
+
+ Args:
+ length: desired length
+ special: include special characters (maps to use_special)
+ upper, lower, digits: if None use defaults (True). If bool provided, override.
+ """
+ return PasswordManager.generate_password(
+ length=length,
+ use_upper=True if upper is None else upper,
+ use_lower=True if lower is None else lower,
+ use_digits=True if digits is None else digits,
+ use_special=special,
+ )
diff --git a/src/cerberus/gui/__init__.py b/src/cerberus/gui/__init__.py
new file mode 100644
index 0000000..5cc77e9
--- /dev/null
+++ b/src/cerberus/gui/__init__.py
@@ -0,0 +1,6 @@
+"""
+Graphical User Interface (GUI) for Cerberus Password Manager.
+"""
+from .main_window import run_app
+
+__all__ = ["run_app"]
diff --git a/src/cerberus/gui/main_window.py b/src/cerberus/gui/main_window.py
new file mode 100644
index 0000000..a7cac07
--- /dev/null
+++ b/src/cerberus/gui/main_window.py
@@ -0,0 +1,269 @@
+"""
+PyQt6-based GUI for Cerberus Password Manager.
+"""
+from __future__ import annotations
+
+import sys
+from dataclasses import asdict
+from typing import Optional, List
+
+from PyQt6.QtCore import Qt
+from PyQt6.QtWidgets import (
+ QApplication,
+ QMainWindow,
+ QWidget,
+ QVBoxLayout,
+ QHBoxLayout,
+ QListWidget,
+ QListWidgetItem,
+ QPushButton,
+ QLabel,
+ QLineEdit,
+ QTextEdit,
+ QMessageBox,
+ QInputDialog,
+ QFileDialog,
+)
+
+from ..core.password_manager import PasswordManager, VaultError
+from ..core.models import PasswordEntry
+
+
+class MainWindow(QMainWindow):
+ def __init__(self, pm: PasswordManager):
+ super().__init__()
+ self.pm = pm
+ self.setWindowTitle("Cerberus Password Manager")
+ self.resize(900, 600)
+
+ # Root container
+ root = QWidget()
+ layout = QHBoxLayout()
+ root.setLayout(layout)
+ self.setCentralWidget(root)
+
+ # Left: list
+ left = QWidget()
+ left_layout = QVBoxLayout()
+ left.setLayout(left_layout)
+ layout.addWidget(left, 1)
+
+ self.search_input = QLineEdit()
+ self.search_input.setPlaceholderText("Search website, username, tags...")
+ self.search_input.textChanged.connect(self.refresh_list)
+ left_layout.addWidget(self.search_input)
+
+ self.list_widget = QListWidget()
+ self.list_widget.itemSelectionChanged.connect(self.on_selection_changed)
+ left_layout.addWidget(self.list_widget, 1)
+
+ btn_bar = QHBoxLayout()
+ self.btn_add = QPushButton("Add")
+ self.btn_edit = QPushButton("Edit")
+ self.btn_rotate = QPushButton("Rotate")
+ self.btn_delete = QPushButton("Delete")
+ self.btn_export = QPushButton("Export")
+ self.btn_import = QPushButton("Import")
+ btn_bar.addWidget(self.btn_add)
+ btn_bar.addWidget(self.btn_edit)
+ btn_bar.addWidget(self.btn_rotate)
+ btn_bar.addWidget(self.btn_delete)
+ btn_bar.addWidget(self.btn_export)
+ btn_bar.addWidget(self.btn_import)
+ left_layout.addLayout(btn_bar)
+
+ # Right: detail
+ right = QWidget()
+ right_layout = QVBoxLayout()
+ right.setLayout(right_layout)
+ layout.addWidget(right, 2)
+
+ self.lbl_website = QLineEdit()
+ self.lbl_username = QLineEdit()
+ self.lbl_password = QLineEdit()
+ self.lbl_password.setEchoMode(QLineEdit.EchoMode.Password)
+ self.lbl_url = QLineEdit()
+ self.txt_notes = QTextEdit()
+
+ right_layout.addWidget(QLabel("Website"))
+ right_layout.addWidget(self.lbl_website)
+ right_layout.addWidget(QLabel("Username"))
+ right_layout.addWidget(self.lbl_username)
+ right_layout.addWidget(QLabel("Password"))
+ right_layout.addWidget(self.lbl_password)
+ right_layout.addWidget(QLabel("URL"))
+ right_layout.addWidget(self.lbl_url)
+ right_layout.addWidget(QLabel("Notes"))
+ right_layout.addWidget(self.txt_notes, 1)
+
+ act_bar = QHBoxLayout()
+ self.btn_save = QPushButton("Save")
+ self.btn_copy_user = QPushButton("Copy User")
+ self.btn_copy_pass = QPushButton("Copy Pass")
+ act_bar.addWidget(self.btn_save)
+ act_bar.addWidget(self.btn_copy_user)
+ act_bar.addWidget(self.btn_copy_pass)
+ right_layout.addLayout(act_bar)
+
+ # Wire actions
+ self.btn_add.clicked.connect(self.on_add)
+ self.btn_edit.clicked.connect(self.on_edit)
+ self.btn_rotate.clicked.connect(self.on_rotate)
+ self.btn_delete.clicked.connect(self.on_delete)
+ self.btn_export.clicked.connect(self.on_export)
+ self.btn_import.clicked.connect(self.on_import)
+ self.btn_save.clicked.connect(self.on_save)
+ self.btn_copy_user.clicked.connect(self.on_copy_user)
+ self.btn_copy_pass.clicked.connect(self.on_copy_pass)
+
+ # Data
+ self.entries: List[PasswordEntry] = []
+ self.current: Optional[PasswordEntry] = None
+ self.refresh_list()
+
+ def refresh_list(self) -> None:
+ query = self.search_input.text().lower().strip()
+ try:
+ self.entries = self.pm.get_entries()
+ except Exception as e:
+ QMessageBox.critical(self, "Error", f"Failed to load entries: {e}")
+ self.entries = []
+ self.list_widget.clear()
+ for e in self.entries:
+ if query and not (
+ query in (e.website or '').lower()
+ or query in (e.username or '').lower()
+ or query in (e.notes or '').lower()
+ or any(query in t.lower() for t in (e.tags or []))
+ ):
+ continue
+ item = QListWidgetItem(f"{e.website} — {e.username}")
+ item.setData(Qt.ItemDataRole.UserRole, e.id)
+ self.list_widget.addItem(item)
+
+ def on_selection_changed(self) -> None:
+ item = self.list_widget.currentItem()
+ if not item:
+ self.current = None
+ return
+ entry_id = item.data(Qt.ItemDataRole.UserRole)
+ try:
+ # get_entry may accept id or website; ensure id fetch
+ entry = self.pm.get_entry(entry_id)
+ except Exception:
+ # fallback: find in self.entries
+ entry = next((x for x in self.entries if x.id == entry_id), None)
+ self.current = entry
+ if entry:
+ self.lbl_website.setText(entry.website)
+ self.lbl_username.setText(entry.username)
+ self.lbl_password.setText(entry.password)
+ self.lbl_url.setText(entry.url)
+ self.txt_notes.setText(entry.notes)
+
+ def on_add(self) -> None:
+ website, ok = QInputDialog.getText(self, "Add Entry", "Website:")
+ if not ok or not website:
+ return
+ username, ok = QInputDialog.getText(self, "Add Entry", "Username:")
+ if not ok or not username:
+ return
+ # Generate or prompt for password
+ choice = QMessageBox.question(self, "Password", "Generate a strong password?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No)
+ if choice == QMessageBox.StandardButton.Yes:
+ password = self.pm.generate_password()
+ else:
+ password, ok = QInputDialog.getText(self, "Add Entry", "Password:")
+ if not ok or not password:
+ return
+ try:
+ entry = PasswordEntry(
+ id=self.pm.generate_id(),
+ website=website,
+ username=username,
+ password=password,
+ )
+ self.pm.add_entry(entry)
+ self.refresh_list()
+ except Exception as e:
+ QMessageBox.critical(self, "Error", f"Failed to add entry: {e}")
+
+ def on_edit(self) -> None:
+ if not self.current:
+ return
+ self.lbl_website.setFocus()
+
+ def on_save(self) -> None:
+ if not self.current:
+ return
+ try:
+ self.current.website = self.lbl_website.text()
+ self.current.username = self.lbl_username.text()
+ self.current.password = self.lbl_password.text()
+ self.current.url = self.lbl_url.text()
+ self.current.notes = self.txt_notes.toPlainText()
+ self.pm.update_entry(self.current)
+ self.refresh_list()
+ except Exception as e:
+ QMessageBox.critical(self, "Error", f"Failed to save entry: {e}")
+
+ def on_rotate(self) -> None:
+ if not self.current:
+ return
+ try:
+ new_password = self.pm.generate_password(length=24)
+ self.current.password = new_password
+ self.pm.update_entry(self.current)
+ self.lbl_password.setText(new_password)
+ QMessageBox.information(self, "Rotated", "Password rotated and saved.")
+ except Exception as e:
+ QMessageBox.critical(self, "Error", f"Failed to rotate: {e}")
+
+ def on_delete(self) -> None:
+ if not self.current:
+ return
+ if QMessageBox.question(self, "Delete", f"Delete entry for {self.current.website}?") == QMessageBox.StandardButton.Yes:
+ try:
+ self.pm.delete_entry(self.current.id)
+ self.current = None
+ self.refresh_list()
+ except Exception as e:
+ QMessageBox.critical(self, "Error", f"Failed to delete: {e}")
+
+ def on_copy_user(self) -> None:
+ if not self.current:
+ return
+ try:
+ import pyperclip
+ pyperclip.copy(self.current.username)
+ QMessageBox.information(self, "Copied", "Username copied to clipboard.")
+ except Exception as e:
+ QMessageBox.critical(self, "Error", f"Clipboard failed: {e}")
+
+ def on_copy_pass(self) -> None:
+ if not self.current:
+ return
+ try:
+ import pyperclip
+ pyperclip.copy(self.current.password)
+ QMessageBox.information(self, "Copied", "Password copied to clipboard.")
+ except Exception as e:
+ QMessageBox.critical(self, "Error", f"Clipboard failed: {e}")
+
+
+def run_app() -> None:
+ # Prompt for master password
+ app = QApplication(sys.argv)
+ from PyQt6.QtWidgets import QInputDialog
+
+ pw, ok = QInputDialog.getText(None, "Cerberus", "Master password:")
+ if not ok or not pw:
+ return
+ try:
+ pm = PasswordManager(master_password=pw)
+ except VaultError as e:
+ QMessageBox.critical(None, "Error", f"Failed to unlock vault: {e}")
+ return
+ win = MainWindow(pm)
+ win.show()
+ sys.exit(app.exec())
diff --git a/src/cerberus/integrations/__init__.py b/src/cerberus/integrations/__init__.py
new file mode 100644
index 0000000..2d81122
--- /dev/null
+++ b/src/cerberus/integrations/__init__.py
@@ -0,0 +1,111 @@
+"""Password manager integrations for Cerberus."""
+
+from typing import Dict, Type, List, Optional, Any
+from pathlib import Path
+import importlib
+import json
+
+from ..core.models import PasswordEntry
+
+class IntegrationError(Exception):
+ """Base exception for integration errors."""
+ pass
+
+class BaseIntegration:
+ """Base class for password manager integrations."""
+
+ def __init__(self, **kwargs):
+ """Initialize the integration with any required parameters."""
+ self.connected = False
+
+ def connect(self, **kwargs) -> bool:
+ """Connect to the password manager.
+
+ Returns:
+ bool: True if connection was successful
+ """
+ raise NotImplementedError
+
+ def disconnect(self):
+ """Disconnect from the password manager."""
+ self.connected = False
+
+ def list_entries(self) -> List[PasswordEntry]:
+ """List all password entries.
+
+ Returns:
+ List of PasswordEntry objects
+ """
+ raise NotImplementedError
+
+ def export_entries(self, output_path: Path) -> bool:
+ """Export entries to a file.
+
+ Args:
+ output_path: Path to save the exported data
+
+ Returns:
+ bool: True if export was successful
+ """
+ raise NotImplementedError
+
+ def import_entries(self, input_path: Path) -> List[PasswordEntry]:
+ """Import entries from a file.
+
+ Args:
+ input_path: Path to the file to import from
+
+ Returns:
+ List of imported PasswordEntry objects
+ """
+ raise NotImplementedError
+
+# Dictionary of available integrations
+INTEGRATIONS: Dict[str, Type[BaseIntegration]] = {}
+
+def register_integration(name: str):
+ """Decorator to register an integration class."""
+ def decorator(cls: Type[BaseIntegration]) -> Type[BaseIntegration]:
+ INTEGRATIONS[name.lower()] = cls
+ return cls
+ return decorator
+
+def get_integration(name: str, **kwargs) -> BaseIntegration:
+ """Get an instance of the specified integration.
+
+ Args:
+ name: Name of the integration
+ **kwargs: Additional arguments to pass to the integration
+
+ Returns:
+ An instance of the specified integration
+
+ Raises:
+ IntegrationError: If the integration is not found
+ """
+ name = name.lower()
+ if name not in INTEGRATIONS:
+ raise IntegrationError(f"Integration '{name}' not found")
+
+ return INTEGRATIONS[name](**kwargs)
+
+def list_available_integrations() -> List[str]:
+ """List all available integrations.
+
+ Returns:
+ List of integration names
+ """
+ return list(INTEGRATIONS.keys())
+
+# Import all integration modules to register them
+# This will be populated by the individual integration modules
+# that use the @register_integration decorator
+
+try:
+ from . import bitwarden # noqa
+ from . import lastpass # noqa
+ from . import keepass # noqa
+ from . import chrome # noqa
+except ImportError as e:
+ # Some integrations may have additional dependencies
+ pass
diff --git a/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc b/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..fbceef5
--- /dev/null
+++ b/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc b/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc
new file mode 100644
index 0000000..cb25ecf
--- /dev/null
+++ b/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc b/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc
new file mode 100644
index 0000000..02a7447
--- /dev/null
+++ b/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc b/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc
new file mode 100644
index 0000000..ca99f53
--- /dev/null
+++ b/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc b/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc
new file mode 100644
index 0000000..fac83f5
--- /dev/null
+++ b/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/integrations/bitwarden.py b/src/cerberus/integrations/bitwarden.py
new file mode 100644
index 0000000..7a4cd73
--- /dev/null
+++ b/src/cerberus/integrations/bitwarden.py
@@ -0,0 +1,268 @@
+import subprocess
+import json
+import logging
+from typing import List, Dict, Optional, Any
+from pathlib import Path
+import os
+
+from ..core.models import PasswordEntry
+
+logger = logging.getLogger(__name__)
+
+class BitwardenCLIError(Exception):
+ """Exception raised for errors in the Bitwarden CLI."""
+ pass
+
+class BitwardenIntegration:
+ """Integration with Bitwarden password manager."""
+
+ def __init__(self, email: str = None, password: str = None, session: str = None):
+ """Initialize the Bitwarden integration.
+
+ Args:
+ email: Bitwarden account email
+ password: Bitwarden master password
+ session: Existing Bitwarden session key
+ """
+ self.email = email
+ self.password = password
+ self.session = session
+ self.bw_path = self._find_bw()
+
+ def _run_command(self, command: List[str], input_data: str = None) -> Dict:
+ """Run a Bitwarden CLI command and return the result."""
+ try:
+ env = os.environ.copy()
+ if self.session:
+ env['BW_SESSION'] = self.session
+
+ result = subprocess.run(
+ [self.bw_path] + command,
+ input=input_data.encode() if input_data else None,
+ capture_output=True,
+ check=True,
+ env=env
+ )
+
+ if result.stdout:
+ try:
+ return json.loads(result.stdout)
+ except json.JSONDecodeError:
+ return result.stdout.decode().strip()
+ return {}
+
+ except subprocess.CalledProcessError as e:
+ error_msg = e.stderr.decode().strip() if e.stderr else str(e)
+ logger.error(f"Bitwarden CLI error: {error_msg}")
+ raise BitwardenCLIError(f"Bitwarden command failed: {error_msg}")
+
+ @staticmethod
+ def _find_bw() -> str:
+ """Find the Bitwarden CLI executable."""
+ # Check common locations
+ possible_paths = [
+ '/usr/local/bin/bw',
+ '/usr/bin/bw',
+ 'bw' # Try PATH
+ ]
+
+ for path in possible_paths:
+ try:
+ result = subprocess.run(
+ [path, '--version'],
+ capture_output=True,
+ text=True
+ )
+ if result.returncode == 0:
+ logger.info(f"Found Bitwarden CLI at {path}")
+ return path
+ except (FileNotFoundError, subprocess.CalledProcessError):
+ continue
+
+ raise BitwardenCLIError(
+ "Bitwarden CLI not found. Please install it from "
+ "https://bitwarden.com/help/cli/"
+ )
+
+ def login(self) -> bool:
+ """Log in to Bitwarden and get a session key."""
+ if not self.email or not self.password:
+ raise BitwardenCLIError("Email and password are required for login")
+
+ try:
+ # Log in and get the session key
+ result = subprocess.run(
+ [self.bw_path, 'login', self.email, self.password, '--raw'],
+ capture_output=True,
+ check=True,
+ text=True
+ )
+
+ self.session = result.stdout.strip()
+ return True
+
+ except subprocess.CalledProcessError as e:
+ error_msg = e.stderr.decode().strip() if e.stderr else str(e)
+ logger.error(f"Bitwarden login failed: {error_msg}")
+ return False
+
+ def logout(self) -> bool:
+ """Log out of Bitwarden."""
+ try:
+ self._run_command(['logout'])
+ self.session = None
+ return True
+ except BitwardenCLIError:
+ return False
+
+ def sync(self) -> bool:
+ """Sync with Bitwarden server."""
+ try:
+ self._run_command(['sync'])
+ return True
+ except BitwardenCLIError:
+ return False
+
+ def export_vault(self, output_file: str, format: str = 'encrypted_json') -> bool:
+ """Export the Bitwarden vault.
+
+ Args:
+ output_file: Path to save the exported file
+ format: Export format ('encrypted_json', 'json', 'csv', 'encrypted_json')
+
+ Returns:
+ bool: True if export was successful
+ """
+ try:
+ result = self._run_command(['export', '--format', format, '--output', output_file])
+ return True
+ except BitwardenCLIError:
+ return False
+
+ def get_items(self, search: str = None) -> List[Dict]:
+ """Get items from the vault, optionally filtered by search term."""
+ try:
+ if search:
+ return self._run_command(['list', 'items', '--search', search])
+ return self._run_command(['list', 'items'])
+ except BitwardenCLIError:
+ return []
+
+ def get_item(self, item_id: str) -> Optional[Dict]:
+ """Get a specific item by ID."""
+ try:
+ return self._run_command(['get', 'item', item_id])
+ except BitwardenCLIError:
+ return None
+
+ def create_item(self, item_data: Dict) -> Optional[Dict]:
+ """Create a new item in the vault."""
+ try:
+ return self._run_command(
+ ['create', 'item'],
+ input_data=json.dumps(item_data)
+ )
+ except BitwardenCLIError:
+ return None
+
+ def update_item(self, item_id: str, item_data: Dict) -> Optional[Dict]:
+ """Update an existing item."""
+ try:
+ return self._run_command(
+ ['edit', 'item', item_id],
+ input_data=json.dumps(item_data)
+ )
+ except BitwardenCLIError:
+ return None
+
+ def delete_item(self, item_id: str) -> bool:
+ """Delete an item from the vault."""
+ try:
+ self._run_command(['delete', 'item', item_id])
+ return True
+ except BitwardenCLIError:
+ return False
+
+ def import_from_bitwarden(self) -> List[PasswordEntry]:
+ """Import passwords from Bitwarden to Cerberus format."""
+ try:
+ items = self.get_items()
+ entries = []
+
+ for item in items:
+ try:
+ entry = PasswordEntry(
+ id=item.get('id'),
+ website=item.get('name', ''),
+ username=next(
+ (field['value'] for field in item.get('login', {}).get('uris', [{}])
+ if field.get('name', '').lower() == 'username'),
+ ''
+ ),
+ password=item.get('login', {}).get('password', ''),
+ url=next(
+ (uri.get('uri', '') for uri in item.get('login', {}).get('uris', [])
+ if uri.get('uri')),
+ ''
+ ),
+ notes=item.get('notes', ''),
+ tags=item.get('collectionIds', []),
+ custom_fields={
+ 'folderId': item.get('folderId'),
+ 'organizationId': item.get('organizationId'),
+ 'favorite': item.get('favorite', False),
+ 'reprompt': item.get('reprompt', 0),
+ 'revisionDate': item.get('revisionDate')
+ }
+ )
+ entries.append(entry)
+ except Exception as e:
+ logger.error(f"Error converting Bitwarden item to PasswordEntry: {e}")
+
+ return entries
+
+ except Exception as e:
+ logger.error(f"Error importing from Bitwarden: {e}")
+ return []
+
+ def export_to_bitwarden(self, entries: List[PasswordEntry], folder_id: str = None) -> List[str]:
+ """Export passwords from Cerberus format to Bitwarden.
+
+ Args:
+ entries: List of PasswordEntry objects to export
+ folder_id: Optional Bitwarden folder ID to place items in
+
+ Returns:
+ List of Bitwarden item IDs that were created
+ """
+ created_ids = []
+
+ for entry in entries:
+ try:
+ item_data = {
+ 'type': 1, # Login type
+ 'name': entry.website,
+ 'notes': entry.notes,
+ 'favorite': entry.custom_fields.get('favorite', False),
+ 'login': {
+ 'username': entry.username,
+ 'password': entry.password,
+ 'uris': [
+ {
+ 'match': None,
+ 'uri': entry.url
+ }
+ ] if entry.url else []
+ },
+ 'collectionIds': entry.tags,
+ 'folderId': folder_id or entry.custom_fields.get('folderId')
+ }
+
+ result = self.create_item(item_data)
+ if result and 'id' in result:
+ created_ids.append(result['id'])
+
+ except Exception as e:
+ logger.error(f"Error exporting entry to Bitwarden: {e}")
+
+ return created_ids
diff --git a/src/cerberus/integrations/chrome.py b/src/cerberus/integrations/chrome.py
new file mode 100644
index 0000000..ddc6acc
--- /dev/null
+++ b/src/cerberus/integrations/chrome.py
@@ -0,0 +1,135 @@
+"""Chrome/Chromium password export integration for Cerberus."""
+
+import csv
+import json
+import sqlite3
+import shutil
+import tempfile
+from pathlib import Path
+from typing import List, Optional, Dict, Any
+
+from ..core.models import PasswordEntry
+from . import BaseIntegration, register_integration, IntegrationError
+
+class ChromeIntegration(BaseIntegration):
+ """Integration with Chrome/Chromium password exports."""
+
+ def __init__(self, export_path: Optional[Path] = None):
+ """Initialize the Chrome integration.
+
+ Args:
+ export_path: Path to Chrome passwords CSV export
+ """
+ super().__init__()
+ self.export_path = export_path
+
+ def connect(self, export_path: Optional[Path] = None, **kwargs) -> bool:
+ """Load the Chrome passwords export file.
+
+ Args:
+ export_path: Path to the Chrome passwords CSV export
+
+ Returns:
+ bool: True if the export file exists and is accessible
+ """
+ if export_path:
+ self.export_path = Path(export_path)
+
+ if not self.export_path or not self.export_path.exists():
+ raise IntegrationError("Chrome passwords export file not found")
+
+ self.connected = True
+ return True
+
+ def list_entries(self) -> List[PasswordEntry]:
+ """List all password entries from the Chrome export.
+
+ Returns:
+ List of PasswordEntry objects
+
+ Raises:
+ IntegrationError: If not connected or error reading the export file
+ """
+ if not self.connected:
+ raise IntegrationError("Not connected to Chrome passwords")
+
+ entries: List[PasswordEntry] = []
+
+ try:
+ with open(self.export_path, 'r', encoding='utf-8') as f:
+ # Chrome CSV format: name,url,username,password
+ reader = csv.reader(f)
+
+ # Skip header if it exists
+ header = next(reader, None)
+ if not header or len(header) < 4:
+ # Try without skipping header
+ f.seek(0)
+
+ for row in reader:
+ if len(row) < 4: # Ensure we have enough columns
+ continue
+
+ name, url, username, password = row[:4]
+
+ # Create a PasswordEntry
+ entry = PasswordEntry(
+ website=name or url or "Unknown",
+ username=username,
+ password=password,
+ url=url
+ )
+
+ entries.append(entry)
+
+ except Exception as e:
+ raise IntegrationError(f"Error reading Chrome passwords export: {e}")
+
+ return entries
+
+ def import_entries(self, input_path: Optional[Path] = None) -> List[PasswordEntry]:
+ """Import entries from a Chrome passwords export.
+
+ Args:
+ input_path: Path to the Chrome passwords export file
+
+ Returns:
+ List of imported PasswordEntry objects
+ """
+ if input_path:
+ self.export_path = Path(input_path)
+
+ if not self.export_path or not self.export_path.exists():
+ raise IntegrationError("Chrome passwords export file not found")
+
+ return self.list_entries()
+
+ def export_entries(self, output_path: Path) -> bool:
+ """Export entries to a Chrome-compatible CSV file.
+
+ Args:
+ output_path: Path to save the exported data
+
+ Returns:
+ bool: True if export was successful
+ """
+ # This would require converting from our format to Chrome's format
+ # For now, we'll just raise a NotImplementedError
+ raise NotImplementedError("Export to Chrome format is not yet implemented")
+
+ @classmethod
+ def export_help(cls) -> str:
+ """Get instructions for exporting from Chrome.
+
+ Returns:
+ str: Instructions for exporting from Chrome
+ """
+ return """To export passwords from Chrome:
+ 1. Open Chrome and go to: chrome://settings/passwords
+ 2. Click the three dots menu next to 'Saved Passwords'
+ 3. Select 'Export passwords...'
+ 4. Follow the prompts to save the passwords to a CSV file
+ """
+
+# Register the integration with the name 'chrome'
+register_integration("chrome")(ChromeIntegration)
diff --git a/src/cerberus/integrations/keepass.py b/src/cerberus/integrations/keepass.py
new file mode 100644
index 0000000..f445826
--- /dev/null
+++ b/src/cerberus/integrations/keepass.py
@@ -0,0 +1,208 @@
+"""KeePass integration for Cerberus."""
+
+import xml.etree.ElementTree as ET
+from pathlib import Path
+from typing import List, Optional, Dict, Any
+
+from ..core.models import PasswordEntry
+from . import BaseIntegration, register_integration, IntegrationError
+
+try:
+ import pykeepass
+ KEEPASS_AVAILABLE = True
+except ImportError:
+ KEEPASS_AVAILABLE = False
+
+@register_integration("keepass")
+class KeePassIntegration(BaseIntegration):
+ """Integration with KeePass password manager."""
+
+ def __init__(self, database_path: Optional[Path] = None, keyfile: Optional[Path] = None):
+ """Initialize the KeePass integration.
+
+ Args:
+ database_path: Path to the KeePass database file (.kdbx)
+ keyfile: Path to the keyfile (if used)
+ """
+ super().__init__()
+ if not KEEPASS_AVAILABLE:
+ raise IntegrationError(
+ "pykeepass package is required for KeePass integration. "
+ "Install with: pip install pykeepass"
+ )
+
+ self.database_path = database_path
+ self.keyfile = keyfile
+ self.kp = None
+
+ def connect(self, password: str, database_path: Optional[Path] = None,
+ keyfile: Optional[Path] = None, **kwargs) -> bool:
+ """Connect to a KeePass database.
+
+ Args:
+ password: Database password
+ database_path: Path to the KeePass database file
+ keyfile: Path to the keyfile (if used)
+
+ Returns:
+ bool: True if connection was successful
+ """
+ if database_path:
+ self.database_path = Path(database_path)
+ if keyfile:
+ self.keyfile = Path(keyfile)
+
+ if not self.database_path or not self.database_path.exists():
+ raise IntegrationError("KeePass database file not found")
+
+ try:
+ self.kp = pykeepass.PyKeePass(
+ self.database_path,
+ password=password,
+ keyfile=str(self.keyfile) if self.keyfile and self.keyfile.exists() else None
+ )
+ self.connected = True
+ return True
+ except Exception as e:
+ raise IntegrationError(f"Failed to open KeePass database: {e}")
+
+ def disconnect(self):
+ """Close the KeePass database."""
+ self.kp = None
+ self.connected = False
+
+ def list_entries(self) -> List[PasswordEntry]:
+ """List all password entries from the KeePass database.
+
+ Returns:
+ List of PasswordEntry objects
+
+ Raises:
+ IntegrationError: If not connected or error reading the database
+ """
+ if not self.connected or not self.kp:
+ raise IntegrationError("Not connected to KeePass database")
+
+ entries: List[PasswordEntry] = []
+
+ try:
+ for entry in self.kp.entries:
+ # Skip entries without URLs or usernames
+ if not (entry.url or entry.title) or not entry.username:
+ continue
+
+ # Get entry notes and custom fields
+ notes = entry.notes or ""
+ custom_fields = {}
+
+ # Add any custom fields
+ for key, value in entry.custom_properties.items():
+ if key and value and key.lower() not in ['notes', 'password']:
+ custom_fields[key] = value
+
+ # Create a PasswordEntry
+ entry_obj = PasswordEntry(
+ website=entry.url or entry.title,
+ username=entry.username,
+ password=entry.password,
+ notes=notes,
+ url=entry.url,
+ tags=[entry.group.name] if entry.group else [],
+ custom_fields=custom_fields if custom_fields else None
+ )
+
+ entries.append(entry_obj)
+
+ except Exception as e:
+ raise IntegrationError(f"Error reading KeePass database: {e}")
+
+ return entries
+
+ def import_entries(self, input_path: Optional[Path] = None, password: str = None,
+ keyfile: Optional[Path] = None) -> List[PasswordEntry]:
+ """Import entries from a KeePass database.
+
+ Args:
+ input_path: Path to the KeePass database file
+ password: Database password
+ keyfile: Path to the keyfile (if used)
+
+ Returns:
+ List of imported PasswordEntry objects
+ """
+ if input_path:
+ self.database_path = Path(input_path)
+ if keyfile:
+ self.keyfile = Path(keyfile)
+
+ if not password:
+ raise IntegrationError("Password is required to open KeePass database")
+
+ self.connect(password=password, database_path=self.database_path, keyfile=self.keyfile)
+ return self.list_entries()
+
+ def export_entries(self, output_path: Path, entries: List[PasswordEntry] = None) -> bool:
+ """Export entries to a new KeePass database.
+
+ Args:
+ output_path: Path to save the new KeePass database
+ entries: List of PasswordEntry objects to export
+
+ Returns:
+ bool: True if export was successful
+ """
+ if not self.connected or not self.kp:
+ raise IntegrationError("Not connected to KeePass database")
+
+ try:
+ # Create a new KeePass database
+ new_kp = pykeepass.create_database(str(output_path))
+
+ # Add a group for the imported entries
+ imported_group = new_kp.add_group(new_kp.root_group, 'Imported')
+
+ # Add each entry to the database
+ for entry in (entries or self.list_entries()):
+ # Skip entries without URLs or usernames
+ if not (entry.website or entry.url) or not entry.username:
+ continue
+
+ # Add the entry to the database
+ new_entry = new_kp.add_entry(
+ imported_group,
+ title=entry.website or entry.url,
+ username=entry.username,
+ password=entry.password,
+ url=entry.url or entry.website,
+ notes=entry.notes,
+ tags=','.join(entry.tags) if entry.tags else None
+ )
+
+ # Add custom fields
+ if entry.custom_fields:
+ for key, value in entry.custom_fields.items():
+ if key and value and key.lower() not in ['notes', 'password']:
+ new_entry.set_custom_property(key, str(value))
+
+ # Save the new database
+ new_kp.save()
+ return True
+
+ except Exception as e:
+ raise IntegrationError(f"Error exporting to KeePass database: {e}")
+
+ @classmethod
+ def export_help(cls) -> str:
+ """Get instructions for exporting from KeePass.
+
+ Returns:
+ str: Instructions for exporting from KeePass
+ """
+ return """To export from KeePass:
+ 1. Open your KeePass database
+ 2. Go to 'File' > 'Export'
+ 3. Choose a format (e.g., XML)
+ 4. Save the exported file
+
+ Note: For better security, use the direct database import method.
+ """
diff --git a/src/cerberus/integrations/lastpass.py b/src/cerberus/integrations/lastpass.py
new file mode 100644
index 0000000..9c587d2
--- /dev/null
+++ b/src/cerberus/integrations/lastpass.py
@@ -0,0 +1,126 @@
+"""LastPass integration for Cerberus."""
+
+import csv
+from pathlib import Path
+from typing import List, Optional, Dict, Any
+
+from ..core.models import PasswordEntry
+from . import BaseIntegration, register_integration, IntegrationError
+
+@register_integration("lastpass")
+class LastPassIntegration(BaseIntegration):
+ """Integration with LastPass password manager."""
+
+ def __init__(self, export_path: Optional[Path] = None):
+ """Initialize the LastPass integration.
+
+ Args:
+ export_path: Path to LastPass export file
+ """
+ super().__init__()
+ self.export_path = export_path
+
+ def connect(self, export_path: Optional[Path] = None, **kwargs) -> bool:
+ """Connect to LastPass (loads the export file).
+
+ Args:
+ export_path: Path to LastPass export file
+
+ Returns:
+ bool: True if the export file exists and is accessible
+ """
+ if export_path:
+ self.export_path = Path(export_path)
+
+ if not self.export_path or not self.export_path.exists():
+ raise IntegrationError("LastPass export file not found")
+
+ self.connected = True
+ return True
+
+ def list_entries(self) -> List[PasswordEntry]:
+ """List all password entries from the LastPass export.
+
+ Returns:
+ List of PasswordEntry objects
+
+ Raises:
+ IntegrationError: If not connected or error reading the export file
+ """
+ if not self.connected:
+ raise IntegrationError("Not connected to LastPass")
+
+ entries: List[PasswordEntry] = []
+
+ try:
+ with open(self.export_path, 'r', encoding='utf-8') as f:
+ # Skip the first line (header)
+ next(f)
+
+ reader = csv.reader(f)
+ for row in reader:
+ if len(row) < 7: # Ensure we have enough columns
+ continue
+
+ url, username, password, extra, name, grouping, fav = row[:7]
+
+ # Create a PasswordEntry
+ entry = PasswordEntry(
+ website=url or name or "Unknown",
+ username=username,
+ password=password,
+ notes=extra,
+ tags=[grouping] if grouping else []
+ )
+
+ entries.append(entry)
+
+ except Exception as e:
+ raise IntegrationError(f"Error reading LastPass export: {e}")
+
+ return entries
+
+ def import_entries(self, input_path: Optional[Path] = None) -> List[PasswordEntry]:
+ """Import entries from a LastPass export file.
+
+ Args:
+ input_path: Path to the LastPass export file
+
+ Returns:
+ List of imported PasswordEntry objects
+ """
+ if input_path:
+ self.export_path = Path(input_path)
+
+ if not self.export_path or not self.export_path.exists():
+ raise IntegrationError("LastPass export file not found")
+
+ return self.list_entries()
+
+ def export_entries(self, output_path: Path) -> bool:
+ """Export entries to a LastPass-compatible CSV file.
+
+ Args:
+ output_path: Path to save the exported data
+
+ Returns:
+ bool: True if export was successful
+ """
+ # This would require converting from our format to LastPass format
+ # For now, we'll just raise a NotImplementedError
+ raise NotImplementedError("Export to LastPass format is not yet implemented")
+
+ @classmethod
+ def export_help(cls) -> str:
+ """Get instructions for exporting from LastPass.
+
+ Returns:
+ str: Instructions for exporting from LastPass
+ """
+ return """To export from LastPass:
+ 1. Log in to your LastPass account
+ 2. Click on your email in the bottom left
+ 3. Select 'Advanced' > 'Export'
+ 4. Enter your master password
+ 5. Save the exported file
+ """
diff --git a/src/cerberus/native/host.py b/src/cerberus/native/host.py
new file mode 100644
index 0000000..75459b1
--- /dev/null
+++ b/src/cerberus/native/host.py
@@ -0,0 +1,138 @@
+"""
+Native Messaging host for Cerberus Password Manager (development scaffold).
+
+WARNING: Development-only. For testing with the Firefox/Chrome extension.
+Authentication: expects master password via CERB_MASTER environment variable.
+
+Protocol: newline-free JSON messages with 4-byte little-endian length prefix.
+Messages:
+ {"type":"ping"}
+ {"type":"list_entries"}
+ {"type":"get_for_origin", "origin":"https://example.com", "include_password": false}
+
+Responses:
+ {"ok": true, "result": ...} or {"ok": false, "error": "..."}
+"""
+from __future__ import annotations
+
+import json
+import os
+import struct
+import sys
+from typing import Any, Dict, List
+from urllib.parse import urlparse
+
+from cerberus.core.password_manager import PasswordManager, VaultError
+
+
+def _read_msg() -> Dict[str, Any] | None:
+ raw_len = sys.stdin.buffer.read(4)
+ if not raw_len:
+ return None
+ msg_len = struct.unpack("<I", raw_len)[0]
+ data = sys.stdin.buffer.read(msg_len)
+ if not data:
+ return None
+ return json.loads(data.decode("utf-8"))
+
+
+essential_fields = ["id", "website", "username", "url", "notes", "tags", "updated_at", "last_used"]
+
+
+def _write_msg(obj: Dict[str, Any]) -> None:
+ data = json.dumps(obj, default=str, separators=(",", ":")).encode("utf-8")
+ sys.stdout.buffer.write(struct.pack("<I", len(data)))
+ sys.stdout.buffer.write(data)
+ sys.stdout.buffer.flush()
+
+
+def _origin_host(origin: str) -> str:
+ try:
+ u = urlparse(origin)
+ return u.hostname or origin
+ except Exception:
+ return origin
+
+
+def main() -> None:
+ master = os.environ.get("CERB_MASTER")
+ data_dir = os.environ.get("CERB_DATA_DIR")
+ pm: PasswordManager | None = None
+
+ if master:
+ try:
+ pm = PasswordManager(data_dir=data_dir, master_password=master)
+ except VaultError as e:
+ _write_msg({"ok": False, "error": f"unlock_failed:{e}"})
+ return
+ else:
+ # Defer unlock until first request arrives
+ pm = None
+
+ allow_remote_unlock = os.environ.get("CERB_ALLOW_REMOTE_UNLOCK") == "1"
+
+ while True:
+ msg = _read_msg()
+ if msg is None:
+ break
+ try:
+ mtype = msg.get("type")
+ if mtype == "ping":
+ _write_msg({"ok": True, "result": "pong"})
+ continue
+ if mtype == "unlock":
+ if not allow_remote_unlock:
+ _write_msg({"ok": False, "error": "remote_unlock_disabled"})
+ else:
+ master = msg.get("master")
+ if not master:
+ _write_msg({"ok": False, "error": "missing_master"})
+ continue
+ try:
+ pm = PasswordManager(data_dir=data_dir, master_password=master)
+ _write_msg({"ok": True, "result": "unlocked"})
+ except VaultError as e:
+ _write_msg({"ok": False, "error": f"unlock_failed:{e}"})
+ continue
+ if pm is None:
+ _write_msg({"ok": False, "error": "locked"})
+ continue
+ if mtype == "list_entries":
+ entries = pm.get_entries()
+ slim = [
+ {
+ "id": e.id,
+ "website": e.website,
+ "username": e.username,
+ "url": e.url,
+ }
+ for e in entries
+ ]
+ _write_msg({"ok": True, "result": slim})
+ continue
+ if mtype == "get_for_origin":
+ origin = msg.get("origin") or ""
+ include_password = bool(msg.get("include_password", False))
+ host = _origin_host(origin).lower()
+ matches = []
+ for e in pm.get_entries():
+ target = (e.url or e.website or "").lower()
+ if host and host in target:
+ item = {
+ "id": e.id,
+ "website": e.website,
+ "username": e.username,
+ "url": e.url,
+ }
+ if include_password:
+ item["password"] = e.password
+ matches.append(item)
+ _write_msg({"ok": True, "result": matches})
+ continue
+ _write_msg({"ok": False, "error": f"unknown_type:{mtype}"})
+ except Exception as e:
+ _write_msg({"ok": False, "error": f"exception:{e}"})
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/cerberus/native/manifests/chrome_com.cerberus.pm.json b/src/cerberus/native/manifests/chrome_com.cerberus.pm.json
new file mode 100644
index 0000000..7eec080
--- /dev/null
+++ b/src/cerberus/native/manifests/chrome_com.cerberus.pm.json
@@ -0,0 +1,9 @@
+{
+ "name": "com.cerberus.pm",
+ "description": "Cerberus Password Manager Native Messaging Host (dev)",
+ "path": "/usr/local/bin/cerberus-native-host",
+ "type": "stdio",
+ "allowed_origins": [
+ "chrome-extension://REPLACE_WITH_EXTENSION_ID/"
+ ]
+}
diff --git a/src/cerberus/native/manifests/firefox_com.cerberus.pm.json b/src/cerberus/native/manifests/firefox_com.cerberus.pm.json
new file mode 100644
index 0000000..fda6d6e
--- /dev/null
+++ b/src/cerberus/native/manifests/firefox_com.cerberus.pm.json
@@ -0,0 +1,9 @@
+{
+ "name": "com.cerberus.pm",
+ "description": "Cerberus Password Manager Native Messaging Host (dev)",
+ "path": "/usr/local/bin/cerberus-native-host",
+ "type": "stdio",
+ "allowed_extensions": [
+ "cerberus@example.com"
+ ]
+}
diff --git a/src/cerberus/tui/__init__.py b/src/cerberus/tui/__init__.py
new file mode 100644
index 0000000..75461d6
--- /dev/null
+++ b/src/cerberus/tui/__init__.py
@@ -0,0 +1,16 @@
+"""
+Terminal User Interface (TUI) for Cerberus Password Manager.
+
+This module provides a rich, interactive terminal interface for managing passwords.
+"""
+
+__all__ = ["main"]
+
+def main():
+ """Launch the Cerberus TUI."""
+ from .app import CerberusTUI
+ app = CerberusTUI()
+ app.run()
+
+if __name__ == "__main__":
+ main()
diff --git a/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc b/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..bb0f474
--- /dev/null
+++ b/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc
Binary files differ
diff --git a/src/cerberus/tui/app.py b/src/cerberus/tui/app.py
new file mode 100644
index 0000000..6e3d513
--- /dev/null
+++ b/src/cerberus/tui/app.py
@@ -0,0 +1,243 @@
+""
+Main TUI application for Cerberus Password Manager.
+"""
+from typing import Optional, List, Dict, Any
+from pathlib import Path
+import logging
+
+from rich.console import Console
+from rich.panel import Panel
+from rich.table import Table
+from rich.prompt import Prompt, Confirm
+from rich.progress import Progress, SpinnerColumn, TextColumn
+from rich.traceback import install as install_rich_traceback
+from textual.app import App, ComposeResult
+from textual.containers import Container, VerticalScroll
+from textual.widgets import (
+ Header, Footer, Button, Static, Input, Label,
+ DataTable, Select, Switch, LoadingIndicator
+)
+
+from ..core.password_manager import PasswordManager, VaultError
+from ..core.models import PasswordEntry
+
+# Set up rich traceback for better error messages
+install_rich_traceback(show_locals=True)
+logger = logging.getLogger(__name__)
+
+class CerberusTUI(App):
+ """Main TUI application for Cerberus Password Manager."""
+
+ CSS = """
+ Screen {
+ layout: vertical;
+ }
+
+ #login-screen {
+ width: 100%;
+ height: 100%;
+ align: center middle;
+ }
+
+ #main-screen {
+ width: 100%;
+ height: 100%;
+ layout: horizontal;
+ }
+
+ #sidebar {
+ width: 30%;
+ height: 100%;
+ border: solid $accent;
+ }
+
+ #content {
+ width: 70%;
+ height: 100%;
+ border: solid $accent;
+ }
+
+ .entry-list {
+ width: 100%;
+ height: 100%;
+ }
+
+ .entry-detail {
+ width: 100%;
+ height: 100%;
+ padding: 1;
+ }
+ """
+
+ BINDINGS = [
+ ("q", "quit", "Quit"),
+ ("n", "new_password", "New Password"),
+ ("f", "find", "Find Password"),
+ ("r", "refresh", "Refresh"),
+ ("c", "copy_password", "Copy Password"),
+ ("u", "copy_username", "Copy Username"),
+ ("d", "delete", "Delete Entry"),
+ ]
+
+ def __init__(self, data_dir: Optional[str] = None, **kwargs):
+ super().__init__(**kwargs)
+ self.console = Console()
+ self.pm: Optional[PasswordManager] = None
+ self.data_dir = data_dir
+ self.current_entry: Optional[PasswordEntry] = None
+ self.entries: List[PasswordEntry] = []
+
+ def compose(self) -> ComposeResult:
+ """Create child widgets for the app."""
+ if not self.pm:
+ yield Container(
+ VerticalScroll(
+ Label("🔒 Cerberus Password Manager", id="login-title"),
+ Input(placeholder="Master Password", password=True, id="master-password"),
+ Button("Unlock", variant="primary", id="unlock-button"),
+ id="login-screen"
+ )
+ )
+ else:
+ with Container(id="main-screen"):
+ with Container(id="sidebar"):
+ yield DataTable(id="entry-list", cursor_type="row")
+ with Container(id="content"):
+ yield Static("Select an entry to view details", id="entry-detail")
+
+ def on_mount(self) -> None:
+ """Initialize the UI after mounting."""
+ if not self.pm:
+ self.query_one("#master-password", Input).focus()
+ else:
+ self.load_entries()
+
+ def on_button_pressed(self, event: Button.Pressed) -> None:
+ """Handle button press events."""
+ if event.button.id == "unlock-button":
+ self.unlock_vault()
+
+ def on_input_submitted(self, event: Input.Submitted) -> None:
+ """Handle input submission."""
+ if event.input.id == "master-password":
+ self.unlock_vault()
+
+ def unlock_vault(self) -> None:
+ """Attempt to unlock the password vault."""
+ password_input = self.query_one("#master-password", Input)
+ password = password_input.value
+
+ if not password:
+ self.notify("Please enter a master password", severity="error")
+ return
+
+ try:
+ with Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ transient=True,
+ console=self.console
+ ) as progress:
+ task = progress.add_task("Unlocking vault...", total=None)
+ self.pm = PasswordManager(data_dir=self.data_dir, master_password=password)
+ progress.update(task, completed=1)
+
+ # Clear the login screen and show the main interface
+ self.query("#login-screen").remove()
+ self.compose()
+ self.load_entries()
+
+ except VaultError as e:
+ self.notify(f"Failed to unlock vault: {e}", severity="error")
+ except Exception as e:
+ logger.exception("Error unlocking vault")
+ self.notify(f"An error occurred: {e}", severity="error")
+
+ def load_entries(self) -> None:
+ """Load password entries into the UI."""
+ if not self.pm:
+ return
+
+ try:
+ table = self.query_one("#entry-list", DataTable)
+ table.clear()
+ table.add_columns("Website", "Username", "Last Used")
+
+ self.entries = self.pm.get_entries()
+ for entry in self.entries:
+ table.add_row(
+ entry.website,
+ entry.username,
+ entry.last_used.strftime("%Y-%m-%d") if entry.last_used else "Never"
+ )
+
+ except Exception as e:
+ logger.exception("Error loading entries")
+ self.notify(f"Failed to load entries: {e}", severity="error")
+
+ def action_new_password(self) -> None:
+ """Create a new password entry."""
+ self.notify("New password functionality coming soon!", severity="information")
+
+ def action_find(self) -> None:
+ """Find a password entry."""
+ self.notify("Find functionality coming soon!", severity="information")
+
+ def action_refresh(self) -> None:
+ """Refresh the entry list."""
+ self.load_entries()
+ self.notify("Entries refreshed", severity="information")
+
+ def action_copy_password(self) -> None:
+ """Copy the current entry's password to clipboard."""
+ if not self.current_entry:
+ self.notify("No entry selected", severity="warning")
+ return
+
+ try:
+ # Use platform-specific clipboard handling
+ import pyperclip
+ pyperclip.copy(self.current_entry.password)
+ self.notify("Password copied to clipboard", severity="information")
+ except Exception as e:
+ logger.exception("Error copying to clipboard")
+ self.notify(f"Failed to copy to clipboard: {e}", severity="error")
+
+ def action_copy_username(self) -> None:
+ """Copy the current entry's username to clipboard."""
+ if not self.current_entry:
+ self.notify("No entry selected", severity="warning")
+ return
+
+ try:
+ import pyperclip
+ pyperclip.copy(self.current_entry.username)
+ self.notify("Username copied to clipboard", severity="information")
+ except Exception as e:
+ logger.exception("Error copying username")
+ self.notify(f"Failed to copy username: {e}", severity="error")
+
+ def action_delete(self) -> None:
+ """Delete the current entry."""
+ if not self.current_entry:
+ self.notify("No entry selected", severity="warning")
+ return
+
+ if Confirm.ask(f"Delete entry for {self.current_entry.website}?"):
+ try:
+ self.pm.delete_entry(self.current_entry.id)
+ self.load_entries()
+ self.notify("Entry deleted", severity="information")
+ self.current_entry = None
+ self.query_one("#entry-detail", Static).update("Select an entry to view details")
+ except Exception as e:
+ logger.exception("Error deleting entry")
+ self.notify(f"Failed to delete entry: {e}", severity="error")
+
+def main():
+ """Run the Cerberus TUI."""
+ app = CerberusTUI()
+ app.run()
+
+if __name__ == "__main__":
+ main()