diff options
87 files changed, 6363 insertions, 709 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..27dc406 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,55 @@ +cmake_minimum_required(VERSION 3.10) +project(cerberus VERSION 0.1.0 LANGUAGES C) + +# Set C standard +set(CMAKE_C_STANDARD 11) +set(CMAKE_C_STANDARD_REQUIRED ON) + +# Set output directories +set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) +set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) + +# Find required packages +find_package(OpenSSL REQUIRED) +find_package(Threads REQUIRED) + +# Add include directories +include_directories( + ${CMAKE_CURRENT_SOURCE_DIR}/core + ${OPENSSL_INCLUDE_DIR} +) + +# Add library +add_library(cerberus SHARED core/cerberus.c) + +# Link libraries +target_link_libraries(cerberus + PRIVATE + OpenSSL::Crypto + Threads::Threads +) + +# Set output name and properties +set_target_properties(cerberus PROPERTIES + OUTPUT_NAME "cerberus" + PREFIX "" + SUFFIX ".so" +) + +# Install rules +install(TARGETS cerberus + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib + RUNTIME DESTINATION bin +) + +# Install header files +install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/core/cerberus.h DESTINATION include/cerberus/core) + +# Optional tests (disabled by default) +option(BUILD_TESTS "Build tests" OFF) +if(BUILD_TESTS AND EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/tests/CMakeLists.txt") + enable_testing() + add_subdirectory(tests) +endif() diff --git a/Makefile b/Makefile deleted file mode 100644 index e69de29..0000000 --- a/Makefile +++ /dev/null @@ -1,58 +1,281 @@ -# cerberus +# 🐕 Cerberus Password Manager +A secure, high-performance password manager with a C core for cryptographic operations, featuring a modern TUI, GUI, and browser extensions. + +## 🚀 Features + +- **High-performance** cryptographic operations powered by a C core +- **Secure** password storage with zero-knowledge encryption +- **Cross-platform** support (Windows, macOS, Linux) +- **Multiple Interfaces**: + - 🔍 Command Line Interface (CLI) + - 🖥️ Terminal User Interface (TUI) + - 🖱️ Graphical User Interface (GUI) + - 🌐 Browser Extensions (Firefox, Chrome/Edge) +- **Smart Password Management**: + - 🔄 Auto-detection of password change forms + - 🔄 One-click password rotation + - 🔒 Password strength analysis + - 🚨 Breach monitoring +- **Browser Integration**: + - 🔌 Auto-fill login forms + - 🔄 Auto-save new logins + - 🔄 Auto-update changed passwords + - 🎯 Smart detection of login forms +- **Import/Export** from other password managers +- **Biometric** authentication support +- **Secure Sharing** of passwords (coming soon) +- **CLI, TUI, and GUI** interfaces for all operations + +## 📦 Installation + +### Prerequisites + +- Python 3.8+ +- CMake 3.10+ +- OpenSSL development libraries +- C compiler (GCC/Clang) +- Node.js 16+ (for browser extensions) +- Optional for TUI: `textual`, `rich` (install with extra `ui-tui`) +- Optional for GUI: `PyQt6` (install with extra `ui-gui`) + +### Quick Start + +```bash +# Clone the repository +git clone https://github.com/srdusr/cerberus.git +cd cerberus + +# Install base package +pip install -e . + +# Optional extras +# TUI +pip install -e .[ui-tui] +# GUI +pip install -e .[ui-gui] +# Selenium automation (optional) +pip install -e .[automation-selenium] + +# Build and install the C core +mkdir -p build && cd build +cmake .. +make +make install + +# Initialize your password vault +cerberus init ``` - #(# - ((# @@(((((#(((@ - #((@#(((((&(((@(((((@ @#@ - @(@ @#@@@@##@,,,(((/((/@ @##(/@//@ @ - (@#(((@ %#@,,,.,,,,,(((@((/( @###(((((((/@ ((/# - #(#@@ @((((((((((#@ #@#%@@@@%/@&,/(((((( @##((((((((((@(/@/ - ###@#((((((((((((##@###@#,%@(((@@,(((((@ ###((((((((&./@@(@(@ - @#(((((@(((((((((###@##@#@@** .,,((((## @###(((((((((((((@&(@ - @#((((((((((((((((#######@(,,,,,#(((###@######(((((((,,,,,,,,,%%@ @//////( - @((,,,,(@((((((((@(###@##########((((########((#((((,,@((@@ .@ @(((((@ - @@@/,,,,@#,,(((((((((##@########((((((##((((((((((((((,, *( @ @(@&((@ - @**/. @%.,,##((((((((((@#######(((((((((#((((((((((((((@,,.,/ @(@ - @**%@ @@&####((((((&######((((((((##((((((((((((((@(((//// (((* - ####((((((((((####(((((((###(((((((((((@((((((((/// @#((( - @####((((((((((@&((((((((((((((((((((((((((((((((//@ (((//( - @@##(((((((((((((((((((((((((((((@(((((((((((((((((((((//((( - &#####((((((((((((((((((((((((((((@((#((((((((((@((((((((((( - @####((((((((((((((#((((((((((((((((##((((((((((@((((###(( - #@#####(((((((((((#@#((((((((((((((#@###((((((((((@###(@ - @###@#######@########&##((((((((((((#@&####((((((((((@ - @#####&&################@##((((((((((( @@#####((((( - (((######## .@@, @##(((((((( ##(((((@ - ((((((#####% @#(((((((( ##((((( - (((((((###% #########((((((( ###(((((( - ((((((((##@ &.@#. ###@#((((((( @@(@((((((( - @(((((((((((##/ #(((((((@ @ @@.@&@.@@ - %. . @(.@((((#@ @(((((((((((@ - .,@@@ % @@ (@@ @(# - . @ # @*@ + +### One-command install (Linux) + +Use the provided `scripts/cerberus-install.sh` to automate Python install, C core build, and (optionally) native messaging setup. + +```bash +# Base install +bash scripts/cerberus-install.sh + +# With extras (TUI, GUI, Selenium) and Firefox native messaging manifest +CERB_EXTRAS="ui-tui,ui-gui,automation-selenium" CERB_INSTALL_FF=1 bash scripts/cerberus-install.sh + +# With Chrome native messaging manifest +CERB_INSTALL_CHROME=1 bash scripts/cerberus-install.sh + +# Skip C core build (if already built/installed) +CERB_SKIP_BUILD=1 bash scripts/cerberus-install.sh ``` -## Description: +Environment variables: + +- `CERB_EXTRAS`: comma-separated extras to install (e.g., `ui-tui,ui-gui,automation-selenium`). +- `CERB_INSTALL_FF=1`: also install Firefox native messaging manifest. +- `CERB_INSTALL_CHROME=1`: also install Chrome native messaging manifest. +- `CERB_SKIP_BUILD=1`: skip building the C core via CMake. -- Password manager written in C +## 🛠️ Usage -## Requirements: +### Command Line Interface (CLI) + +```bash +# Initialize a new password vault +cerberus init -- Linux -- xclip +# Add a new password entry +cerberus add --website example.com --username user@example.com -## Usage: +# Get a password (copies to clipboard) +cerberus get example.com -- Command to compile +# List all entries +cerberus list +# Rotate a password (local vault only) +cerberus rotate example.com + +# Web-rotate via browser automation with dynamic discovery +# Simulate (dry-run) across all entries +cerberus web-rotate --dry-run --all + +# Rotate for a single target using Playwright (default) +cerberus web-rotate example.com + +# Use Selenium instead +cerberus web-rotate example.com --engine selenium + +# Launch the GUI +pip install -e .[ui-gui] +cerberus gui +``` + +### Terminal User Interface (TUI) + +Launch the TUI with: ```bash -$ gcc -o cerberus main.c -lcrypto +cerberus tui ``` -- Run the program: +### Graphical User Interface (GUI) +Launch the GUI with: ```bash -$ ./cerberus +cerberus gui ``` -- NOTE: This program is still in very early stages of development and should not be used in any production environment, use at your own risk. +### Browser Extensions + +Currently, a development Firefox extension is included under `webext/firefox/`. + +Manual install steps for development: + +1. Open `about:debugging#/runtime/this-firefox` in Firefox +2. Click "Load Temporary Add-on..." +3. Select `webext/firefox/manifest.json` +4. A Cerberus icon will appear in the toolbar +5. Use the popup to fill credentials on the current tab + +Note: This extension is a scaffold for development. A native messaging bridge to the local +vault is planned for secure autofill and save. Today it supports simple page form fill. + +### Native Messaging (development) + +Native messaging lets the browser extension talk to your local Cerberus vault securely. + +1) Install the native host (installed as a console script): + +```bash +pip install -e . +# The host command will be available as: +which cerberus-native-host +``` + +2) Install the native messaging manifest for your browser: + +- Firefox (Linux): copy the provided manifest and adjust the `path` if needed + +```bash +mkdir -p ~/.mozilla/native-messaging-hosts/ +cp native/manifests/firefox_com.cerberus.pm.json ~/.mozilla/native-messaging-hosts/com.cerberus.pm.json +# Ensure the path points to your cerberus-native-host binary (e.g., /usr/local/bin/cerberus-native-host) +sed -i "s#/usr/local/bin/cerberus-native-host#$(command -v cerberus-native-host | sed 's#/#\\/#g')#" ~/.mozilla/native-messaging-hosts/com.cerberus.pm.json +``` + +- Chrome/Edge (Linux): create manifest at the standard location + +```bash +mkdir -p ~/.config/google-chrome/NativeMessagingHosts/ +cat > ~/.config/google-chrome/NativeMessagingHosts/com.cerberus.pm.json << 'EOF' +{ + "name": "com.cerberus.pm", + "description": "Cerberus Password Manager Native Messaging Host (dev)", + "path": "/usr/local/bin/cerberus-native-host", + "type": "stdio", + "allowed_origins": [ + "chrome-extension://REPLACE_WITH_EXTENSION_ID/" + ] +} +EOF +# Replace the path with $(command -v cerberus-native-host) +sed -i "s#/usr/local/bin/cerberus-native-host#$(command -v cerberus-native-host | sed 's#/#\\/#g')#" ~/.config/google-chrome/NativeMessagingHosts/com.cerberus.pm.json +``` + +3) Unlocking the vault for native host: + +For development, you can pass the master via environment variable (only for local dev!): + +```bash +CERB_MASTER='your-master' CERB_DATA_DIR=~/.cerberus cerberus-native-host +# Typically launched by the browser; running manually is for debugging only. +``` + +In the extension popup, click "Fetch from Vault" to retrieve credentials for the current tab. + +## 🔄 Password Change Automation + +Cerberus can automatically detect and handle many password change flows via web automation. +It uses a hybrid approach: + +- Tries a site-specific flow when available (e.g., `GithubFlow` in `cerberus/automation/sites/`) +- Falls back to heuristic discovery (`cerberus/automation/discovery.py`): + - Scans the DOM for common "Change/Reset Password" links/buttons + - Tries common settings paths like `/settings/security` and `/settings/password` + - Attempts to locate current/new/confirm password inputs and submit + +```bash +# Automatically detect and update password for a website +cerberus web-rotate example.com + +# Check for password changes on all supported sites +cerberus web-rotate --all + +Tip: Use `--dry-run` first to preview actions without making changes. + +Limitations: Some sites require MFA/2FA or complex flows; in those cases the tool will +return a NEEDS_MANUAL status and avoid unsafe actions. +``` + +## 🛠 Development + +### Setup Development Environment + +```bash +# Install development dependencies +pip install -e ".[dev]" + +# Install pre-commit hooks +pre-commit install + +# Run tests +pytest + +# Run type checking +mypy . + +# Format code +black . + +# Lint code +flake8 +``` + +### Building Browser Extensions + +```bash +The Firefox development extension needs no build step. For Chromium-based browsers +and production bundles, a separate web extension build pipeline will be added. +``` + +## 🤝 Contributing + +Contributions are welcome! Please read our [Contributing Guidelines](CONTRIBUTING.md) for details. + +## 📄 License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. + +## 🔒 Security + +NOTE: This program is still in very early stages of development and should not be used in any production environment, use at your own risk. + +## License + +MIT diff --git a/ascii-art.txt b/ascii-art.txt deleted file mode 100644 index 3c61f76..0000000 --- a/ascii-art.txt +++ /dev/null @@ -1,40 +0,0 @@ - - #(# - ((# @@(((((#(((@ - #((@#(((((&(((@(((((@ @#@ - @(@ @#@@@@##@,,,(((/((/@ @##(/@//@ @ - (@#(((@ %#@,,,.,,,,,(((@((/( @###(((((((/@ ((/# -#(#@@ @((((((((((#@ #@#%@@@@%/@&,/(((((( @##((((((((((@(/@/ - ###@#((((((((((((##@###@#,%@(((@@,(((((@ ###((((((((&./@@(@(@ - @#(((((@(((((((((###@##@#@@** .,,((((## @###(((((((((((((@&(@ - @#((((((((((((((((#######@(,,,,,#(((###@######(((((((,,,,,,,,,%%@ @//////( - @((,,,,(@((((((((@(###@##########((((########((#((((,,@((@@ .@ @(((((@ -@@@/,,,,@#,,(((((((((##@########((((((##((((((((((((((,, *( @ @(@&((@ -@**/. @%.,,##((((((((((@#######(((((((((#((((((((((((((@,,.,/ @(@ - @**%@ @@&####((((((&######((((((((##((((((((((((((@(((//// (((* - ####((((((((((####(((((((###(((((((((((@((((((((/// @#((( - @####((((((((((@&((((((((((((((((((((((((((((((((//@ (((//( - @@##(((((((((((((((((((((((((((((@(((((((((((((((((((((//((( - &#####((((((((((((((((((((((((((((@((#((((((((((@((((((((((( - @####((((((((((((((#((((((((((((((((##((((((((((@((((###(( - #@#####(((((((((((#@#((((((((((((((#@###((((((((((@###(@ - @###@#######@########&##((((((((((((#@&####((((((((((@ - @#####&&################@##((((((((((( @@#####((((( - (((######## .@@, @##(((((((( ##(((((@ - ((((((#####% @#(((((((( ##((((( - (((((((###% #########((((((( ###(((((( - ((((((((##@ &.@#. ###@#((((((( @@(@((((((( - @(((((((((((##/ #(((((((@ @ @@.@&@.@@ - %. . @(.@((((#@ @(((((((((((@ - .,@@@ % @@ (@@ @(# - . @ # @*@ - @@@@@@@ @@@@@@@@ @@@@@@@ @@@@@@@ @@@@@@@@ @@@@@@@ @@@ @@@ @@@@@@ -@@@@@@@@ @@@@@@@@ @@@@@@@@ @@@@@@@@ @@@@@@@@ @@@@@@@@ @@@ @@@ @@@@@@@ -!@@ @@! @@! @@@ @@! @@@ @@! @@! @@@ @@! @@@ !@@ -!@! !@! !@! @!@ !@ @!@ !@! !@! @!@ !@! @!@ !@! -!@! @!!!:! @!@!!@! @!@!@!@ @!!!:! @!@!!@! @!@ !@! !!@@!! -!!! !!!!!: !!@!@! !!!@!!!! !!!!!: !!@!@! !@! !!! !!@!!! -:!! !!: !!: :!! !!: !!! !!: !!: :!! !!: !!! !:! -:!: :!: :!: !:! :!: !:! :!: :!: !:! :!: !:! !:! - ::: ::: :: :::: :: ::: :: :::: :: :::: :: ::: ::::: :: :::: :: - :: :: : : :: :: : : : :: : :: : :: :: : : : : : : :: : :
\ No newline at end of file diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..368a4d7 --- /dev/null +++ b/core/__init__.py @@ -0,0 +1,93 @@ +"""Cerberus Core - Core functionality for the Cerberus password manager. + +This module provides the core functionality for the Cerberus password manager, +including the C core bindings and high-level password management interfaces. +""" + +import os +import cffi +from pathlib import Path +from typing import Optional, Any + +# Initialize CFFI (exported for callers that need to manage buffers) +ffi = cffi.FFI() + +# Load the C header +def _load_header(): + header_path = Path(__file__).parent / 'cerberus.h' + with open(header_path) as f: + # Read and clean up the header for CFFI + lines = [] + for line in f: + # Remove #include directives and other preprocessor commands + if line.startswith('#'): + continue + # Remove C++ style comments + if '//' in line: + line = line.split('//')[0] + '\n' + lines.append(line) + + # Join the cleaned lines and pass to cdef + ffi.cdef('\n'.join(filter(None, lines))) + +# Load the header +_load_header() + +# Try to load the compiled library +_lib = None + +def init() -> bool: + """Initialize the Cerberus C core. + + Returns: + bool: True if initialization was successful, False otherwise + """ + global _lib + + if _lib is not None: + return True + + # Try multiple candidate names + candidates = [ + Path(__file__).parent / 'libcerberus.so', + Path(__file__).parent / 'cerberus.so' + ] + for lib_path in candidates: + try: + _lib = ffi.dlopen(str(lib_path)) + return True + except OSError: + continue + _lib = None + return False + +# Initialize on import +if not init(): + class DummyLib: + def __getattribute__(self, name: str) -> Any: + raise RuntimeError( + "Cerberus C core not initialized. " + "Please ensure the core is compiled and in your library path." + ) + + _lib = DummyLib() + +# Re-export the C functions with proper typing +for name in dir(_lib): + if name.startswith('cerb_'): + globals()[name] = getattr(_lib, name) + +# Clean up the namespace (keep ffi exported) +del os, Path, _load_header, init, DummyLib + +# Export high-level interfaces +from .password_manager import PasswordManager +from .models import PasswordEntry + +__all__ = [ + 'PasswordManager', + 'PasswordEntry', + 'VaultError', + 'CoreNotAvailableError', + 'ffi' +] diff --git a/core/cerberus.c b/core/cerberus.c new file mode 100644 index 0000000..9f12f7d --- /dev/null +++ b/core/cerberus.c @@ -0,0 +1,459 @@ +#include "cerberus.h" +#include <string.h> +#include <stdlib.h> +#include <stdio.h> +#include <openssl/evp.h> +#include <openssl/rand.h> +#include <openssl/err.h> +// uuid/uuid.h not required; implement UUID v4 using RAND_bytes + +// Vault structure +typedef struct { + uint8_t salt[SALT_LEN]; + uint8_t key[KEY_LEN]; + bool key_initialized; + cerb_entry_t *entries; + size_t num_entries; + size_t capacity; +} cerb_vault_internal_t; + +// Initialize crypto +cerb_error_t cerb_crypto_init(void) { + OpenSSL_add_all_algorithms(); + ERR_load_crypto_strings(); + return RAND_poll() ? CERB_OK : CERB_CRYPTO_ERROR; +} + +// Cleanup crypto +void cerb_crypto_cleanup(void) { + EVP_cleanup(); + ERR_free_strings(); +} + +// Create new vault +cerb_error_t cerb_vault_create(const char *master_password, cerb_vault_t **vault) { + if (!master_password || !vault) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = calloc(1, sizeof(cerb_vault_internal_t)); + if (!v) return CERB_MEMORY_ERROR; + + if (RAND_bytes(v->salt, SALT_LEN) != 1) { + free(v); + return CERB_CRYPTO_ERROR; + } + + // Derive key from password and salt + if (!PKCS5_PBKDF2_HMAC(master_password, (int)strlen(master_password), + v->salt, SALT_LEN, PBKDF2_ITERATIONS, + EVP_sha256(), KEY_LEN, v->key)) { + free(v); + return CERB_CRYPTO_ERROR; + } + v->key_initialized = true; + + v->capacity = 32; + v->entries = calloc(v->capacity, sizeof(cerb_entry_t)); + if (!v->entries) { + free(v); + return CERB_MEMORY_ERROR; + } + + *vault = (cerb_vault_t *)v; + return CERB_OK; +} + +// Save vault to file (AES-256-GCM encrypted blob) +cerb_error_t cerb_vault_save(cerb_vault_t *vault, const char *vault_path) { + if (!vault || !vault_path) return CERB_INVALID_ARG; + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + FILE *fp = fopen(vault_path, "wb"); + if (!fp) return CERB_STORAGE_ERROR; + + // Serialize entries: [num_entries][entries...] + size_t plain_len = sizeof(uint32_t) + v->num_entries * sizeof(cerb_entry_t); + unsigned char *plaintext = malloc(plain_len); + if (!plaintext) { fclose(fp); return CERB_MEMORY_ERROR; } + + uint32_t n = (uint32_t)v->num_entries; + memcpy(plaintext, &n, sizeof(uint32_t)); + if (v->num_entries > 0) { + memcpy(plaintext + sizeof(uint32_t), v->entries, v->num_entries * sizeof(cerb_entry_t)); + } + + // Prepare AES-GCM + unsigned char iv[IV_LEN]; + if (RAND_bytes(iv, IV_LEN) != 1) { free(plaintext); fclose(fp); return CERB_CRYPTO_ERROR; } + unsigned char *ciphertext = malloc(plain_len); + if (!ciphertext) { free(plaintext); fclose(fp); return CERB_MEMORY_ERROR; } + int len = 0, ciphertext_len = 0; + unsigned char tag[16]; + + EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); + if (!ctx) { free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; } + + if (EVP_EncryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, NULL, NULL) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + if (EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, IV_LEN, NULL) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + if (EVP_EncryptInit_ex(ctx, NULL, NULL, v->key, iv) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + + if (EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, (int)plain_len) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + ciphertext_len = len; + if (EVP_EncryptFinal_ex(ctx, ciphertext + len, &len) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + ciphertext_len += len; + if (EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_GET_TAG, 16, tag) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + EVP_CIPHER_CTX_free(ctx); + + // Write file: MAGIC, VERSION, SALT, IV, TAG, CIPHERTEXT_LEN, CIPHERTEXT + const char magic[8] = { 'C','E','R','B','E','R','U','S' }; + uint32_t version = 1; + uint32_t clen = (uint32_t)ciphertext_len; + + if (fwrite(magic, 1, sizeof(magic), fp) != sizeof(magic) || + fwrite(&version, 1, sizeof(version), fp) != sizeof(version) || + fwrite(v->salt, 1, SALT_LEN, fp) != SALT_LEN || + fwrite(iv, 1, IV_LEN, fp) != IV_LEN || + fwrite(tag, 1, sizeof(tag), fp) != sizeof(tag) || + fwrite(&clen, 1, sizeof(clen), fp) != sizeof(clen) || + fwrite(ciphertext, 1, ciphertext_len, fp) != (size_t)ciphertext_len) { + free(plaintext); free(ciphertext); fclose(fp); return CERB_STORAGE_ERROR; + } + + free(plaintext); + free(ciphertext); + fclose(fp); + return CERB_OK; +} + +// Open vault from file +cerb_error_t cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault) { + if (!master_password || !vault_path || !vault) return CERB_INVALID_ARG; + FILE *fp = fopen(vault_path, "rb"); + if (!fp) return CERB_STORAGE_ERROR; + + const char expected_magic[8] = { 'C','E','R','B','E','R','U','S' }; + char magic[8]; + uint32_t version = 0; + unsigned char salt[SALT_LEN]; + unsigned char iv[IV_LEN]; + unsigned char tag[16]; + uint32_t clen = 0; + + if (fread(magic, 1, sizeof(magic), fp) != sizeof(magic) || + memcmp(magic, expected_magic, sizeof(magic)) != 0 || + fread(&version, 1, sizeof(version), fp) != sizeof(version) || + fread(salt, 1, SALT_LEN, fp) != SALT_LEN || + fread(iv, 1, IV_LEN, fp) != IV_LEN || + fread(tag, 1, sizeof(tag), fp) != sizeof(tag) || + fread(&clen, 1, sizeof(clen), fp) != sizeof(clen)) { + fclose(fp); + return CERB_STORAGE_ERROR; + } + + unsigned char *ciphertext = malloc(clen); + if (!ciphertext) { fclose(fp); return CERB_MEMORY_ERROR; } + if (fread(ciphertext, 1, clen, fp) != clen) { free(ciphertext); fclose(fp); return CERB_STORAGE_ERROR; } + fclose(fp); + + // Derive key + unsigned char key[KEY_LEN]; + if (!PKCS5_PBKDF2_HMAC(master_password, (int)strlen(master_password), + salt, SALT_LEN, PBKDF2_ITERATIONS, + EVP_sha256(), KEY_LEN, key)) { + free(ciphertext); + return CERB_CRYPTO_ERROR; + } + + // Decrypt + unsigned char *plaintext = malloc(clen); // ciphertext_len >= plaintext_len + if (!plaintext) { free(ciphertext); return CERB_MEMORY_ERROR; } + int len = 0, plain_len = 0; + cerb_error_t status = CERB_OK; + EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); + if (!ctx) { free(ciphertext); free(plaintext); return CERB_CRYPTO_ERROR; } + if (EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, NULL, NULL) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, IV_LEN, NULL) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptInit_ex(ctx, NULL, NULL, key, iv) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, (int)clen) != 1) status = CERB_CRYPTO_ERROR; + plain_len = len; + if (status == CERB_OK && EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, 16, tag) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptFinal_ex(ctx, plaintext + len, &len) != 1) status = CERB_CRYPTO_ERROR; + plain_len += len; + EVP_CIPHER_CTX_free(ctx); + if (status != CERB_OK) { free(ciphertext); free(plaintext); return CERB_CRYPTO_ERROR; } + + // Deserialize + if ((size_t)plain_len < sizeof(uint32_t)) { free(ciphertext); free(plaintext); return CERB_STORAGE_ERROR; } + uint32_t n = 0; memcpy(&n, plaintext, sizeof(uint32_t)); + size_t expected = sizeof(uint32_t) + (size_t)n * sizeof(cerb_entry_t); + if ((size_t)plain_len != expected) { free(ciphertext); free(plaintext); return CERB_STORAGE_ERROR; } + + cerb_vault_internal_t *v = calloc(1, sizeof(cerb_vault_internal_t)); + if (!v) { free(ciphertext); free(plaintext); return CERB_MEMORY_ERROR; } + memcpy(v->salt, salt, SALT_LEN); + memcpy(v->key, key, KEY_LEN); + v->key_initialized = true; + v->capacity = n > 0 ? n : 32; + v->entries = calloc(v->capacity, sizeof(cerb_entry_t)); + if (!v->entries) { free(v); free(ciphertext); free(plaintext); return CERB_MEMORY_ERROR; } + v->num_entries = n; + if (n > 0) { + memcpy(v->entries, plaintext + sizeof(uint32_t), (size_t)n * sizeof(cerb_entry_t)); + } + + *vault = (cerb_vault_t *)v; + free(ciphertext); + free(plaintext); + return CERB_OK; +} + +// Add entry to vault +cerb_error_t cerb_vault_add_entry(cerb_vault_t *vault, const cerb_entry_t *entry) { + if (!vault || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + // Check for duplicates + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry->id) == 0) { + return CERB_DUPLICATE; + } + } + + // Resize if needed + if (v->num_entries >= v->capacity) { + size_t new_capacity = v->capacity * 2; + cerb_entry_t *new_entries = realloc(v->entries, new_capacity * sizeof(cerb_entry_t)); + if (!new_entries) return CERB_MEMORY_ERROR; + v->entries = new_entries; + v->capacity = new_capacity; + } + + // Add entry + v->entries[v->num_entries++] = *entry; + return CERB_OK; +} + +// Update existing entry +cerb_error_t cerb_vault_update_entry(cerb_vault_t *vault, const cerb_entry_t *entry) { + if (!vault || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry->id) == 0) { + v->entries[i] = *entry; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Delete entry by ID +cerb_error_t cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id) { + if (!vault || !entry_id) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry_id) == 0) { + // Move last entry into this slot to keep array compact + if (i != v->num_entries - 1) { + v->entries[i] = v->entries[v->num_entries - 1]; + } + memset(&v->entries[v->num_entries - 1], 0, sizeof(cerb_entry_t)); + v->num_entries--; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Get entry by ID +cerb_error_t cerb_vault_get_entry(cerb_vault_t *vault, const char *entry_id, cerb_entry_t *entry) { + if (!vault || !entry_id || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry_id) == 0) { + *entry = v->entries[i]; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Get all entries (returns a newly allocated array the caller must free) +cerb_error_t cerb_vault_get_entries(cerb_vault_t *vault, cerb_entry_t **entries, size_t *count) { + if (!vault || !entries || !count) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + if (v->num_entries == 0) { + *entries = NULL; + *count = 0; + return CERB_OK; + } + + cerb_entry_t *out = calloc(v->num_entries, sizeof(cerb_entry_t)); + if (!out) return CERB_MEMORY_ERROR; + + memcpy(out, v->entries, v->num_entries * sizeof(cerb_entry_t)); + *entries = out; + *count = v->num_entries; + return CERB_OK; +} + +// Basic substring search across website, username, and url +cerb_error_t cerb_vault_search(cerb_vault_t *vault, const char *query, cerb_entry_t **results, size_t *count) { + if (!vault || !results || !count) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + if (!query || *query == '\0') { + return cerb_vault_get_entries(vault, results, count); + } + + size_t matched = 0; + // First pass: count + for (size_t i = 0; i < v->num_entries; i++) { + if ((strstr(v->entries[i].website, query) != NULL) || + (strstr(v->entries[i].username, query) != NULL) || + (strstr(v->entries[i].url, query) != NULL)) { + matched++; + } + } + + if (matched == 0) { + *results = NULL; + *count = 0; + return CERB_OK; + } + + cerb_entry_t *out = calloc(matched, sizeof(cerb_entry_t)); + if (!out) return CERB_MEMORY_ERROR; + + size_t idx = 0; + for (size_t i = 0; i < v->num_entries; i++) { + if ((strstr(v->entries[i].website, query) != NULL) || + (strstr(v->entries[i].username, query) != NULL) || + (strstr(v->entries[i].url, query) != NULL)) { + out[idx++] = v->entries[i]; + } + } + + *results = out; + *count = matched; + return CERB_OK; +} + +// Generate password +cerb_error_t cerb_generate_password( + uint32_t length, + bool use_upper, + bool use_lower, + bool use_digits, + bool use_special, + char *buffer, + size_t buffer_size +) { + if (!buffer || length < 8 || length > MAX_PASSWORD_LEN || buffer_size < length + 1) { + return CERB_INVALID_ARG; + } + + const char *lower = "abcdefghijklmnopqrstuvwxyz"; + const char *upper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + const char *digits = "0123456789"; + const char *special = "!@#$%^&*()-_=+[]{}|;:,.<>?"; + + char charset[256] = {0}; + size_t pos = 0; + + if (use_lower) { strcpy(charset + pos, lower); pos += strlen(lower); } + if (use_upper) { strcpy(charset + pos, upper); pos += strlen(upper); } + if (use_digits) { strcpy(charset + pos, digits); pos += strlen(digits); } + if (use_special) { strcpy(charset + pos, special); pos += strlen(special); } + + if (pos == 0) return CERB_INVALID_ARG; + + // Generate random password + for (size_t i = 0; i < length; i++) { + unsigned char byte; + do { + if (RAND_bytes(&byte, 1) != 1) { + return CERB_CRYPTO_ERROR; + } + } while (byte >= (256 / pos) * pos); + + buffer[i] = charset[byte % pos]; + } + + buffer[length] = '\0'; + return CERB_OK; +} + +// Generate UUID v4 (xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx) +void cerb_generate_uuid(char *uuid) { + unsigned char bytes[16]; + if (RAND_bytes(bytes, sizeof(bytes)) != 1) { + // Fallback to zeroed UUID on failure + memset(uuid, '0', 36); + uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-'; + uuid[36] = '\0'; + return; + } + // Set version (4) + bytes[6] = (bytes[6] & 0x0F) | 0x40; + // Set variant (10xx) + bytes[8] = (bytes[8] & 0x3F) | 0x80; + + static const char *hex = "0123456789abcdef"; + int p = 0; + for (int i = 0; i < 16; i++) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + uuid[p++] = '-'; + } + uuid[p++] = hex[(bytes[i] >> 4) & 0x0F]; + uuid[p++] = hex[bytes[i] & 0x0F]; + } + uuid[p] = '\0'; +} + +// Get current timestamp +time_t cerb_current_timestamp(void) { + return time(NULL); +} + +// Cleanup vault +void cerb_vault_close(cerb_vault_t *vault) { + if (!vault) return; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + // Securely wipe sensitive data + memset(v->key, 0, KEY_LEN); + memset(v->salt, 0, SALT_LEN); + + // Wipe entries + for (size_t i = 0; i < v->num_entries; i++) { + memset(&v->entries[i], 0, sizeof(cerb_entry_t)); + } + + free(v->entries); + free(v); +} diff --git a/core/cerberus.h b/core/cerberus.h new file mode 100644 index 0000000..26bd369 --- /dev/null +++ b/core/cerberus.h @@ -0,0 +1,74 @@ +#ifndef CERBERUS_H +#define CERBERUS_H + +#ifdef __cplusplus +extern "C" { +#endif + +// Public API for Cerberus C core +#include <stdint.h> +#include <stddef.h> +#include <time.h> +#include <stdbool.h> + +// Constants +#define SALT_LEN 16 +#define KEY_LEN 32 +#define IV_LEN 12 +#define PBKDF2_ITERATIONS 200000 +#define MAX_PASSWORD_LEN 256 + +// Error codes +typedef enum cerb_error_e { + CERB_OK = 0, + CERB_INVALID_ARG = 1, + CERB_CRYPTO_ERROR = 2, + CERB_MEMORY_ERROR = 3, + CERB_STORAGE_ERROR = 4, + CERB_DUPLICATE = 5, + CERB_NOT_FOUND = 6 +} cerb_error_t; + +// Password entry +typedef struct cerb_entry_s { + char id[37]; // UUID v4 (36 chars + NUL) + char website[128]; + char username[128]; + char url[256]; + char password[MAX_PASSWORD_LEN]; + time_t created_at; + time_t updated_at; + time_t last_used; +} cerb_entry_t; + +// Opaque vault type +typedef struct cerb_vault_s cerb_vault_t; + +// Crypto lifecycle +cerb_error_t cerb_crypto_init(void); +void cerb_crypto_cleanup(void); + +// Vault lifecycle +cerb_error_t cerb_vault_create(const char *master_password, cerb_vault_t **vault); +cerb_error_t cerb_vault_save(cerb_vault_t *vault, const char *vault_path); +cerb_error_t cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault); +void cerb_vault_close(cerb_vault_t *vault); + +// Vault CRUD +cerb_error_t cerb_vault_add_entry(cerb_vault_t *vault, const cerb_entry_t *entry); +cerb_error_t cerb_vault_update_entry(cerb_vault_t *vault, const cerb_entry_t *entry); +cerb_error_t cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id); +cerb_error_t cerb_vault_get_entry(cerb_vault_t *vault, const char *entry_id, cerb_entry_t *entry); +cerb_error_t cerb_vault_get_entries(cerb_vault_t *vault, cerb_entry_t **entries, size_t *count); +cerb_error_t cerb_vault_search(cerb_vault_t *vault, const char *query, cerb_entry_t **results, size_t *count); + +// Utilities +cerb_error_t cerb_generate_password(uint32_t length, bool use_upper, bool use_lower, bool use_digits, bool use_special, char *buffer, size_t buffer_size); +void cerb_generate_uuid(char *uuid); +time_t cerb_current_timestamp(void); + +#ifdef __cplusplus +} +#endif + +#endif // CERBERUS_H diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..8ed52ef --- /dev/null +++ b/install.sh @@ -0,0 +1,168 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Cerberus installer (development convenience) +# - Installs Python package with optional extras +# - Builds C core +# - Sets up Native Messaging for Firefox and Chrome (Linux) +# +# Options via env: +# CERB_EXTRAS="ui-tui,ui-gui,automation-selenium" (comma-separated) +# CERB_SKIP_BUILD=1 # Skip cmake build +# CERB_INSTALL_FF=1 # Install Firefox native host manifest +# CERB_INSTALL_CHROME=1 # Install Chrome native host manifest +# CERB_DATA_DIR=~/.cerberus + +ROOT_DIR="$(cd "$(dirname "$0")" && pwd)" +EXTRAS="${CERB_EXTRAS:-}" +VYPROJ="${ROOT_DIR}/pyproject.toml" +VENV_DIR="${ROOT_DIR}/.venv" +PIP_CMD="python3 -m pip" + +info() { echo -e "\033[1;34m[cerberus]\033[0m $*"; } +warn() { echo -e "\033[1;33m[cerberus]\033[0m $*"; } +err() { echo -e "\033[1;31m[cerberus]\033[0m $*"; } + +install_playwright_browsers() { + # If Playwright is installed in venv, install the browser binaries + if [[ -x "${VENV_DIR}/bin/playwright" ]]; then + info "Installing Playwright browsers (chromium, firefox, webkit)" + "${VENV_DIR}/bin/playwright" install || warn "Playwright browser install failed; you can run '${VENV_DIR}/bin/playwright install' later" + else + warn "Playwright CLI not found in venv; skipping browser install" + fi +} + +install_deps() { + # Install required system packages if a known package manager is available + if command -v apt-get >/dev/null 2>&1; then + info "Installing system dependencies with apt (may prompt for sudo password)" + if command -v sudo >/dev/null 2>&1; then + sudo apt-get update -y || true + sudo apt-get install -y cmake build-essential libssl-dev python3-venv || true + else + apt-get update -y || true + apt-get install -y cmake build-essential libssl-dev python3-venv || true + fi + elif command -v dnf >/dev/null 2>&1; then + warn "dnf detected. Please install: sudo dnf install cmake gcc gcc-c++ openssl-devel python3-virtualenv" + elif command -v yum >/dev/null 2>&1; then + warn "yum detected. Please install: sudo yum install cmake gcc gcc-c++ openssl-devel python3-virtualenv" + elif command -v pacman >/dev/null 2>&1; then + warn "pacman detected. Please install: sudo pacman -S cmake base-devel openssl python-virtualenv" + elif command -v brew >/dev/null 2>&1; then + warn "Homebrew detected. Please install: brew install cmake openssl@3 && ensure Python venv module is available" + else + warn "Unknown package manager. Please install cmake, a C toolchain, OpenSSL dev headers, and python3-venv manually." + fi +} + +ensure_venv() { + if [[ "${CERB_USE_VENV:-1}" == "1" ]]; then + if [[ ! -d "${VENV_DIR}" ]]; then + info "Creating virtual environment at ${VENV_DIR}" + if ! python3 -m venv "${VENV_DIR}" 2>/dev/null; then + err "Failed to create virtualenv. Please install python3-venv (e.g., sudo apt install python3.13-venv) and re-run." + exit 1 + fi + fi + PIP_CMD="${VENV_DIR}/bin/pip" + fi +} + +pip_install() { + local user_flag="" + if [[ "${CERB_PIP_USER:-}" == "1" ]]; then + user_flag="--user" + warn "Using --user install (CERB_PIP_USER=1)" + fi + if [[ -n "${EXTRAS}" ]]; then + info "Installing Python package with extras: ${EXTRAS}" + ${PIP_CMD} install ${user_flag} -e ".[${EXTRAS}]" + else + info "Installing Python package (base)" + ${PIP_CMD} install ${user_flag} -e . + fi +} + +build_c_core() { + if [[ "${CERB_SKIP_BUILD:-}" == "1" ]]; then + warn "Skipping C core build (CERB_SKIP_BUILD=1)" + return + fi + info "Building C core with CMake" + mkdir -p build + (cd build && cmake .. && make -j) + if command -v sudo >/dev/null 2>&1; then + warn "Attempting 'make install' (may prompt for sudo password)" + (cd build && sudo make install) || warn "'make install' failed; continuing" + else + warn "sudo not available; skipping 'make install'" + fi +} + +setup_native_firefox() { + local host_bin host_manifest target + local vhost="${VENV_DIR}/bin/cerberus-native-host" + if [[ -x "${vhost}" ]]; then + host_bin="${vhost}" + else + host_bin="$(command -v cerberus-native-host || true)" + fi + if [[ -z "${host_bin}" ]]; then + err "cerberus-native-host not found on PATH; ensure pip install succeeded" + return 1 + fi + host_manifest="${ROOT_DIR}/native/manifests/firefox_com.cerberus.pm.json" + target="$HOME/.mozilla/native-messaging-hosts/com.cerberus.pm.json" + mkdir -p "$(dirname "${target}")" + cp "${host_manifest}" "${target}" + # Replace path in manifest + sed -i "s#/usr/local/bin/cerberus-native-host#${host_bin//\//\\/}#" "${target}" + info "Installed Firefox native host manifest at ${target}" +} + +setup_native_chrome() { + local host_bin target dir + host_bin="$(command -v cerberus-native-host || true)" + if [[ -z "${host_bin}" ]]; then + err "cerberus-native-host not found on PATH; ensure pip install succeeded" + return 1 + fi + dir="$HOME/.config/google-chrome/NativeMessagingHosts" + target="${dir}/com.cerberus.pm.json" + mkdir -p "${dir}" + cat > "${target}" <<EOF +{ + "name": "com.cerberus.pm", + "description": "Cerberus Password Manager Native Messaging Host (dev)", + "path": "${host_bin}", + "type": "stdio", + "allowed_origins": [ + "chrome-extension://REPLACE_WITH_EXTENSION_ID/" + ] +} +EOF + info "Installed Chrome native host manifest at ${target}" + warn "Replace REPLACE_WITH_EXTENSION_ID with your unpacked extension ID in chrome://extensions" +} + +main() { + info "Starting Cerberus installation" + install_deps + ensure_venv + pip_install + install_playwright_browsers + build_c_core + + if [[ "${CERB_INSTALL_FF:-}" == "1" ]]; then + setup_native_firefox || true + fi + if [[ "${CERB_INSTALL_CHROME:-}" == "1" ]]; then + setup_native_chrome || true + fi + + info "Done. You can run: cerberus init, cerberus tui, cerberus gui" +} + +main "$@" @@ -1,581 +0,0 @@ -// TODO: Secret stored in memory must be encrypted with a key derived from the -// user password or consider making password manager stateless (no passwords -// stored locally) - -#include "main.h" - -struct Password passwords[MAX_PASSWORDS]; -int numPasswords = 0; - -FILE *log_file; - -void log_message(const char *message) { - if (log_file == NULL) { - printf("Error: Log file not available.\n"); - return; - } - - time_t current_time; - struct tm *time_info; - char time_string[80]; - - time(¤t_time); - time_info = localtime(¤t_time); - - strftime(time_string, sizeof(time_string), "[%Y-%m-%d %H:%M:%S] ", time_info); - fprintf(log_file, "%s %s\n", time_string, - message); // Print timestamp and message -} - -void initialize_log() { - log_file = fopen("cerberus.log", "a"); - if (log_file == NULL) { - printf("Error: Could not open log file.\n"); - } else { - log_message("=== cerberus log ==="); // Initial log message - } -} - -void close_log() { - if (log_file != NULL) { - fclose(log_file); - } -} - -void error_exit(const char *error_message) { - log_message(error_message); - if (log_file != NULL) { - fclose(log_file); - } - exit(1); -} - -void clear_input_buffer() { - int c; - while ((c = getchar()) != '\n' && c != EOF) - ; -} - -void check_password_policy(const char *password) { - int length = strlen(password); - if (length < 8) { - error_exit("Password must be at least 8 characters long."); - } - - bool has_upper = false, has_lower = false, has_digit = false, - has_special = false; - for (int i = 0; i < length; i++) { - if (isupper(password[i])) { - has_upper = true; - } else if (islower(password[i])) { - has_lower = true; - } else if (isdigit(password[i])) { - has_digit = true; - } else { - has_special = true; - } - } - - if (!has_upper || !has_lower || !has_digit || !has_special) { - error_exit("Password must contain at least one uppercase letter, one " - "lowercase letter, one digit, and one special character."); - } -} - -void generate_password(char *password, int length) { - if (length < 8 || length > MAX_PASSWORD_LENGTH - 1) { - printf("Invalid password length. Please enter a length between 8 and %d.\n", - MAX_PASSWORD_LENGTH - 1); - return; - } - - char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456" - "789!@#$%^&*()-_=+[]{}|;:',.<>?"; - int charset_size = strlen(charset); - - srand(time(NULL)); - - // Initialize flags to check if each required character type is included - bool has_upper = false, has_lower = false, has_digit = false, - has_special = false; - - // Create password with at least one character from each required type - password[0] = charset[rand() % 26]; // at least one uppercase letter - password[1] = charset[26 + rand() % 26]; // at least one lowercase letter - password[2] = charset[52 + rand() % 10]; // at least one digit - password[3] = charset[62 + rand() % 14]; // at least one special character - - // Fill the rest of the password randomly from the character set - for (int i = 4; i < length; i++) { - password[i] = charset[rand() % charset_size]; - } - - password[length] = '\0'; - - // Shuffle the password characters - for (int i = 0; i < length; i++) { - int j = rand() % length; - char temp = password[i]; - password[i] = password[j]; - password[j] = temp; - } - - printf("Generated Password: %s\n", password); -} - -void derive_key(unsigned char *key, const char *password, unsigned char *salt) { - PKCS5_PBKDF2_HMAC(password, strlen(password), salt, SALT_SIZE, - PBKDF2_ITERATIONS, EVP_sha256(), AES_KEY_SIZE, key); -} - -void encrypt_password(struct Password *password, const char *user_password) { - unsigned char iv[AES_BLOCK_SIZE]; - unsigned char encrypted_password[MAX_PASSWORD_LENGTH]; - - RAND_bytes(iv, AES_BLOCK_SIZE); - - unsigned char key[AES_KEY_SIZE]; - derive_key(key, user_password, iv); - - EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); - EVP_EncryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, key, iv); - - int outlen, tmplen; - EVP_EncryptUpdate(ctx, encrypted_password, &outlen, - (unsigned char *)password->password, - strlen(password->password)); - EVP_EncryptFinal_ex(ctx, encrypted_password + outlen, &tmplen); - - memcpy(password->password, encrypted_password, outlen + tmplen); - memcpy(password->username, iv, AES_BLOCK_SIZE); - - EVP_CIPHER_CTX_free(ctx); -} - -void decrypt_password(struct Password *password, const char *user_password) { - unsigned char iv[AES_BLOCK_SIZE]; - unsigned char decrypted_password[MAX_PASSWORD_LENGTH]; - int decrypted_len = 0; - - memcpy(iv, password->username, AES_BLOCK_SIZE); - - unsigned char key[AES_KEY_SIZE]; - derive_key(key, user_password, iv); - - EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); - EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, key, iv); - - int outlen, tmplen; - EVP_DecryptUpdate(ctx, decrypted_password, &outlen, - (unsigned char *)password->password, - strlen(password->password)); - EVP_DecryptFinal_ex(ctx, decrypted_password + outlen, &tmplen); - - decrypted_password[outlen + tmplen] = '\0'; - - strcpy(password->password, (char *)decrypted_password); - - EVP_CIPHER_CTX_free(ctx); -} - -void save_password(struct Password *password, const char *website, - const char *dir, const char *user_password) { - unsigned char iv[AES_BLOCK_SIZE]; - RAND_bytes(iv, AES_BLOCK_SIZE); - - unsigned char key[AES_KEY_SIZE]; - derive_key(key, user_password, iv); - - unsigned char encrypted_password[MAX_PASSWORD_LENGTH]; - EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); - EVP_EncryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, key, iv); - - int outlen, tmplen; - EVP_EncryptUpdate(ctx, encrypted_password, &outlen, - (unsigned char *)password->password, - strlen(password->password)); - EVP_EncryptFinal_ex(ctx, encrypted_password + outlen, &tmplen); - - char filename[256]; - snprintf(filename, sizeof(filename), "%s/%s_%s.dat", dir, website, - password->username); - FILE *fp = fopen(filename, "wb"); - if (fp == NULL) { - printf("Error: could not open file %s for writing.\n", filename); - return; - } - fwrite(iv, sizeof(iv), 1, fp); - fwrite(encrypted_password, outlen + tmplen, 1, fp); - - fclose(fp); - EVP_CIPHER_CTX_free(ctx); -} - -void edit_password(struct Password *password) { - int choice; - printf("\nEdit Password:\n"); - printf("1. Website: %s\n", password->website); - printf("2. Username: %s\n", password->username); - printf("3. Password: %s\n", password->password); - printf("4. Cancel\n"); - printf("Enter your choice: "); - scanf("%d", &choice); - clear_input_buffer(); - - switch (choice) { - case 1: { - char old_filename[256]; - snprintf(old_filename, sizeof(old_filename), "tmp/%s_%s.dat", - password->website, password->username); - if (remove(old_filename) != 0) { - printf("Error deleting old password file.\n"); - } - - printf("Enter new website: "); - fgets(password->website, sizeof(password->website), stdin); - password->website[strcspn(password->website, "\n")] = - 0; // Remove newline character - printf("Website updated.\n"); - save_password(password, password->website, "tmp", "user_password"); - break; - } - case 2: { - char old_filename[256]; - snprintf(old_filename, sizeof(old_filename), "tmp/%s_%s.dat", - password->website, password->username); - if (remove(old_filename) != 0) { - printf("Error deleting old password file.\n"); - } - - printf("Enter new username: "); - fgets(password->username, sizeof(password->username), stdin); - password->username[strcspn(password->username, "\n")] = - 0; // Remove newline character - printf("Username updated.\n"); - save_password(password, password->website, "tmp", "user_password"); - break; - } - case 3: { - int length; - do { - printf("Enter new password length: "); - scanf("%d", &length); - clear_input_buffer(); - if (length < 8 || length > MAX_PASSWORD_LENGTH - 1) { - printf("Invalid password length. Please enter a length between 8 and " - "%d.\n", - MAX_PASSWORD_LENGTH - 1); - } - } while (length < 8 || length > MAX_PASSWORD_LENGTH - 1); - generate_password(password->password, length); - printf("Password updated.\n"); - save_password(password, password->website, "tmp", "user_password"); - break; - } - case 4: - printf("Canceled.\n"); - break; - default: - printf("Invalid choice.\n"); - break; - } -} - -void load_passwords(const char *dir, const char *user_password) { - printf("Available Passwords:\n"); - DIR *d; - struct dirent *dir_entry; - d = opendir(dir); - if (d) { - int count = 0; - while ((dir_entry = readdir(d)) != NULL) { - if (dir_entry->d_type == DT_REG) { // If it's a regular file - char filename[256]; - snprintf(filename, sizeof(filename), "%s/%s", dir, dir_entry->d_name); - FILE *file = fopen(filename, "rb"); - if (file != NULL) { - struct Password password; - - unsigned char iv[AES_BLOCK_SIZE]; - fread(iv, sizeof(iv), 1, file); - - unsigned char encrypted_password[MAX_PASSWORD_LENGTH]; - int len = fread(encrypted_password, 1, MAX_PASSWORD_LENGTH, file); - - unsigned char key[AES_KEY_SIZE]; - derive_key(key, user_password, iv); - - unsigned char decrypted_password[MAX_PASSWORD_LENGTH]; - EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); - EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, key, iv); - - int outlen, tmplen; - EVP_DecryptUpdate(ctx, decrypted_password, &outlen, - encrypted_password, len); - EVP_DecryptFinal_ex(ctx, decrypted_password + outlen, &tmplen); - - decrypted_password[outlen + tmplen] = '\0'; - - fclose(file); - EVP_CIPHER_CTX_free(ctx); - - // Extract website and username from filename - char website[MAX_WEBSITE_LENGTH]; - char username[MAX_USERNAME_LENGTH]; - sscanf(dir_entry->d_name, "%[^_]_%[^.]", website, username); - - // Copy decrypted password to password field - strncpy(password.website, website, MAX_WEBSITE_LENGTH - 1); - password.website[MAX_WEBSITE_LENGTH - 1] = '\0'; - strncpy(password.username, username, MAX_USERNAME_LENGTH - 1); - password.username[MAX_USERNAME_LENGTH - 1] = '\0'; - strncpy(password.password, decrypted_password, - MAX_PASSWORD_LENGTH - 1); - password.password[MAX_PASSWORD_LENGTH - 1] = '\0'; - - passwords[count++] = password; - printf("%d. Website: %s, Username: %s\n", count, password.website, - password.username); - } - } - } - numPasswords = count; - closedir(d); - if (count == 0) { - printf("No passwords found.\n"); - } - } -} - -void copy_to_clipboard(char *text) { - // Use xclip command to copy text to clipboard - FILE *pipe = popen("xclip -selection clipboard", "w"); - if (pipe != NULL) { - fprintf(pipe, "%s", text); - pclose(pipe); - printf("Copied to clipboard: %s\n", text); - } else { - printf("Error: Could not copy to clipboard.\n"); - } -} - -void print_ascii_art(const char *filename) { - FILE *file = fopen(filename, "r"); - if (file == NULL) { - printf("Unable to open ASCII art file.\n"); - return; - } - - // Get console width - struct winsize w; - ioctl(STDOUT_FILENO, TIOCGWINSZ, &w); - int console_width = w.ws_col; - - char line[256]; - while (fgets(line, sizeof(line), file)) { - // Calculate padding for centering - int padding = (console_width - strlen(line)) / 2; - if (padding > 0) { - for (int i = 0; i < padding; i++) { - printf(" "); - } - } - printf("%s", line); - } - - fclose(file); -} - -int main() { - initialize_log(); - - const char *dir = "tmp"; // Directory to store password files - mkdir(dir, 0700); // Create directory if it doesn't exist - - char user_password[MAX_PASSWORD_LENGTH]; - - FILE *key_file = fopen("master.key", "rb"); - if (key_file == NULL) { - printf("Enter your new master password: "); - fgets(user_password, sizeof(user_password), stdin); - user_password[strcspn(user_password, "\n")] = 0; - - // Save master password to file - key_file = fopen("master.key", "wb"); - if (key_file == NULL) { - printf("Error: Could not create master password file.\n"); - return 1; - } - fwrite(user_password, sizeof(user_password), 1, key_file); - fclose(key_file); - } else { - // If master key exists, ask for the password - printf("Enter your master password: "); - fgets(user_password, sizeof(user_password), stdin); - user_password[strcspn(user_password, "\n")] = 0; // Remove newline character - - // Check if the entered password matches the stored master password - char stored_password[MAX_PASSWORD_LENGTH]; - fread(stored_password, sizeof(stored_password), 1, key_file); - fclose(key_file); - if (strcmp(user_password, stored_password) != 0) { - printf("Invalid master password. Exiting...\n"); - return 1; - } - } - - // Print ASCII art - print_ascii_art("ascii-art.txt"); - - int choice; - while (1) { - printf("\n1. Create New Password\n"); - printf("2. Show Passwords\n"); - printf("3. Exit\n"); - printf("Enter your choice: "); - scanf("%d", &choice); - clear_input_buffer(); // Clear input buffer after scanf - - switch (choice) { - case 1: { - struct Password password; - int length; - do { - printf("Enter password length: "); - scanf("%d", &length); - clear_input_buffer(); - if (length < 8 || length > MAX_PASSWORD_LENGTH - 1) { - printf("Invalid password length. Please enter a length between 8 and " - "%d.\n", - MAX_PASSWORD_LENGTH - 1); - } - } while (length < 8 || length > MAX_PASSWORD_LENGTH - 1); - generate_password(password.password, length); - printf("Enter website: "); - fgets(password.website, sizeof(password.website), stdin); - password.website[strcspn(password.website, "\n")] = - 0; // Remove newline character - printf("Enter username: "); - fgets(password.username, sizeof(password.username), stdin); - password.username[strcspn(password.username, "\n")] = - 0; // Remove newline character - save_password(&password, password.website, dir, user_password); - printf("Password information saved to %s\n", password.website); - break; - } - case 2: { - load_passwords(dir, user_password); - printf("Enter the number of the password to view or 0 to cancel: "); - int selection; - scanf("%d", &selection); - clear_input_buffer(); - if (selection > 0 && selection <= numPasswords) { - struct Password selected_password = passwords[selection - 1]; - printf("Selected Password:\n"); - printf("- Website: %s\n", selected_password.website); - printf("- Username: %s\n", selected_password.username); - printf("- Password: %s\n", selected_password.password); - printf("1. Copy Username and/or Password\n"); - printf("2. Edit the Password\n"); - printf("3. Delete the Password\n"); - printf("0. Cancel\n"); - printf("Enter your choice: "); - int copy_choice; - scanf("%d", ©_choice); - clear_input_buffer(); - switch (copy_choice) { - case 1: - // Provide options for copying username and/or password - printf("1. Copy Username and Password\n"); - printf("2. Copy just the Password\n"); - printf("3. Copy just the Username\n"); - printf("0. Cancel\n"); - printf("Enter your choice: "); - int sub_copy_choice; - scanf("%d", &sub_copy_choice); - clear_input_buffer(); - switch (sub_copy_choice) { - case 1: - // Copy both username and password - { - char copy_text[MAX_PASSWORD_LENGTH * 2 + - 2]; // Max length of both plus space - snprintf(copy_text, sizeof(copy_text), - "Username: %s, Password: %s", selected_password.username, - selected_password.password); - copy_to_clipboard(copy_text); - printf("Username and Password copied to clipboard.\n"); - } - break; - case 2: - // Copy just the password - copy_to_clipboard(selected_password.password); - printf("Password copied to clipboard.\n"); - break; - case 3: - // Copy just the username - copy_to_clipboard(selected_password.username); - printf("Username copied to clipboard.\n"); - break; - case 0: - printf("Canceled.\n"); - break; - default: - printf("Invalid choice.\n"); - } - break; - case 2: - edit_password(&selected_password); - save_password(&selected_password, selected_password.website, dir, - user_password); - printf("Password updated.\n"); - break; - case 3: { - struct Password selected_password = passwords[selection - 1]; - printf("Are you sure you want to delete this password? (y/n): "); - char confirm; - scanf(" %c", &confirm); - clear_input_buffer(); - if (confirm == 'y' || confirm == 'Y') { - char filename[256]; - snprintf(filename, sizeof(filename), "%s/%s_%s.dat", dir, - selected_password.website, selected_password.username); - if (remove(filename) != 0) { - printf("Error deleting password file.\n"); - } else { - printf("Password deleted.\n"); - // Shift remaining passwords to fill the gap - for (int i = selection - 1; i < numPasswords - 1; i++) { - passwords[i] = passwords[i + 1]; - } - numPasswords--; - } - } else { - printf("Password not deleted.\n"); - } - break; - } - - case 0: - printf("Canceled.\n"); - break; - default: - printf("Invalid choice.\n"); - } - } else if (selection != 0) { - printf("Invalid selection.\n"); - } - break; - } - case 3: - printf("Exiting program...\n"); - close_log(); - return 0; - default: - printf("Invalid choice. Please enter 1, 2, or 3.\n"); - } - } - - close_log(); - return 0; -} @@ -1,47 +0,0 @@ -#ifndef MAIN_H -#define MAIN_H - -#include <ctype.h> -#include <dirent.h> -#include <openssl/aes.h> -#include <openssl/err.h> -#include <openssl/evp.h> -#include <openssl/rand.h> -#include <stdbool.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <sys/io.h> -#include <sys/ioctl.h> -#include <sys/stat.h> -#include <time.h> -#include <unistd.h> - -#define SALT_SIZE 16 -#define PBKDF2_ITERATIONS 10000 -#define AES_KEY_SIZE 32 // AES-256 -#define AES_BLOCK_SIZE 16 - -#define MAX_PASSWORD_LENGTH 128 -#define MAX_WEBSITE_LENGTH 128 -#define MAX_USERNAME_LENGTH 128 -#define MAX_PASSWORDS 100 - -struct Password { - char website[MAX_WEBSITE_LENGTH]; - char username[MAX_USERNAME_LENGTH]; - char password[MAX_PASSWORD_LENGTH]; -}; - -extern struct Password passwords[MAX_PASSWORDS]; -extern int numPasswords; - -void generate_password(char *password, int length); -void save_password(struct Password *password, const char *website, - const char *dir, const char *user_password); -void edit_password(struct Password *password); -void load_passwords(const char *dir, const char *user_password); -void copy_to_clipboard(char *text); -void print_ascii_art(const char *filename); - -#endif diff --git a/native/manifests/chrome_com.cerberus.pm.json b/native/manifests/chrome_com.cerberus.pm.json new file mode 100644 index 0000000..7eec080 --- /dev/null +++ b/native/manifests/chrome_com.cerberus.pm.json @@ -0,0 +1,9 @@ +{ + "name": "com.cerberus.pm", + "description": "Cerberus Password Manager Native Messaging Host (dev)", + "path": "/usr/local/bin/cerberus-native-host", + "type": "stdio", + "allowed_origins": [ + "chrome-extension://REPLACE_WITH_EXTENSION_ID/" + ] +} diff --git a/native/manifests/firefox_com.cerberus.pm.json b/native/manifests/firefox_com.cerberus.pm.json new file mode 100644 index 0000000..fda6d6e --- /dev/null +++ b/native/manifests/firefox_com.cerberus.pm.json @@ -0,0 +1,9 @@ +{ + "name": "com.cerberus.pm", + "description": "Cerberus Password Manager Native Messaging Host (dev)", + "path": "/usr/local/bin/cerberus-native-host", + "type": "stdio", + "allowed_extensions": [ + "cerberus@example.com" + ] +} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..27303f1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,70 @@ +[build-system] +requires = ["setuptools>=42", "wheel", "cffi>=1.15.0", "cmake"] +build-backend = "setuptools.build_meta" + +[project] +name = "cerberus-pm" +version = "0.1.0" +description = "A secure password manager with a high-performance C core" +readme = "README.md" +requires-python = ">=3.8" +license = {text = "MIT"} +authors = [ + {name = "srdusr", email = "trevorgray@srdusr.com"} +] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: End Users/Desktop", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Topic :: Security", + "Topic :: Security :: Cryptography", +] +dependencies = [ + "cffi>=1.15.0", + "cryptography>=3.4.0", + "playwright>=1.15.0", + "python-dotenv>=0.19.0", + "rich>=13.0.0", + "click>=8.0.0", + "pyperclip>=1.8.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=6.0.0", + "black>=21.0", + "mypy>=0.9.0", + "flake8>=3.9.0", +] +automation-selenium = [ + "selenium>=4.10.0", +] +ui-tui = [ + "textual>=0.40.0", + "rich>=13.0.0", +] +ui-gui = [ + "PyQt6>=6.5.0", +] + +[project.scripts] +cerberus = "cerberus.cli.main:cli" +cerberus-gui = "cerberus.cli.main:gui" +cerberus-native-host = "cerberus.native.host:main" + +[tool.setuptools] +package-dir = {"" = "src"} +packages = { find = { where = ["src"], include = ["cerberus*"] } } +include-package-data = true + +[tool.setuptools.package-data] +"cerberus.core" = ["*.h", "*.c"] + +[tool.black] +line-length = 88 +target-version = ['py38'] diff --git a/src/cerberus/__init__.py b/src/cerberus/__init__.py new file mode 100644 index 0000000..2064933 --- /dev/null +++ b/src/cerberus/__init__.py @@ -0,0 +1,11 @@ +# Avoid importing heavy submodules at top-level to prevent side effects +__all__ = ["PasswordManager", "PasswordEntry"] + +def __getattr__(name): + if name == "PasswordManager": + from .core.password_manager import PasswordManager + return PasswordManager + if name == "PasswordEntry": + from .core.models import PasswordEntry + return PasswordEntry + raise AttributeError(name) diff --git a/src/cerberus/__pycache__/__init__.cpython-313.pyc b/src/cerberus/__pycache__/__init__.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..d06ce4e --- /dev/null +++ b/src/cerberus/__pycache__/__init__.cpython-313.pyc diff --git a/src/cerberus/automation/__init__.py b/src/cerberus/automation/__init__.py new file mode 100644 index 0000000..9defa53 --- /dev/null +++ b/src/cerberus/automation/__init__.py @@ -0,0 +1,19 @@ +"""Automation layer for password rotation via the web. + +This package provides abstractions and engines (Playwright/Selenium) to automate +site-specific password change flows, along with runners and policy helpers. +""" + +from .types import AutomationResult, AutomationStatus +from .engine import AutomationEngine +from .runner import RotationRunner, RotationSelector + +__all__ = [ + 'AutomationEngine', + 'AutomationResult', + 'AutomationStatus', + 'RotationRunner', + 'RotationSelector', +] + + diff --git a/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc b/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..81e9bb0 --- /dev/null +++ b/src/cerberus/automation/__pycache__/__init__.cpython-313.pyc diff --git a/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc b/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..a7be35d --- /dev/null +++ b/src/cerberus/automation/__pycache__/discovery.cpython-313.pyc diff --git a/src/cerberus/automation/__pycache__/engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/engine.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..7d6272e --- /dev/null +++ b/src/cerberus/automation/__pycache__/engine.cpython-313.pyc diff --git a/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..82a546b --- /dev/null +++ b/src/cerberus/automation/__pycache__/playwright_engine.cpython-313.pyc diff --git a/src/cerberus/automation/__pycache__/policy.cpython-313.pyc b/src/cerberus/automation/__pycache__/policy.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..5887ac4 --- /dev/null +++ b/src/cerberus/automation/__pycache__/policy.cpython-313.pyc diff --git a/src/cerberus/automation/__pycache__/runner.cpython-313.pyc b/src/cerberus/automation/__pycache__/runner.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..c8e90b2 --- /dev/null +++ b/src/cerberus/automation/__pycache__/runner.cpython-313.pyc diff --git a/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc b/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..d14e6d7 --- /dev/null +++ b/src/cerberus/automation/__pycache__/selenium_engine.cpython-313.pyc diff --git a/src/cerberus/automation/__pycache__/types.cpython-313.pyc b/src/cerberus/automation/__pycache__/types.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..f2e48d4 --- /dev/null +++ b/src/cerberus/automation/__pycache__/types.cpython-313.pyc diff --git a/src/cerberus/automation/discovery.py b/src/cerberus/automation/discovery.py new file mode 100644 index 0000000..d03c3af --- /dev/null +++ b/src/cerberus/automation/discovery.py @@ -0,0 +1,252 @@ +""" +Heuristic discovery for password change and reset flows. + +This module attempts to dynamically locate "Change password" or "Forgot/Reset password" +paths and, when possible, automatically submit a password rotation using best-effort +selectors. It works with any engine that implements the AutomationEngine Protocol. +""" +from __future__ import annotations + +from dataclasses import dataclass +import logging +from typing import List, Optional, Dict, Any, Protocol + +from .types import AutomationResult, AutomationStatus +from ..core.models import PasswordEntry + +logger = logging.getLogger("cerberus") + + +DISCOVERY_KEYWORDS = [ + # English + "change password", + "password change", + "update password", + "reset password", + "forgot password", + "forgot your password", + "security", + "account settings", + # Common non-English hints (basic) + "contraseña", "senha", "mot de passe", + "passwort", "lozinka", "hasło", +] + +# Common input name/id candidates +OLD_PW_CANDIDATES = [ + "current_password", + "old_password", + "password_current", + "passwordOld", + "password_old", + "existing_password", +] +NEW_PW_CANDIDATES = [ + "new_password", + "password_new", + "password1", + "password", + "newPassword", +] +CONFIRM_PW_CANDIDATES = [ + "confirm_password", + "password_confirm", + "password2", + "confirmNewPassword", +] + +SUBMIT_CANDIDATES = [ + 'button[type="submit"]', + 'input[type="submit"]', + 'button.primary', + 'button.save', + 'button.update', +] + + +@dataclass +class DiscoveredEndpoint: + label: str + href: str + + +def _js_find_links_script() -> str: + # Returns JSON array of {text, href} + return ( + "(() => {" + " const matches = [];" + " const anchors = Array.from(document.querySelectorAll('a, button'));" + " const kws = new Set([" + ",".join([f"'{' '.join(k.split())}'" for k in DISCOVERY_KEYWORDS]) + "]);" + " for (const el of anchors) {" + " const text = (el.textContent || '').trim().toLowerCase();" + " const aria = (el.getAttribute('aria-label') || '').trim().toLowerCase();" + " const title = (el.getAttribute('title') || '').trim().toLowerCase();" + " for (const kw of kws) {" + " if (text.includes(kw) || aria.includes(kw) || title.includes(kw)) {" + " const href = el.getAttribute('href') || '';" + " matches.push({text, href});" + " break;" + " }" + " }" + " return matches;" + "})()" + ) + + +def discover_password_change(engine, base_url: Optional[str]) -> List[DiscoveredEndpoint]: + """Attempt to discover password change or reset endpoints from a base URL. + + Heuristic approach: scan DOM for anchors/buttons whose text matches discovery keywords. + """ + try: + # Ensure we're on the base URL first + if base_url: + engine.goto(base_url) + logger.debug("[discovery] scanning links/buttons for keywords") + except Exception: + logger.debug("[discovery] failed to navigate to base URL") + matches = engine.evaluate(_js_find_links_script()) + endpoints: List[DiscoveredEndpoint] = [] + if isinstance(matches, list): + for m in matches: + text = (m.get("text") or "").strip() + href = (m.get("href") or "").strip() + if href: + endpoints.append(DiscoveredEndpoint(text=text, href=href)) + # Deduplicate by href + unique: Dict[str, DiscoveredEndpoint] = {} + for e in endpoints: + unique[e.href] = e + logger.debug(f"[discovery] found {len(unique)} unique endpoints") + return list(unique.values()) + + +def _try_type(engine, selector: str, value: str) -> bool: + try: + engine.wait_for(selector, timeout_ms=1000) + engine.type(selector, value) + logger.debug(f"[discovery] typed into {selector}") + return True + except Exception: + logger.debug(f"[discovery] could not type into {selector}") + return False + + +def _try_click(engine, selector: str) -> bool: + try: + engine.wait_for(selector, timeout_ms=1000) + engine.click(selector) + logger.debug(f"[discovery] clicked {selector}") + return True + except Exception: + logger.debug(f"[discovery] could not click {selector}") + return False + + +def _try_login_if_present(engine, entry: PasswordEntry) -> bool: + """Best-effort login if a login form is present on the current page.""" + try: + has_form = engine.evaluate( + "(() => { const p = document.querySelector('form input[type=\\'password\\']'); return !!p; })()" + ) + except Exception: + has_form = False + if not has_form: + logger.debug("[discovery] no login form present on page") + return False + candidates_user = [ + "input[type='email']", + "input[type='text']", + "input[name*='user']", + "input[id*='user']", + "input[name*='email']", + "input[id*='email']", + "input[name*='login']", + "input[id*='login']", + ] + _ = any(_try_type(engine, sel, entry.username) for sel in candidates_user) + typed_pass = _try_type(engine, "input[type='password']", entry.password) + submitted = any(_try_click(engine, sel) for sel in SUBMIT_CANDIDATES) + if not submitted: + try: + engine.evaluate("(() => { const p = document.querySelector('input[type=\\'password\\']'); if (p && p.form) { p.form.submit(); return true;} return false; })()") + submitted = True + except Exception: + pass + ok = typed_pass and submitted + logger.debug(f"[discovery] login attempt result ok={ok}") + return ok + + +def try_submit_password_change(engine, entry: PasswordEntry, new_password: str) -> AutomationResult: + """Attempt to submit a password change on the current page using common selectors.""" + def candidates(names: List[str]) -> List[str]: + sels: List[str] = [] + for n in names: + sels.append(f"input[name='{n}']") + sels.append(f"input[id='{n}']") + sels.append(f"input[type='password'][name='{n}']") + sels.append(f"input[type='password'][id='{n}']") + sels.append(f"input[placeholder*='{n}']") + sels.append(f"input[aria-label*='{n}']") + return sels + + success_old = any(_try_type(engine, sel, entry.password) for sel in candidates(OLD_PW_CANDIDATES)) + success_new = any(_try_type(engine, sel, new_password) for sel in candidates(NEW_PW_CANDIDATES)) + success_confirm = any(_try_type(engine, sel, new_password) for sel in candidates(CONFIRM_PW_CANDIDATES)) or success_new + + submitted = any(_try_click(engine, sel) for sel in SUBMIT_CANDIDATES) + if not submitted: + try: + engine.evaluate("(() => { const el = document.querySelector('input[type=\\'password\\']'); if (el) { el.dispatchEvent(new KeyboardEvent('keydown', {key: 'Enter', bubbles: true})); el.form && el.form.submit && el.form.submit(); return true;} return false; })()") + submitted = True + except Exception: + pass + + if (success_new and success_confirm) and (submitted or success_old): + return AutomationResult(status=AutomationStatus.SUCCESS, message="Submitted change attempt") + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Could not auto-submit on page") + + +def auto_change_flow(engine, entry: PasswordEntry, new_password: str) -> AutomationResult: + """End-to-end best-effort flow: optional login, discover change endpoint, attempt update.""" + base = entry.url or (("https://" + entry.website) if entry.website and not entry.website.startswith("http") else entry.website) or "" + if base: + try: + logger.debug(f"[discovery] navigating to base URL: {base}") + engine.goto(base) + _try_login_if_present(engine, entry) + except Exception: + logger.debug("[discovery] failed to navigate or login at base URL") + + endpoints = discover_password_change(engine, base if base else None) + logger.debug(f"[discovery] candidate endpoints: {len(endpoints)}") + + # Also try common guesses relative to base + guesses = [ + "/account/security", + "/settings/security", + "/settings/password", + "/profile/security", + "/user/security", + ] + for g in guesses: + endpoints.append(DiscoveredEndpoint(label=f"guess:{g}", href=(base + g) if base else g)) + + for ep in endpoints: + try: + target = ep.href + if not target.startswith("http") and base: + target = base + ep.href + if not target: + continue + logger.debug(f"[discovery] trying endpoint: {target}") + engine.goto(target) + res = try_submit_password_change(engine, entry, new_password) + if res.status == AutomationStatus.SUCCESS: + return res + except Exception: + logger.debug("[discovery] endpoint attempt failed, trying next") + continue + + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="No automated flow succeeded") diff --git a/src/cerberus/automation/engine.py b/src/cerberus/automation/engine.py new file mode 100644 index 0000000..ade3e53 --- /dev/null +++ b/src/cerberus/automation/engine.py @@ -0,0 +1,26 @@ +from typing import Protocol, Optional, Dict, Any + + +class AutomationEngine(Protocol): + def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None: + ... + + def stop(self) -> None: + ... + + def goto(self, url: str, wait_until: str = "networkidle") -> None: + ... + + def type(self, selector: str, value: str, clear: bool = True) -> None: + ... + + def click(self, selector: str) -> None: + ... + + def wait_for(self, selector: str, timeout_ms: int = 15000) -> None: + ... + + def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any: + ... + + diff --git a/src/cerberus/automation/playwright_engine.py b/src/cerberus/automation/playwright_engine.py new file mode 100644 index 0000000..a61920b --- /dev/null +++ b/src/cerberus/automation/playwright_engine.py @@ -0,0 +1,60 @@ +from typing import Optional, Dict, Any + +from playwright.sync_api import sync_playwright, Page, Browser, BrowserContext + + +class PlaywrightEngine: + def __init__(self): + self._pw = None + self._browser: Optional[Browser] = None + self._context: Optional[BrowserContext] = None + self._page: Optional[Page] = None + + def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None: + self._pw = sync_playwright().start() + launch_args = {"headless": headless} + if user_data_dir: + self._context = self._pw.chromium.launch_persistent_context(user_data_dir, **launch_args) + pages = self._context.pages + self._page = pages[0] if pages else self._context.new_page() + else: + self._browser = self._pw.chromium.launch(**launch_args) + self._context = self._browser.new_context() + self._page = self._context.new_page() + + def stop(self) -> None: + if self._context: + self._context.close() + if self._browser: + self._browser.close() + if self._pw: + self._pw.stop() + self._page = None + self._context = None + self._browser = None + self._pw = None + + def goto(self, url: str, wait_until: str = "networkidle") -> None: + assert self._page is not None + self._page.goto(url, wait_until=wait_until) + + def type(self, selector: str, value: str, clear: bool = True) -> None: + assert self._page is not None + locator = self._page.locator(selector) + if clear: + locator.fill("") + locator.type(value) + + def click(self, selector: str) -> None: + assert self._page is not None + self._page.click(selector) + + def wait_for(self, selector: str, timeout_ms: int = 15000) -> None: + assert self._page is not None + self._page.wait_for_selector(selector, timeout=timeout_ms) + + def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any: + assert self._page is not None + return self._page.evaluate(script, arg) + + diff --git a/src/cerberus/automation/policy.py b/src/cerberus/automation/policy.py new file mode 100644 index 0000000..e4bab80 --- /dev/null +++ b/src/cerberus/automation/policy.py @@ -0,0 +1,25 @@ +from typing import Dict + +from ..core.password_manager import PasswordManager +from ..core.models import PasswordEntry + + +SITE_POLICY: Dict[str, Dict] = { + # domain_substring: policy + "github.com": {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": True}, + "google.com": {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": False}, +} + + +def generate_for_entry(pm: PasswordManager, entry: PasswordEntry) -> str: + url = entry.url or entry.website or "" + selected = None + for key, policy in SITE_POLICY.items(): + if key in url: + selected = policy + break + if not selected: + selected = {"length": 20, "use_upper": True, "use_lower": True, "use_digits": True, "use_special": True} + return pm.generate_password(**selected) + + diff --git a/src/cerberus/automation/runner.py b/src/cerberus/automation/runner.py new file mode 100644 index 0000000..841e89e --- /dev/null +++ b/src/cerberus/automation/runner.py @@ -0,0 +1,59 @@ +from dataclasses import dataclass +from typing import List, Optional, Callable +from datetime import datetime + +from ..core.password_manager import PasswordManager +from ..core.models import PasswordEntry +from .types import AutomationResult, AutomationStatus +from .discovery import auto_change_flow + + +@dataclass +class RotationSelector: + all: bool = False + compromised_only: bool = False + tag: Optional[str] = None + domain: Optional[str] = None + + +class RotationRunner: + def __init__(self, engine, site_flows: List, password_manager: PasswordManager): + self.engine = engine + self.site_flows = site_flows + self.pm = password_manager + + def _filter_entries(self, selector: RotationSelector) -> List[PasswordEntry]: + entries = self.pm.list_passwords() + filtered: List[PasswordEntry] = [] + for e in entries: + if selector.compromised_only and not e.compromised: + continue + if selector.tag and selector.tag not in (e.tags or []): + continue + if selector.domain and selector.domain not in (e.url or e.website or ""): + continue + filtered.append(e) + return filtered + + def rotate(self, selector: RotationSelector, generate_password: Callable[[PasswordEntry], str], dry_run: bool = True) -> List[AutomationResult]: + results: List[AutomationResult] = [] + targets = self._filter_entries(selector) + for entry in targets: + new_password = generate_password(entry) + if dry_run: + results.append(AutomationResult(status=AutomationStatus.SUCCESS, message="dry-run", changed_at=datetime.utcnow())) + continue + + flow = next((f for f in self.site_flows if f.match(entry)), None) + if flow: + res = flow.perform_change(self.engine, entry, new_password) + else: + # Fallback to heuristic auto discovery/change + res = auto_change_flow(self.engine, entry, new_password) + if res.status == AutomationStatus.SUCCESS: + self.pm.update_password(entry.id, password=new_password) + results.append(res) + + return results + + diff --git a/src/cerberus/automation/selenium_engine.py b/src/cerberus/automation/selenium_engine.py new file mode 100644 index 0000000..3fbdaac --- /dev/null +++ b/src/cerberus/automation/selenium_engine.py @@ -0,0 +1,64 @@ +from typing import Optional, Dict, Any + +try: + from selenium import webdriver + from selenium.webdriver.common.by import By + from selenium.webdriver.chrome.options import Options as ChromeOptions + SELENIUM_AVAILABLE = True +except Exception: # pragma: no cover - optional dependency + SELENIUM_AVAILABLE = False + + +class SeleniumEngine: + def __init__(self): + if not SELENIUM_AVAILABLE: + raise RuntimeError("Selenium is not installed. Install with: pip install selenium") + self._driver: Optional[webdriver.Chrome] = None + + def start(self, headless: bool = True, user_data_dir: Optional[str] = None) -> None: + options = ChromeOptions() + if headless: + options.add_argument("--headless=new") + if user_data_dir: + options.add_argument(f"--user-data-dir={user_data_dir}") + self._driver = webdriver.Chrome(options=options) + + def stop(self) -> None: + if self._driver: + self._driver.quit() + self._driver = None + + def goto(self, url: str, wait_until: str = "networkidle") -> None: + assert self._driver is not None + self._driver.get(url) + + def type(self, selector: str, value: str, clear: bool = True) -> None: + assert self._driver is not None + elem = self._driver.find_element(By.CSS_SELECTOR, selector) + if clear: + elem.clear() + elem.send_keys(value) + + def click(self, selector: str) -> None: + assert self._driver is not None + elem = self._driver.find_element(By.CSS_SELECTOR, selector) + elem.click() + + def wait_for(self, selector: str, timeout_ms: int = 15000) -> None: + assert self._driver is not None + # Simple polling; users can replace with WebDriverWait if desired + import time + end = time.time() + timeout_ms / 1000.0 + while time.time() < end: + try: + self._driver.find_element(By.CSS_SELECTOR, selector) + return + except Exception: + time.sleep(0.1) + raise TimeoutError(f"Timeout waiting for selector: {selector}") + + def evaluate(self, script: str, arg: Optional[Dict[str, Any]] = None) -> Any: + assert self._driver is not None + return self._driver.execute_script(script, arg) + + diff --git a/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..65c1943 --- /dev/null +++ b/src/cerberus/automation/sites/__pycache__/apple.cpython-313.pyc diff --git a/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..28a72e3 --- /dev/null +++ b/src/cerberus/automation/sites/__pycache__/base_site.cpython-313.pyc diff --git a/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..fabaf7c --- /dev/null +++ b/src/cerberus/automation/sites/__pycache__/facebook.cpython-313.pyc diff --git a/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..776dc1c --- /dev/null +++ b/src/cerberus/automation/sites/__pycache__/github.cpython-313.pyc diff --git a/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..ca05800 --- /dev/null +++ b/src/cerberus/automation/sites/__pycache__/google.cpython-313.pyc diff --git a/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..56f9f97 --- /dev/null +++ b/src/cerberus/automation/sites/__pycache__/linkedin.cpython-313.pyc diff --git a/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..4b579a5 --- /dev/null +++ b/src/cerberus/automation/sites/__pycache__/microsoft.cpython-313.pyc diff --git a/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc b/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..976b44f --- /dev/null +++ b/src/cerberus/automation/sites/__pycache__/twitter.cpython-313.pyc diff --git a/src/cerberus/automation/sites/apple.py b/src/cerberus/automation/sites/apple.py new file mode 100644 index 0000000..1f32946 --- /dev/null +++ b/src/cerberus/automation/sites/apple.py @@ -0,0 +1,64 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class AppleFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "apple.com" in target or "appleid.apple.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Apple frequently enforces MFA; treat as best-effort stub + engine.goto("https://appleid.apple.com/") + engine.wait_for("input[type=email], input[name=email], input[id=email]") + try: + engine.type("input[type=email], input[name=email], input[id=email]", entry.username) + except Exception: + pass + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + engine.type("input[type=password]", entry.password) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="Apple ID requires MFA or device approval") + + # Direct to password change if possible (likely gated by MFA) + engine.goto("https://appleid.apple.com/account/manage/password") + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="Password settings gated by MFA") + + # Attempt to fill current/new/confirm + for sel in ["input[name='currentPassword']", "input[id='currentPassword']", "input[type='password']"]: + try: + engine.type(sel, entry.password) + break + except Exception: + continue + for sel in ["input[name='newPassword']", "input[id='newPassword']"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["input[name='confirmPassword']", "input[id='confirmPassword']"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["button[type=submit]", "button"]: + try: + engine.click(sel) + break + except Exception: + continue + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/base_site.py b/src/cerberus/automation/sites/base_site.py new file mode 100644 index 0000000..a33dc5b --- /dev/null +++ b/src/cerberus/automation/sites/base_site.py @@ -0,0 +1,14 @@ +from typing import Protocol, Optional, Dict, Any + +from ...core.models import PasswordEntry +from ..types import AutomationResult + + +class SiteFlow(Protocol): + def match(self, entry: PasswordEntry) -> bool: + ... + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + ... + + diff --git a/src/cerberus/automation/sites/facebook.py b/src/cerberus/automation/sites/facebook.py new file mode 100644 index 0000000..f7b0535 --- /dev/null +++ b/src/cerberus/automation/sites/facebook.py @@ -0,0 +1,56 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class FacebookFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "facebook.com" in target or "fb.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + engine.goto("https://www.facebook.com/login/") + engine.wait_for("input[name=email]") + engine.type("input[name=email]", entry.username) + engine.type("input[name=pass]", entry.password) + engine.click("button[name=login]") + + # Password settings + engine.goto("https://www.facebook.com/settings?tab=security") + # New UI is dynamic; try common selectors + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Security page requires interaction or MFA") + + # Attempt common current/new/confirm fields + try: + engine.type("input[name='current_password']", entry.password) + except Exception: + pass + for sel in ["input[name='new_password']", "input[name='password_new']", "input[name='new']"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["input[name='confirm_password']", "input[name='password_confirm']", "input[name='confirm']"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["button[type=submit]", "[data-testid='sec_settings_save']", "button"]: + try: + engine.click(sel) + break + except Exception: + continue + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/github.py b/src/cerberus/automation/sites/github.py new file mode 100644 index 0000000..36e25f2 --- /dev/null +++ b/src/cerberus/automation/sites/github.py @@ -0,0 +1,53 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class GithubFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "github.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Login + engine.goto("https://github.com/login") + engine.wait_for("input#login_field") + engine.type("input#login_field", entry.username) + engine.type("input#password", entry.password) + engine.click("input[type=submit]") + + # Detect 2FA requirement + try: + engine.wait_for("input#otp", timeout_ms=3000) + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="2FA required") + except Exception: + pass + + # Navigate to password change + engine.goto("https://github.com/settings/security") + engine.wait_for("a[href='/settings/password']") + engine.click("a[href='/settings/password']") + + engine.wait_for("input#old_password") + engine.type("input#old_password", entry.password) + engine.type("input#new_password", new_password) + engine.type("input#confirm_new_password", new_password) + engine.click("button[type=submit]") + + # Verify success: check for flash notice + try: + engine.wait_for(".flash-notice, .flash-success", timeout_ms=5000) + except Exception: + # As fallback, assume success if no error shown + pass + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow()) + + except Exception as e: + return AutomationResult(status=AutomationStatus.FAILED, message="Error during change", error=str(e)) + + diff --git a/src/cerberus/automation/sites/google.py b/src/cerberus/automation/sites/google.py new file mode 100644 index 0000000..4d651b7 --- /dev/null +++ b/src/cerberus/automation/sites/google.py @@ -0,0 +1,45 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class GoogleFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "google.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Login flow + engine.goto("https://accounts.google.com/signin/v2/identifier") + engine.wait_for("input[type=email], input#identifierId") + try: + engine.type("input[type=email], input#identifierId", entry.username) + except Exception: + pass + engine.click("#identifierNext, button[type=submit]") + try: + engine.wait_for("input[type=password]", timeout_ms=7000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present") + engine.type("input[type=password]", entry.password) + engine.click("#passwordNext, button[type=submit]") + + # Security settings - direct link (subject to change) + engine.goto("https://myaccount.google.com/signinoptions/password") + engine.wait_for("input[type=password]") + engine.type("input[type=password]", entry.password) + engine.click("button[type=submit], #passwordNext") + + # New password fields + engine.wait_for("input[name=password]") + engine.type("input[name=password]", new_password) + engine.type("input[name=confirmation_password]", new_password) + engine.click("button[type=submit]") + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/linkedin.py b/src/cerberus/automation/sites/linkedin.py new file mode 100644 index 0000000..cb8ecea --- /dev/null +++ b/src/cerberus/automation/sites/linkedin.py @@ -0,0 +1,56 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class LinkedInFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "linkedin.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + engine.goto("https://www.linkedin.com/login") + engine.wait_for("input#username") + engine.type("input#username", entry.username) + engine.type("input#password", entry.password) + engine.click("button[type=submit]") + + engine.goto("https://www.linkedin.com/psettings/change-password") + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Password settings gated or MFA required") + + # Fill current/new/confirm + for sel in ["input[name='currentPassword']", "input#current-password", "input[type='password']"]: + try: + engine.type(sel, entry.password) + break + except Exception: + continue + for sel in ["input[name='newPassword']", "input#new-password"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["input[name='confirmPassword']", "input#confirm-password"]: + try: + engine.type(sel, new_password) + break + except Exception: + continue + for sel in ["button[type=submit]", "button"]: + try: + engine.click(sel) + break + except Exception: + continue + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password change attempted", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/microsoft.py b/src/cerberus/automation/sites/microsoft.py new file mode 100644 index 0000000..8ddee76 --- /dev/null +++ b/src/cerberus/automation/sites/microsoft.py @@ -0,0 +1,40 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class MicrosoftFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "microsoft.com" in target or "live.com" in target or "office.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Login page + engine.goto("https://login.live.com/") + engine.wait_for("input[type=email], input[name=loginfmt]") + engine.type("input[type=email], input[name=loginfmt]", entry.username) + engine.click("input[type=submit], button[type=submit]") + + # Password step + try: + engine.wait_for("input[type=password]", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present") + engine.type("input[type=password]", entry.password) + engine.click("input[type=submit], button[type=submit]") + + # Navigate to security/password change + engine.goto("https://account.live.com/password/change") + engine.wait_for("input[name=OldPassword], input[name=CurrentPassword]") + engine.type("input[name=OldPassword], input[name=CurrentPassword]", entry.password) + engine.type("input[name=NewPassword], input[name=NewPasswordBox]", new_password) + engine.type("input[name=ConfirmPassword], input[name=ConfirmNewPasswordBox]", new_password) + engine.click("button[type=submit], input[type=submit]") + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/sites/twitter.py b/src/cerberus/automation/sites/twitter.py new file mode 100644 index 0000000..609eda7 --- /dev/null +++ b/src/cerberus/automation/sites/twitter.py @@ -0,0 +1,42 @@ +from typing import Optional, Dict, Any +from datetime import datetime + +from ...core.models import PasswordEntry +from ..types import AutomationResult, AutomationStatus +from .base_site import SiteFlow + + +class TwitterFlow(SiteFlow): + def match(self, entry: PasswordEntry) -> bool: + target = (entry.url or entry.website or "").lower() + return "twitter.com" in target or "x.com" in target + + def perform_change(self, engine, entry: PasswordEntry, new_password: str, options: Optional[Dict[str, Any]] = None) -> AutomationResult: + try: + # Login flow + engine.goto("https://twitter.com/i/flow/login") + engine.wait_for("input[name='text']") + engine.type("input[name='text']", entry.username) + engine.click("div[role='button'][data-testid='LoginForm_Login_Button'], div[role='button']") + try: + engine.wait_for("input[name='password']", timeout_ms=8000) + except Exception: + return AutomationResult(status=AutomationStatus.NEEDS_MFA, message="MFA or challenge present") + engine.type("input[name='password']", entry.password) + engine.click("div[role='button'][data-testid='LoginForm_Login_Button']") + + # Settings (paths change often; best-effort direct link) + engine.goto("https://twitter.com/settings/password") + engine.wait_for("input[name='current_password'], input[type='password']") + # Try to fill typical current/new/confirm fields + try: + engine.type("input[name='current_password']", entry.password) + except Exception: + pass + engine.type("input[name='new_password']", new_password) + engine.type("input[name='password_confirmation']", new_password) + engine.click("div[role='button'], button[type='submit']") + + return AutomationResult(status=AutomationStatus.SUCCESS, message="Password changed", changed_at=datetime.utcnow()) + except Exception as e: + return AutomationResult(status=AutomationStatus.NEEDS_MANUAL, message="Flow failed or blocked", error=str(e)) diff --git a/src/cerberus/automation/types.py b/src/cerberus/automation/types.py new file mode 100644 index 0000000..f236a28 --- /dev/null +++ b/src/cerberus/automation/types.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional, Dict, Any +from datetime import datetime + + +class AutomationStatus(str, Enum): + SUCCESS = "success" + NEEDS_MFA = "needs_mfa" + NEEDS_MANUAL = "needs_manual" + FAILED = "failed" + + +@dataclass +class AutomationResult: + status: AutomationStatus + message: str = "" + changed_at: Optional[datetime] = None + error: Optional[str] = None + details: Dict[str, Any] = field(default_factory=dict) + + diff --git a/src/cerberus/cli.py b/src/cerberus/cli.py new file mode 100644 index 0000000..902d4be --- /dev/null +++ b/src/cerberus/cli.py @@ -0,0 +1,53 @@ +import argparse +from typing import List + +from cerberus.core.password_manager import PasswordManager +from cerberus.automation.playwright_engine import PlaywrightEngine +from cerberus.automation.selenium_engine import SeleniumEngine, SELENIUM_AVAILABLE +from cerberus.automation.runner import RotationRunner, RotationSelector +from cerberus.automation.policy import generate_for_entry +from cerberus.automation.sites.github import GithubFlow + + +def cli(): + parser = argparse.ArgumentParser(prog="cerberus") + sub = parser.add_subparsers(dest="command") + + rotate = sub.add_parser("rotate", help="Rotate passwords via web automation") + rotate.add_argument("--engine", choices=["playwright", "selenium"], default="playwright") + rotate.add_argument("--data-dir", default=None, help="Password manager data dir") + rotate.add_argument("--master", required=True, help="Master password") + rotate.add_argument("--all", action="store_true", help="Rotate all entries") + rotate.add_argument("--compromised", action="store_true", help="Rotate only compromised entries") + rotate.add_argument("--tag", default=None) + rotate.add_argument("--domain", default=None) + rotate.add_argument("--dry-run", action="store_true") + + args = parser.parse_args() + + if args.command == "rotate": + pm = PasswordManager(data_dir=args.data_dir, master_password=args.master) + + engine = PlaywrightEngine() if args.engine == "playwright" else None + if args.engine == "selenium": + if not SELENIUM_AVAILABLE: + raise SystemExit("Selenium not installed. Install extra: pip install .[automation-selenium]") + engine = SeleniumEngine() + + engine.start(headless=True) + try: + flows = [GithubFlow()] + runner = RotationRunner(engine, flows, pm) + selector = RotationSelector( + all=args.all, + compromised_only=args.compromised, + tag=args.tag, + domain=args.domain, + ) + results = runner.rotate(selector, lambda e: generate_for_entry(pm, e), dry_run=args.dry_run) + for r in results: + print(f"{r.status}: {r.message}") + finally: + engine.stop() + else: + parser.print_help() diff --git a/src/cerberus/cli/__init__.py b/src/cerberus/cli/__init__.py new file mode 100644 index 0000000..d3e17ef --- /dev/null +++ b/src/cerberus/cli/__init__.py @@ -0,0 +1,305 @@ +""" +Cerberus CLI - Command Line Interface for Cerberus Password Manager. +""" +from typing import Optional, List, Dict, Any +import logging +import sys +import json +from pathlib import Path +from datetime import datetime +import getpass + +import click +from rich.console import Console +from rich.table import Table +from rich.progress import Progress, SpinnerColumn, TextColumn + +from ..core.password_manager import PasswordManager, VaultError +from ..core.models import PasswordEntry +from ..tui import main as tui_main + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(message)s') +logger = logging.getLogger(__name__) + +# Create console for rich output +console = Console() + +class CerberusCLI: + """Main CLI application for Cerberus Password Manager.""" + + def __init__(self, data_dir: Optional[str] = None, debug: bool = False): + """Initialize the CLI.""" + self.data_dir = data_dir + self.debug = debug + self.pm: Optional[PasswordManager] = None + + if debug: + logging.getLogger().setLevel(logging.DEBUG) + logger.debug("Debug mode enabled") + + def ensure_initialized(self) -> None: + """Ensure the password manager is initialized.""" + if not self.pm: + raise click.UsageError("Vault is not unlocked. Use 'cerberus unlock' first.") + + def unlock_vault(self, master_password: Optional[str] = None) -> None: + """Unlock the password vault.""" + try: + if not master_password: + master_password = getpass.getpass("Master password: ") + + with self._progress_spinner("Unlocking vault..."): + self.pm = PasswordManager(data_dir=self.data_dir, master_password=master_password) + + console.print("[green]✓[/] Vault unlocked successfully!") + + except VaultError as e: + raise click.ClickException(f"Failed to unlock vault: {e}") + except Exception as e: + if self.debug: + logger.exception("Error unlocking vault") + raise click.ClickException(f"An error occurred: {e}") + + def _progress_spinner(self, description: str): + """Create a progress spinner context manager.""" + return Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True + ) + + def list_entries(self, search: Optional[str] = None) -> List[PasswordEntry]: + """List password entries, optionally filtered by search term.""" + self.ensure_initialized() + + try: + entries = self.pm.get_entries() + + if search: + search = search.lower() + entries = [ + e for e in entries + if search in e.website.lower() or + search in (e.username or "").lower() or + search in (e.notes or "").lower() or + any(search in tag.lower() for tag in (e.tags or [])) + ] + + return entries + + except Exception as e: + if self.debug: + logger.exception("Error listing entries") + raise click.ClickException(f"Failed to list entries: {e}") + + def get_entry(self, identifier: str) -> PasswordEntry: + """Get a specific password entry by ID or website.""" + self.ensure_initialized() + + try: + # Try to get by ID first + try: + return self.pm.get_entry(identifier) + except (ValueError, KeyError): + # If not found by ID, try by website + entries = [e for e in self.pm.get_entries() if e.website.lower() == identifier.lower()] + if not entries: + raise ValueError(f"No entry found with ID or website: {identifier}") + if len(entries) > 1: + raise ValueError(f"Multiple entries found for website: {identifier}. Please use the entry ID instead.") + return entries[0] + + except Exception as e: + if self.debug: + logger.exception(f"Error getting entry: {identifier}") + raise click.ClickException(str(e)) + + def add_entry( + self, + website: str, + username: str, + password: Optional[str] = None, + url: str = "", + notes: str = "", + tags: Optional[List[str]] = None, + generate: bool = False, + length: int = 16, + special_chars: bool = True + ) -> PasswordEntry: + """Add a new password entry.""" + self.ensure_initialized() + + try: + if generate: + with self._progress_spinner("Generating strong password..."): + password = self.pm.generate_password_easy(length=length, special=special_chars) + elif not password: + password = click.prompt("Password", hide_input=True, confirmation_prompt=True) + + entry = PasswordEntry( + id=self.pm.generate_id(), + website=website, + username=username, + password=password, + url=url, + notes=notes, + tags=tags or [], + created_at=datetime.utcnow(), + updated_at=datetime.utcnow() + ) + + with self._progress_spinner("Saving entry..."): + self.pm.add_entry(entry) + + return entry + + except Exception as e: + if self.debug: + logger.exception("Error adding entry") + raise click.ClickException(f"Failed to add entry: {e}") + + def update_entry(self, entry_id: str, **updates) -> PasswordEntry: + """Update an existing password entry.""" + self.ensure_initialized() + + try: + entry = self.get_entry(entry_id) + + # Apply updates + for key, value in updates.items(): + if value is not None and hasattr(entry, key): + setattr(entry, key, value) + + entry.updated_at = datetime.utcnow() + + with self._progress_spinner("Updating entry..."): + self.pm.update_entry(entry) + + return entry + + except Exception as e: + if self.debug: + logger.exception(f"Error updating entry: {entry_id}") + raise click.ClickException(f"Failed to update entry: {e}") + + def delete_entry(self, entry_id: str) -> None: + """Delete a password entry.""" + self.ensure_initialized() + + try: + entry = self.get_entry(entry_id) + + if click.confirm(f"Are you sure you want to delete the entry for {entry.website}?"): + with self._progress_spinner("Deleting entry..."): + self.pm.delete_entry(entry.id) + + console.print(f"[green]✓[/] Deleted entry for {entry.website}") + + except Exception as e: + if self.debug: + logger.exception(f"Error deleting entry: {entry_id}") + raise click.ClickException(f"Failed to delete entry: {e}") + + def rotate_password( + self, + entry_id: str, + length: int = 32, + special_chars: bool = True + ) -> PasswordEntry: + """Generate a new password for an entry.""" + self.ensure_initialized() + + try: + entry = self.get_entry(entry_id) + + with self._progress_spinner("Generating new password..."): + new_password = self.pm.generate_password_easy(length=length, special=special_chars) + + entry.password = new_password + entry.updated_at = datetime.utcnow() + + with self._progress_spinner("Saving updated entry..."): + self.pm.update_entry(entry) + + return entry + + except Exception as e: + if self.debug: + logger.exception(f"Error rotating password for entry: {entry_id}") + raise click.ClickException(f"Failed to rotate password: {e}") + + def export_entries(self, output_file: str, format: str = "json") -> None: + """Export password entries to a file.""" + self.ensure_initialized() + + try: + entries = self.pm.get_entries() + output_path = Path(output_file).expanduser().resolve() + + with self._progress_spinner(f"Exporting entries to {output_path}..."): + if format.lower() == "json": + data = [e.to_dict() for e in entries] + output_path.write_text(json.dumps(data, indent=2, default=str)) + else: + raise ValueError(f"Unsupported export format: {format}") + + console.print(f"[green]✓[/] Exported {len(entries)} entries to {output_path}") + + except Exception as e: + if self.debug: + logger.exception("Error exporting entries") + raise click.ClickException(f"Failed to export entries: {e}") + + def import_entries(self, input_file: str, format: str = "json") -> None: + """Import password entries from a file.""" + self.ensure_initialized() + + try: + input_path = Path(input_file).expanduser().resolve() + + if not input_path.exists(): + raise FileNotFoundError(f"Input file not found: {input_path}") + + with self._progress_spinner(f"Importing entries from {input_path}..."): + if format.lower() == "json": + data = json.loads(input_path.read_text()) + for item in data: + entry = PasswordEntry.from_dict(item) + # Ensure we don't overwrite existing entries + entry.id = self.pm.generate_id() + self.pm.add_entry(entry) + else: + raise ValueError(f"Unsupported import format: {format}") + + console.print(f"[green]✓[/] Imported {len(data)} entries from {input_path}") + + except Exception as e: + if self.debug: + logger.exception("Error importing entries") + raise click.ClickException(f"Failed to import entries: {e}") + +def print_entry_table(entries: List[PasswordEntry]) -> None: + """Print a table of password entries.""" + if not entries: + console.print("[yellow]No entries found.[/]") + return + + table = Table(show_header=True, header_style="bold magenta") + table.add_column("ID", style="dim", width=8) + table.add_column("Website") + table.add_column("Username") + table.add_column("Last Used", style="dim") + table.add_column("Updated", style="dim") + + for entry in entries: + table.add_row( + entry.id[:8], + entry.website, + entry.username, + entry.last_used.strftime("%Y-%m-%d") if entry.last_used else "Never", + entry.updated_at.strftime("%Y-%m-%d") if entry.updated_at else "" + ) + + console.print(table) diff --git a/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc b/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..093e1af --- /dev/null +++ b/src/cerberus/cli/__pycache__/__init__.cpython-313.pyc diff --git a/src/cerberus/cli/__pycache__/main.cpython-313.pyc b/src/cerberus/cli/__pycache__/main.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..3663a5a --- /dev/null +++ b/src/cerberus/cli/__pycache__/main.cpython-313.pyc diff --git a/src/cerberus/cli/main.py b/src/cerberus/cli/main.py new file mode 100644 index 0000000..c40c411 --- /dev/null +++ b/src/cerberus/cli/main.py @@ -0,0 +1,611 @@ +""" +Cerberus CLI - Command Line Interface for Cerberus Password Manager. +""" +import os +import sys +import logging +from typing import Optional + +import click +from rich.console import Console +from rich.logging import RichHandler + +from . import CerberusCLI, print_entry_table +from ..automation.playwright_engine import PlaywrightEngine +from ..automation.selenium_engine import SeleniumEngine, SELENIUM_AVAILABLE +from ..automation.runner import RotationRunner, RotationSelector +from ..automation.sites.github import GithubFlow +from ..automation.sites.google import GoogleFlow +from ..automation.sites.microsoft import MicrosoftFlow +from ..automation.sites.twitter import TwitterFlow +from ..automation.sites.facebook import FacebookFlow +from ..automation.sites.linkedin import LinkedInFlow +from ..automation.sites.apple import AppleFlow +from ..automation.policy import generate_for_entry +from ..automation.types import AutomationStatus + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(message)s", + datefmt="[%X]", + handlers=[RichHandler(rich_tracebacks=True)] +) +logger = logging.getLogger("cerberus") + +# Create console for rich output +console = Console() + +# Default data directory +DEFAULT_DATA_DIR = os.path.expanduser("~/.cerberus") + +@click.group(invoke_without_command=True) +@click.option( + "--data-dir", + type=click.Path(file_okay=False, dir_okay=True, writable=True), + default=DEFAULT_DATA_DIR, + help="Directory to store password data", + show_default=True +) +@click.option( + "--debug/--no-debug", + default=False, + help="Enable debug output", + show_default=True +) +@click.pass_context +def cli(ctx: click.Context, data_dir: str, debug: bool) -> None: + """Cerberus Password Manager - Secure and user-friendly password management.""" + # Set debug logging if enabled + if debug: + logging.getLogger().setLevel(logging.DEBUG) + logger.debug("Debug mode enabled") + + # Create data directory if it doesn't exist + os.makedirs(data_dir, exist_ok=True) + + # Store the CLI instance in the context + ctx.obj = CerberusCLI(data_dir=data_dir, debug=debug) + + # If no command is provided, show help + if ctx.invoked_subcommand is None: + click.echo(ctx.get_help()) + +@cli.command() +@click.pass_obj +def init(cli: CerberusCLI) -> None: + """Initialize a new password vault.""" + master_password = click.prompt( + "Enter a master password", + hide_input=True, + confirmation_prompt=True + ) + + try: + cli.unlock_vault(master_password) + console.print("[green]✓[/] Vault initialized and unlocked!") + except Exception as e: + console.print(f"[red]✗[/] Failed to initialize vault: {e}") + sys.exit(1) + +@cli.command() +@click.option("--engine", type=click.Choice(["playwright", "selenium"]), default="playwright", show_default=True) +@click.option("--all", "all_", is_flag=True, default=False, help="Include all entries") +@click.option("--compromised", is_flag=True, default=False, help="Only compromised entries") +@click.option("--tag", default=None, help="Filter by tag") +@click.option("--domain", default=None, help="Filter by domain") +@click.pass_obj +def reliability_report( + cli: CerberusCLI, + engine: str, + all_: bool, + compromised: bool, + tag: Optional[str], + domain: Optional[str], +) -> None: + """Run a dry-run rotate across selected entries and report SUCCESS/NEEDS_MANUAL/FAILED counts.""" + try: + try: + cli.ensure_initialized() + except Exception: + # Attempt interactive unlock + password = click.prompt("Master password", hide_input=True) + cli.unlock_vault(password) + if engine == "playwright": + eng = PlaywrightEngine() + else: + if not SELENIUM_AVAILABLE: + raise click.ClickException("Selenium not installed. Install extra: pip install .[automation-selenium]") + eng = SeleniumEngine() + + eng.start(headless=True) + try: + flows = [ + GithubFlow(), GoogleFlow(), MicrosoftFlow(), TwitterFlow(), + FacebookFlow(), LinkedInFlow(), AppleFlow() + ] + runner = RotationRunner(eng, flows, cli.pm) # type: ignore[arg-type] + selector = RotationSelector( + all=all_, + compromised_only=compromised, + tag=tag, + domain=domain, + ) + results = runner.rotate(selector, lambda e: generate_for_entry(cli.pm, e), dry_run=True) # type: ignore[arg-type] + counts = {AutomationStatus.SUCCESS: 0, AutomationStatus.NEEDS_MANUAL: 0, AutomationStatus.FAILED: 0} + for r in results: + counts[r.status] = counts.get(r.status, 0) + 1 + console.print("[bold]Reliability Report[/bold]") + console.print(f"SUCCESS: {counts.get(AutomationStatus.SUCCESS, 0)}") + console.print(f"NEEDS_MANUAL: {counts.get(AutomationStatus.NEEDS_MANUAL, 0)}") + console.print(f"FAILED: {counts.get(AutomationStatus.FAILED, 0)}") + finally: + eng.stop() + except Exception as e: + console.print(f"[red]✗[/] Reliability report failed: {e}") + if cli.debug: + import traceback + console.print(traceback.format_exc()) + sys.exit(1) + +@cli.command() +@click.option( + "--password", + help="Master password (prompt if not provided)", + default=None +) +@click.pass_obj +def unlock(cli: CerberusCLI, password: Optional[str]) -> None: + """Unlock the password vault.""" + try: + cli.unlock_vault(password) + console.print("[green]✓[/] Vault unlocked!") + except Exception as e: + console.print(f"[red]✗[/] Failed to unlock vault: {e}") + sys.exit(1) + +@cli.command() +@click.option( + "--search", + "-s", + help="Filter entries by search term" +) +@click.pass_obj +def list(cli: CerberusCLI, search: Optional[str]) -> None: + """List all password entries.""" + try: + entries = cli.list_entries(search) + print_entry_table(entries) + except Exception as e: + console.print(f"[red]✗[/] Failed to list entries: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.pass_obj +def show(cli: CerberusCLI, identifier: str) -> None: + """Show details for a specific password entry.""" + try: + entry = cli.get_entry(identifier) + + console.print(f"[bold]Entry:[/bold] {entry.id}") + console.print(f"[bold]Website:[/bold] {entry.website}") + console.print(f"[bold]Username:[/bold] {entry.username}") + console.print(f"[bold]Password:[/bold] {'*' * 12} (use 'cerberus copy-password {entry.id}' to copy)") + + if entry.url: + console.print(f"[bold]URL:[/bold] {entry.url}") + if entry.notes: + console.print("[bold]Notes:[/bold]") + console.print(entry.notes) + if entry.tags: + console.print(f"[bold]Tags:[/bold] {', '.join(entry.tags)}") + + console.print(f"[dim]Created: {entry.created_at}") + console.print(f"[dim]Updated: {entry.updated_at}") + if entry.last_used: + console.print(f"[dim]Last Used: {entry.last_used}") + + except Exception as e: + console.print(f"[red]✗[/] Failed to get entry: {e}") + sys.exit(1) + +@cli.command() +@click.option( + "--website", + "-w", + required=True, + help="Website or service name" +) +@click.option( + "--username", + "-u", + required=True, + help="Username or email" +) +@click.option( + "--password", + "-p", + help="Password (prompt if not provided)" +) +@click.option( + "--url", + help="Website URL" +) +@click.option( + "--notes", + "-n", + help="Additional notes" +) +@click.option( + "--tag", + "-t", + "tags", + multiple=True, + help="Tags for organization (can be used multiple times)" +) +@click.option( + "--generate/--no-generate", + "-g", + default=False, + help="Generate a strong password" +) +@click.option( + "--length", + "-l", + type=int, + default=16, + help="Length of generated password" +) +@click.option( + "--no-special-chars", + is_flag=True, + default=False, + help="Exclude special characters from generated password" +) +@click.pass_obj +def add( + cli: CerberusCLI, + website: str, + username: str, + password: Optional[str], + url: str, + notes: str, + tags: list, + generate: bool, + length: int, + no_special_chars: bool +) -> None: + """Add a new password entry.""" + try: + entry = cli.add_entry( + website=website, + username=username, + password=password, + url=url, + notes=notes, + tags=list(tags) if tags else None, + generate=generate, + length=length, + special_chars=not no_special_chars + ) + + console.print(f"[green]✓[/] Added entry for [bold]{entry.website}[/bold]") + if generate: + console.print(f"Generated password: [yellow]{entry.password}[/]") + + except Exception as e: + console.print(f"[red]✗[/] Failed to add entry: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.option( + "--website", + "-w", + help="Update website name" +) +@click.option( + "--username", + "-u", + help="Update username" +) +@click.option( + "--password", + "-p", + help="Update password (prompt if not provided)" +) +@click.option( + "--url", + help="Update website URL" +) +@click.option( + "--notes", + "-n", + help="Update notes" +) +@click.option( + "--tag", + "-t", + "tags", + multiple=True, + help="Update tags (use --tag=clear to remove all tags)" +) +@click.pass_obj +def edit( + cli: CerberusCLI, + identifier: str, + website: str, + username: str, + password: str, + url: str, + notes: str, + tags: list +) -> None: + """Edit an existing password entry.""" + try: + # Get the current entry + entry = cli.get_entry(identifier) + + # Prepare updates + updates = {} + + if website is not None: + updates["website"] = website + if username is not None: + updates["username"] = username + if password is not None: + if password == "": + password = click.prompt("New password", hide_input=True, confirmation_prompt=True) + updates["password"] = password + if url is not None: + updates["url"] = url + if notes is not None: + updates["notes"] = notes + if tags: + if tags == ("clear",): + updates["tags"] = [] + else: + updates["tags"] = list(tags) + + if not updates: + console.print("[yellow]No changes specified.[/]") + return + + # Apply updates + updated_entry = cli.update_entry(entry.id, **updates) + console.print(f"[green]✓[/] Updated entry for [bold]{updated_entry.website}[/]") + + except Exception as e: + console.print(f"[red]✗[/] Failed to update entry: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.option( + "--length", + "-l", + type=int, + default=32, + help="Length of the new password" +) +@click.option( + "--no-special-chars", + is_flag=True, + default=False, + help="Exclude special characters from the new password" +) +@click.pass_obj +def rotate( + cli: CerberusCLI, + identifier: str, + length: int, + no_special_chars: bool +) -> None: + """Generate a new password for an entry.""" + try: + entry = cli.rotate_password( + identifier, + length=length, + special_chars=not no_special_chars + ) + + console.print(f"[green]✓[/] Rotated password for [bold]{entry.website}[/]") + console.print(f"New password: [yellow]{entry.password}[/]") + + except Exception as e: + console.print(f"[red]✗[/] Failed to rotate password: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.pass_obj +def delete(cli: CerberusCLI, identifier: str) -> None: + """Delete a password entry.""" + try: + cli.delete_entry(identifier) + except Exception as e: + console.print(f"[red]✗[/] Failed to delete entry: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.pass_obj +def copy_username(cli: CerberusCLI, identifier: str) -> None: + """Copy username to clipboard.""" + try: + entry = cli.get_entry(identifier) + + # Use platform-specific clipboard handling + import pyperclip + pyperclip.copy(entry.username) + + console.print(f"[green]✓[/] Copied username for [bold]{entry.website}[/] to clipboard") + + except Exception as e: + console.print(f"[red]✗[/] Failed to copy username: {e}") + sys.exit(1) + +@cli.command() +@click.argument("identifier") +@click.pass_obj +def copy_password(cli: CerberusCLI, identifier: str) -> None: + """Copy password to clipboard.""" + try: + entry = cli.get_entry(identifier) + + # Use platform-specific clipboard handling + import pyperclip + pyperclip.copy(entry.password) + + console.print(f"[green]✓[/] Copied password for [bold]{entry.website}[/] to clipboard") + + # Clear clipboard after 30 seconds + import threading + import time + + def clear_clipboard(): + time.sleep(30) + if pyperclip.paste() == entry.password: + pyperclip.copy("") + console.print("[yellow]✓[/] Clipboard cleared") + + threading.Thread(target=clear_clipboard, daemon=True).start() + + except Exception as e: + console.print(f"[red]✗[/] Failed to copy password: {e}") + sys.exit(1) + +@cli.command() +@click.argument("output_file", type=click.Path()) +@click.option( + "--format", + "-f", + type=click.Choice(["json"], case_sensitive=False), + default="json", + help="Export format" +) +@click.pass_obj +def export(cli: CerberusCLI, output_file: str, format: str) -> None: + """Export password entries to a file.""" + try: + cli.export_entries(output_file, format=format) + except Exception as e: + console.print(f"[red]✗[/] Failed to export entries: {e}") + sys.exit(1) + +@cli.command() +@click.argument("input_file", type=click.Path(exists=True)) +@click.option( + "--format", + "-f", + type=click.Choice(["json"], case_sensitive=False), + default="json", + help="Import format" +) +@click.pass_obj +def import_entries(cli: CerberusCLI, input_file: str, format: str) -> None: + """Import password entries from a file.""" + try: + cli.import_entries(input_file, format=format) + except Exception as e: + console.print(f"[red]✗[/] Failed to import entries: {e}") + sys.exit(1) + +@cli.command() +@click.pass_obj +def tui(cli: CerberusCLI) -> None: + """Launch the Terminal User Interface.""" + from ..tui import main as tui_main + tui_main() + +@cli.command() +@click.pass_obj +def gui(cli: CerberusCLI) -> None: + """Launch the Graphical User Interface.""" + from ..gui import run_app + run_app() + +@cli.command() +@click.argument("identifier", required=False) +@click.option("--engine", type=click.Choice(["playwright", "selenium"]), default="playwright", show_default=True) +@click.option("--all", "all_", is_flag=True, default=False, help="Rotate all entries") +@click.option("--compromised", is_flag=True, default=False, help="Only compromised entries") +@click.option("--tag", default=None, help="Filter by tag") +@click.option("--domain", default=None, help="Filter by domain") +@click.option("--dry-run", is_flag=True, default=False, help="Do not perform changes, simulate only") +@click.option("--user-data-dir", type=str, default=None, help="Browser user data dir for persistent sessions") +@click.option("--no-headless", is_flag=True, default=False, help="Run browser with a visible window") +@click.pass_obj +def web_rotate( + cli: CerberusCLI, + identifier: Optional[str], + engine: str, + all_: bool, + compromised: bool, + tag: Optional[str], + domain: Optional[str], + dry_run: bool, + user_data_dir: Optional[str], + no_headless: bool, +) -> None: + """Rotate password(s) on websites via web automation with dynamic discovery. + + If IDENTIFIER is provided, attempts to rotate only that entry (by id or website). + Otherwise uses filters (--all/--tag/--domain/--compromised). + """ + try: + cli.ensure_initialized() + # Create automation engine + if engine == "playwright": + eng = PlaywrightEngine() + else: + if not SELENIUM_AVAILABLE: + raise click.ClickException("Selenium not installed. Install extra: pip install .[automation-selenium]") + eng = SeleniumEngine() + + eng.start(headless=not no_headless, user_data_dir=user_data_dir) + try: + flows = [ + GithubFlow(), GoogleFlow(), MicrosoftFlow(), TwitterFlow(), + FacebookFlow(), LinkedInFlow(), AppleFlow() + ] + runner = RotationRunner(eng, flows, cli.pm) # type: ignore[arg-type] + if identifier: + # Build selector to target specific entry + selector = RotationSelector(all=False) + # Temporarily filter entries by overriding internals using domain/website matching + # We'll rely on runner._filter_entries via domain filter + # Best-effort: put identifier into domain filter + domain = identifier + selector = RotationSelector( + all=all_, + compromised_only=compromised, + tag=tag, + domain=domain, + ) + results = runner.rotate(selector, lambda e: generate_for_entry(cli.pm, e), dry_run=dry_run) # type: ignore[arg-type] + for r in results: + console.print(f"[bold]{r.status.value}[/]: {r.message}") + finally: + eng.stop() + except Exception as e: + console.print(f"[red]✗[/] Web rotate failed: {e}") + if cli.debug: + import traceback + console.print(traceback.format_exc()) + sys.exit(1) + +def main() -> None: + """Entry point for the Cerberus CLI.""" + try: + cli() + except Exception as e: + console.print(f"[red]Error:[/red] {e}") + if cli.obj and cli.obj.debug: + import traceback + console.print(traceback.format_exc()) + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/src/cerberus/core/Makefile b/src/cerberus/core/Makefile new file mode 100644 index 0000000..14569a9 --- /dev/null +++ b/src/cerberus/core/Makefile @@ -0,0 +1,31 @@ +# Compiler and flags +CC = gcc +CFLAGS = -fPIC -O2 -Wall -Wextra +LDFLAGS = -shared -fPIC -lssl -lcrypto -luuid +TARGET = libcerberus.so +SOURCES = cerberus.c +OBJECTS = $(SOURCES:.c=.o) + +.PHONY: all clean + +all: $(TARGET) + +$(TARGET): $(OBJECTS) + $(CC) $(LDFLAGS) -o $@ $^ + +%.o: %.c + $(CC) $(CFLAGS) -c -o $@ $< + +clean: + rm -f $(TARGET) $(OBJECTS) + +install: $(TARGET) + cp $(TARGET) /usr/local/lib/ + ldconfig + +uninstall: + rm -f /usr/local/lib/$(TARGET) + ldconfig + +test: $(TARGET) + python3 -m pytest tests/ diff --git a/src/cerberus/core/__init__.py b/src/cerberus/core/__init__.py new file mode 100644 index 0000000..5541b05 --- /dev/null +++ b/src/cerberus/core/__init__.py @@ -0,0 +1,127 @@ +"""Cerberus Core - Core functionality for the Cerberus password manager. + +This module provides the core functionality for the Cerberus password manager, +including the C core bindings and high-level password management interfaces. +""" + +import os +import cffi +from pathlib import Path +from typing import Optional, Any + +# Initialize CFFI (exported for callers that need to manage buffers) +ffi = cffi.FFI() + +def _load_header(): + cdef_src = ''' + typedef unsigned int uint32_t; + typedef unsigned long size_t; + typedef long time_t; + typedef int bool; + + typedef struct cerb_vault_t cerb_vault_t; + typedef struct cerb_entry_t cerb_entry_t; + + typedef struct { + char id[37]; + char website[256]; + char username[256]; + char password[1024]; + char notes[4096]; + char url[1024]; + time_t created_at; + time_t updated_at; + } cerb_entry_basic_t; + + int cerb_crypto_init(void); + void cerb_crypto_cleanup(void); + + int cerb_vault_create(const char *master_password, cerb_vault_t **vault); + int cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault); + int cerb_vault_save(cerb_vault_t *vault, const char *vault_path); + void cerb_vault_close(cerb_vault_t *vault); + + int cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry); + int cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry); + int cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id); + int cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry); + int cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count); + int cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count); + + int cerb_generate_password(uint32_t length, bool use_upper, bool use_lower, bool use_digits, bool use_special, char *buffer, size_t buffer_size); + void cerb_generate_uuid(char *uuid); + time_t cerb_current_timestamp(void); + ''' + ffi.cdef(cdef_src) + +# Load the header +_load_header() + +# Try to load the compiled library +_lib = None + +def init() -> bool: + """Initialize the Cerberus C core. + + Returns: + bool: True if initialization was successful, False otherwise + """ + global _lib + + if _lib is not None: + return True + + # Try multiple candidate names + candidates = [ + Path(__file__).parent / 'libcerberus.so', + Path(__file__).parent / 'cerberus.so' + ] + for lib_path in candidates: + try: + _lib = ffi.dlopen(str(lib_path)) + return True + except OSError: + continue + _lib = None + return False + +# Initialize on import +if not init(): + class _DummyLib: + def __getattribute__(self, name: str) -> Any: + raise RuntimeError( + "Cerberus C core not initialized. " + "Please ensure the core is compiled and in your library path." + ) + _lib = _DummyLib() + CORE_AVAILABLE = False +else: + CORE_AVAILABLE = True + +# Re-export the C functions with proper typing +for name in dir(_lib): + if name.startswith('cerb_'): + globals()[name] = getattr(_lib, name) + +# Error code constants (must match cerberus.h enum) +CERB_OK = 0 +CERB_ERROR = -1 + +# Clean up the namespace (keep ffi exported) +try: + del _DummyLib +except NameError: + pass +del os, Path, _load_header, init + +# Export high-level interfaces +from .password_manager import PasswordManager +from .models import PasswordEntry + +__all__ = [ + 'PasswordManager', + 'PasswordEntry', + 'VaultError', + 'CoreNotAvailableError', + 'ffi' +] diff --git a/src/cerberus/core/__pycache__/__init__.cpython-313.pyc b/src/cerberus/core/__pycache__/__init__.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..1807bd2 --- /dev/null +++ b/src/cerberus/core/__pycache__/__init__.cpython-313.pyc diff --git a/src/cerberus/core/__pycache__/models.cpython-313.pyc b/src/cerberus/core/__pycache__/models.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..ffc2dd6 --- /dev/null +++ b/src/cerberus/core/__pycache__/models.cpython-313.pyc diff --git a/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc b/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..813cf2b --- /dev/null +++ b/src/cerberus/core/__pycache__/password_manager.cpython-313.pyc diff --git a/src/cerberus/core/cerberus.c b/src/cerberus/core/cerberus.c new file mode 100644 index 0000000..8511e3a --- /dev/null +++ b/src/cerberus/core/cerberus.c @@ -0,0 +1,530 @@ +#include "cerberus.h" +#include <string.h> +#include <stdlib.h> +#include <stdio.h> +#include <openssl/evp.h> +#include <openssl/rand.h> +#include <openssl/err.h> +// uuid/uuid.h not required; implement UUID v4 using RAND_bytes + +// Vault structure +typedef struct { + uint8_t salt[SALT_LEN]; + uint8_t key[KEY_LEN]; + bool key_initialized; + cerb_entry_t *entries; + size_t num_entries; + size_t capacity; +} cerb_vault_internal_t; + +// Initialize crypto +cerb_error_t cerb_crypto_init(void) { + OpenSSL_add_all_algorithms(); + ERR_load_crypto_strings(); + return RAND_poll() ? CERB_OK : CERB_CRYPTO_ERROR; +} + +// Cleanup crypto +void cerb_crypto_cleanup(void) { + EVP_cleanup(); + ERR_free_strings(); +} + +// Create new vault +cerb_error_t cerb_vault_create(const char *master_password, cerb_vault_t **vault) { + if (!master_password || !vault) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = calloc(1, sizeof(cerb_vault_internal_t)); + if (!v) return CERB_MEMORY_ERROR; + + if (RAND_bytes(v->salt, SALT_LEN) != 1) { + free(v); + return CERB_CRYPTO_ERROR; + } + + // Derive key from password and salt + if (!PKCS5_PBKDF2_HMAC(master_password, (int)strlen(master_password), + v->salt, SALT_LEN, PBKDF2_ITERATIONS, + EVP_sha256(), KEY_LEN, v->key)) { + free(v); + return CERB_CRYPTO_ERROR; + } + v->key_initialized = true; + + v->capacity = 32; + v->entries = calloc(v->capacity, sizeof(cerb_entry_t)); + if (!v->entries) { + free(v); + return CERB_MEMORY_ERROR; + } + + *vault = (cerb_vault_t *)v; + return CERB_OK; +} + +// Save vault to file (AES-256-GCM encrypted blob) +cerb_error_t cerb_vault_save(cerb_vault_t *vault, const char *vault_path) { + if (!vault || !vault_path) return CERB_INVALID_ARG; + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + FILE *fp = fopen(vault_path, "wb"); + if (!fp) return CERB_STORAGE_ERROR; + + // Serialize entries: [num_entries][entries...] + size_t plain_len = sizeof(uint32_t) + v->num_entries * sizeof(cerb_entry_t); + unsigned char *plaintext = malloc(plain_len); + if (!plaintext) { fclose(fp); return CERB_MEMORY_ERROR; } + + uint32_t n = (uint32_t)v->num_entries; + memcpy(plaintext, &n, sizeof(uint32_t)); + if (v->num_entries > 0) { + memcpy(plaintext + sizeof(uint32_t), v->entries, v->num_entries * sizeof(cerb_entry_t)); + } + + // Prepare AES-GCM + unsigned char iv[IV_LEN]; + if (RAND_bytes(iv, IV_LEN) != 1) { free(plaintext); fclose(fp); return CERB_CRYPTO_ERROR; } + unsigned char *ciphertext = malloc(plain_len); + if (!ciphertext) { free(plaintext); fclose(fp); return CERB_MEMORY_ERROR; } + int len = 0, ciphertext_len = 0; + unsigned char tag[16]; + + EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); + if (!ctx) { free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; } + + if (EVP_EncryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, NULL, NULL) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + if (EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, IV_LEN, NULL) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + if (EVP_EncryptInit_ex(ctx, NULL, NULL, v->key, iv) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + + if (EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, (int)plain_len) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + ciphertext_len = len; + if (EVP_EncryptFinal_ex(ctx, ciphertext + len, &len) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + ciphertext_len += len; + if (EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_GET_TAG, 16, tag) != 1) { + EVP_CIPHER_CTX_free(ctx); free(plaintext); free(ciphertext); fclose(fp); return CERB_CRYPTO_ERROR; + } + EVP_CIPHER_CTX_free(ctx); + + // Write file: MAGIC, VERSION, SALT, IV, TAG, CIPHERTEXT_LEN, CIPHERTEXT + const char magic[8] = { 'C','E','R','B','E','R','U','S' }; + uint32_t version = 1; + uint32_t clen = (uint32_t)ciphertext_len; + + if (fwrite(magic, 1, sizeof(magic), fp) != sizeof(magic) || + fwrite(&version, 1, sizeof(version), fp) != sizeof(version) || + fwrite(v->salt, 1, SALT_LEN, fp) != SALT_LEN || + fwrite(iv, 1, IV_LEN, fp) != IV_LEN || + fwrite(tag, 1, sizeof(tag), fp) != sizeof(tag) || + fwrite(&clen, 1, sizeof(clen), fp) != sizeof(clen) || + fwrite(ciphertext, 1, ciphertext_len, fp) != (size_t)ciphertext_len) { + free(plaintext); free(ciphertext); fclose(fp); return CERB_STORAGE_ERROR; + } + + free(plaintext); + free(ciphertext); + fclose(fp); + return CERB_OK; +} + +// Open vault from file +cerb_error_t cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault) { + if (!master_password || !vault_path || !vault) return CERB_INVALID_ARG; + FILE *fp = fopen(vault_path, "rb"); + if (!fp) return CERB_STORAGE_ERROR; + + const char expected_magic[8] = { 'C','E','R','B','E','R','U','S' }; + char magic[8]; + uint32_t version = 0; + unsigned char salt[SALT_LEN]; + unsigned char iv[IV_LEN]; + unsigned char tag[16]; + uint32_t clen = 0; + + if (fread(magic, 1, sizeof(magic), fp) != sizeof(magic) || + memcmp(magic, expected_magic, sizeof(magic)) != 0 || + fread(&version, 1, sizeof(version), fp) != sizeof(version) || + fread(salt, 1, SALT_LEN, fp) != SALT_LEN || + fread(iv, 1, IV_LEN, fp) != IV_LEN || + fread(tag, 1, sizeof(tag), fp) != sizeof(tag) || + fread(&clen, 1, sizeof(clen), fp) != sizeof(clen)) { + fclose(fp); + return CERB_STORAGE_ERROR; + } + + unsigned char *ciphertext = malloc(clen); + if (!ciphertext) { fclose(fp); return CERB_MEMORY_ERROR; } + if (fread(ciphertext, 1, clen, fp) != clen) { free(ciphertext); fclose(fp); return CERB_STORAGE_ERROR; } + fclose(fp); + + // Derive key + unsigned char key[KEY_LEN]; + if (!PKCS5_PBKDF2_HMAC(master_password, (int)strlen(master_password), + salt, SALT_LEN, PBKDF2_ITERATIONS, + EVP_sha256(), KEY_LEN, key)) { + free(ciphertext); + return CERB_CRYPTO_ERROR; + } + + // Decrypt + unsigned char *plaintext = malloc(clen); // ciphertext_len >= plaintext_len + if (!plaintext) { free(ciphertext); return CERB_MEMORY_ERROR; } + int len = 0, plain_len = 0; + cerb_error_t status = CERB_OK; + EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new(); + if (!ctx) { free(ciphertext); free(plaintext); return CERB_CRYPTO_ERROR; } + if (EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), NULL, NULL, NULL) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, IV_LEN, NULL) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptInit_ex(ctx, NULL, NULL, key, iv) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, (int)clen) != 1) status = CERB_CRYPTO_ERROR; + plain_len = len; + if (status == CERB_OK && EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, 16, tag) != 1) status = CERB_CRYPTO_ERROR; + if (status == CERB_OK && EVP_DecryptFinal_ex(ctx, plaintext + len, &len) != 1) status = CERB_CRYPTO_ERROR; + plain_len += len; + EVP_CIPHER_CTX_free(ctx); + if (status != CERB_OK) { free(ciphertext); free(plaintext); return CERB_CRYPTO_ERROR; } + + // Deserialize + if ((size_t)plain_len < sizeof(uint32_t)) { free(ciphertext); free(plaintext); return CERB_STORAGE_ERROR; } + uint32_t n = 0; memcpy(&n, plaintext, sizeof(uint32_t)); + size_t expected = sizeof(uint32_t) + (size_t)n * sizeof(cerb_entry_t); + if ((size_t)plain_len != expected) { free(ciphertext); free(plaintext); return CERB_STORAGE_ERROR; } + + cerb_vault_internal_t *v = calloc(1, sizeof(cerb_vault_internal_t)); + if (!v) { free(ciphertext); free(plaintext); return CERB_MEMORY_ERROR; } + memcpy(v->salt, salt, SALT_LEN); + memcpy(v->key, key, KEY_LEN); + v->key_initialized = true; + v->capacity = n > 0 ? n : 32; + v->entries = calloc(v->capacity, sizeof(cerb_entry_t)); + if (!v->entries) { free(v); free(ciphertext); free(plaintext); return CERB_MEMORY_ERROR; } + v->num_entries = n; + if (n > 0) { + memcpy(v->entries, plaintext + sizeof(uint32_t), (size_t)n * sizeof(cerb_entry_t)); + } + + *vault = (cerb_vault_t *)v; + free(ciphertext); + free(plaintext); + return CERB_OK; +} + +// Add entry to vault +cerb_error_t cerb_vault_add_entry(cerb_vault_t *vault, const cerb_entry_t *entry) { + if (!vault || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + // Check for duplicates + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry->id) == 0) { + return CERB_DUPLICATE; + } + } + + // Resize if needed + if (v->num_entries >= v->capacity) { + size_t new_capacity = v->capacity * 2; + cerb_entry_t *new_entries = realloc(v->entries, new_capacity * sizeof(cerb_entry_t)); + if (!new_entries) return CERB_MEMORY_ERROR; + v->entries = new_entries; + v->capacity = new_capacity; + } + + // Add entry + v->entries[v->num_entries++] = *entry; + return CERB_OK; +} + +// Update existing entry +cerb_error_t cerb_vault_update_entry(cerb_vault_t *vault, const cerb_entry_t *entry) { + if (!vault || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry->id) == 0) { + v->entries[i] = *entry; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Delete entry by ID +cerb_error_t cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id) { + if (!vault || !entry_id) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry_id) == 0) { + // Move last entry into this slot to keep array compact + if (i != v->num_entries - 1) { + v->entries[i] = v->entries[v->num_entries - 1]; + } + memset(&v->entries[v->num_entries - 1], 0, sizeof(cerb_entry_t)); + v->num_entries--; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Get entry by ID +cerb_error_t cerb_vault_get_entry(cerb_vault_t *vault, const char *entry_id, cerb_entry_t *entry) { + if (!vault || !entry_id || !entry) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + for (size_t i = 0; i < v->num_entries; i++) { + if (strcmp(v->entries[i].id, entry_id) == 0) { + *entry = v->entries[i]; + return CERB_OK; + } + } + + return CERB_NOT_FOUND; +} + +// Get all entries (returns a newly allocated array the caller must free) +cerb_error_t cerb_vault_get_entries(cerb_vault_t *vault, cerb_entry_t **entries, size_t *count) { + if (!vault || !entries || !count) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + if (v->num_entries == 0) { + *entries = NULL; + *count = 0; + return CERB_OK; + } + + cerb_entry_t *out = calloc(v->num_entries, sizeof(cerb_entry_t)); + if (!out) return CERB_MEMORY_ERROR; + + memcpy(out, v->entries, v->num_entries * sizeof(cerb_entry_t)); + *entries = out; + *count = v->num_entries; + return CERB_OK; +} + +// Basic substring search across website, username, and url +cerb_error_t cerb_vault_search(cerb_vault_t *vault, const char *query, cerb_entry_t **results, size_t *count) { + if (!vault || !results || !count) return CERB_INVALID_ARG; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + if (!query || *query == '\0') { + return cerb_vault_get_entries(vault, results, count); + } + + size_t matched = 0; + // First pass: count + for (size_t i = 0; i < v->num_entries; i++) { + if ((strstr(v->entries[i].website, query) != NULL) || + (strstr(v->entries[i].username, query) != NULL) || + (strstr(v->entries[i].url, query) != NULL)) { + matched++; + } + } + + if (matched == 0) { + *results = NULL; + *count = 0; + return CERB_OK; + } + + cerb_entry_t *out = calloc(matched, sizeof(cerb_entry_t)); + if (!out) return CERB_MEMORY_ERROR; + + size_t idx = 0; + for (size_t i = 0; i < v->num_entries; i++) { + if ((strstr(v->entries[i].website, query) != NULL) || + (strstr(v->entries[i].username, query) != NULL) || + (strstr(v->entries[i].url, query) != NULL)) { + out[idx++] = v->entries[i]; + } + } + + *results = out; + *count = matched; + return CERB_OK; +} + +// Basic variants for FFI (flattened struct) +static void entry_to_basic(const cerb_entry_t *in, cerb_entry_basic_t *out) { + memset(out, 0, sizeof(*out)); + strncpy(out->id, in->id, sizeof(out->id)-1); + strncpy(out->website, in->website, sizeof(out->website)-1); + strncpy(out->username, in->username, sizeof(out->username)-1); + strncpy(out->password, in->password, sizeof(out->password)-1); + strncpy(out->notes, in->notes, sizeof(out->notes)-1); + strncpy(out->url, in->url, sizeof(out->url)-1); + out->created_at = in->created_at; + out->updated_at = in->updated_at; +} + +static void basic_to_entry(const cerb_entry_basic_t *in, cerb_entry_t *out) { + memset(out, 0, sizeof(*out)); + strncpy(out->id, in->id, sizeof(out->id)-1); + strncpy(out->website, in->website, sizeof(out->website)-1); + strncpy(out->username, in->username, sizeof(out->username)-1); + strncpy(out->password, in->password, sizeof(out->password)-1); + strncpy(out->notes, in->notes, sizeof(out->notes)-1); + strncpy(out->url, in->url, sizeof(out->url)-1); + out->created_at = in->created_at; + out->updated_at = in->updated_at; +} + +cerb_error_t cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry) { + cerb_entry_t full; + basic_to_entry(entry, &full); + return cerb_vault_add_entry(vault, &full); +} + +cerb_error_t cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry) { + cerb_entry_t full; + basic_to_entry(entry, &full); + return cerb_vault_update_entry(vault, &full); +} + +cerb_error_t cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry) { + cerb_entry_t full; + cerb_error_t r = cerb_vault_get_entry(vault, entry_id, &full); + if (r != CERB_OK) return r; + entry_to_basic(&full, entry); + return CERB_OK; +} + +cerb_error_t cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count) { + cerb_entry_t *full = NULL; + cerb_error_t r = cerb_vault_get_entries(vault, &full, count); + if (r != CERB_OK) return r; + if (*count == 0) { *entries = NULL; return CERB_OK; } + cerb_entry_basic_t *out = calloc(*count, sizeof(cerb_entry_basic_t)); + if (!out) return CERB_MEMORY_ERROR; + for (size_t i = 0; i < *count; i++) entry_to_basic(&full[i], &out[i]); + *entries = out; + free(full); + return CERB_OK; +} + +cerb_error_t cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count) { + cerb_entry_t *full = NULL; + cerb_error_t r = cerb_vault_search(vault, query, &full, count); + if (r != CERB_OK) return r; + if (*count == 0) { *results = NULL; return CERB_OK; } + cerb_entry_basic_t *out = calloc(*count, sizeof(cerb_entry_basic_t)); + if (!out) return CERB_MEMORY_ERROR; + for (size_t i = 0; i < *count; i++) entry_to_basic(&full[i], &out[i]); + *results = out; + free(full); + return CERB_OK; +} + +// Generate password +cerb_error_t cerb_generate_password( + uint32_t length, + bool use_upper, + bool use_lower, + bool use_digits, + bool use_special, + char *buffer, + size_t buffer_size +) { + if (!buffer || length < 8 || length > MAX_PASSWORD_LEN || buffer_size < length + 1) { + return CERB_INVALID_ARG; + } + + const char *lower = "abcdefghijklmnopqrstuvwxyz"; + const char *upper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + const char *digits = "0123456789"; + const char *special = "!@#$%^&*()-_=+[]{}|;:,.<>?"; + + char charset[256] = {0}; + size_t pos = 0; + + if (use_lower) { strcpy(charset + pos, lower); pos += strlen(lower); } + if (use_upper) { strcpy(charset + pos, upper); pos += strlen(upper); } + if (use_digits) { strcpy(charset + pos, digits); pos += strlen(digits); } + if (use_special) { strcpy(charset + pos, special); pos += strlen(special); } + + if (pos == 0) return CERB_INVALID_ARG; + + // Generate random password + for (size_t i = 0; i < length; i++) { + unsigned char byte; + do { + if (RAND_bytes(&byte, 1) != 1) { + return CERB_CRYPTO_ERROR; + } + } while (byte >= (256 / pos) * pos); + + buffer[i] = charset[byte % pos]; + } + + buffer[length] = '\0'; + return CERB_OK; +} + +// Generate UUID v4 (xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx) +void cerb_generate_uuid(char *uuid) { + unsigned char bytes[16]; + if (RAND_bytes(bytes, sizeof(bytes)) != 1) { + // Fallback to zeroed UUID on failure + memset(uuid, '0', 36); + uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-'; + uuid[36] = '\0'; + return; + } + // Set version (4) + bytes[6] = (bytes[6] & 0x0F) | 0x40; + // Set variant (10xx) + bytes[8] = (bytes[8] & 0x3F) | 0x80; + + static const char *hex = "0123456789abcdef"; + int p = 0; + for (int i = 0; i < 16; i++) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + uuid[p++] = '-'; + } + uuid[p++] = hex[(bytes[i] >> 4) & 0x0F]; + uuid[p++] = hex[bytes[i] & 0x0F]; + } + uuid[p] = '\0'; +} + +// Get current timestamp +time_t cerb_current_timestamp(void) { + return time(NULL); +} + +// Cleanup vault +void cerb_vault_close(cerb_vault_t *vault) { + if (!vault) return; + + cerb_vault_internal_t *v = (cerb_vault_internal_t *)vault; + + // Securely wipe sensitive data + memset(v->key, 0, KEY_LEN); + memset(v->salt, 0, SALT_LEN); + + // Wipe entries + for (size_t i = 0; i < v->num_entries; i++) { + memset(&v->entries[i], 0, sizeof(cerb_entry_t)); + } + + free(v->entries); + free(v); +} diff --git a/src/cerberus/core/cerberus.h b/src/cerberus/core/cerberus.h new file mode 100644 index 0000000..06c736a --- /dev/null +++ b/src/cerberus/core/cerberus.h @@ -0,0 +1,144 @@ +#ifndef CERBERUS_CORE_H +#define CERBERUS_CORE_H + +#include <stdint.h> +#include <stdbool.h> +#include <time.h> + +// Maximum lengths for fields +#define MAX_WEBSITE_LEN 256 +#define MAX_USERNAME_LEN 256 +#define MAX_PASSWORD_LEN 1024 +#define MAX_NOTES_LEN 4096 +#define MAX_TAGS 32 +#define MAX_TAG_LEN 64 +#define MAX_CUSTOM_FIELDS 32 +#define MAX_CUSTOM_KEY_LEN 64 +#define MAX_CUSTOM_VALUE_LEN 1024 +#define MAX_ENTRIES 65536 +#define SALT_LEN 32 +#define KEY_LEN 32 // 256 bits for AES-256 +#define IV_LEN 16 // 128 bits for AES block size +#define PBKDF2_ITERATIONS 100000 + +// Error codes +typedef enum { + CERB_OK = 0, + CERB_ERROR = -1, + CERB_INVALID_ARG = -2, + CERB_NOT_FOUND = -3, + CERB_DUPLICATE = -4, + CERB_STORAGE_ERROR = -5, + CERB_CRYPTO_ERROR = -6, + CERB_MEMORY_ERROR = -7, + CERB_INVALID_STATE = -8, + CERB_NOT_IMPLEMENTED = -9 +} cerb_error_t; + +// Custom field structure +typedef struct { + char key[MAX_CUSTOM_KEY_LEN]; + char value[MAX_CUSTOM_VALUE_LEN]; +} cerb_custom_field_t; + +// Password entry structure +typedef struct { + char id[37]; // UUID string (36 chars + null terminator) + char website[MAX_WEBSITE_LEN]; + char username[MAX_USERNAME_LEN]; + char password[MAX_PASSWORD_LEN]; + char notes[MAX_NOTES_LEN]; + char url[1024]; + char tags[MAX_TAGS][MAX_TAG_LEN]; + size_t num_tags; + time_t created_at; + time_t updated_at; + time_t last_used; + bool favorite; + cerb_custom_field_t custom_fields[MAX_CUSTOM_FIELDS]; + size_t num_custom_fields; +} cerb_entry_t; + +// Vault structure +typedef struct cerb_vault_t cerb_vault_t; + +// Basic entry struct for FFI bindings (avoids nested arrays) +typedef struct { + char id[37]; + char website[MAX_WEBSITE_LEN]; + char username[MAX_USERNAME_LEN]; + char password[MAX_PASSWORD_LEN]; + char notes[MAX_NOTES_LEN]; + char url[1024]; + time_t created_at; + time_t updated_at; +} cerb_entry_basic_t; + +// Core API + +// Initialize the crypto subsystem +cerb_error_t cerb_crypto_init(void); + +// Cleanup the crypto subsystem +void cerb_crypto_cleanup(void); + +// Initialize a new vault +cerb_error_t cerb_vault_create(const char *master_password, cerb_vault_t **vault); + +// Open an existing vault +cerb_error_t cerb_vault_open(const char *master_password, const char *vault_path, cerb_vault_t **vault); + +// Save vault to file +cerb_error_t cerb_vault_save(cerb_vault_t *vault, const char *vault_path); + +// Close and free a vault +void cerb_vault_close(cerb_vault_t *vault); + +// Add a new entry to the vault +cerb_error_t cerb_vault_add_entry(cerb_vault_t *vault, const cerb_entry_t *entry); +cerb_error_t cerb_vault_add_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry); + +// Update an existing entry +cerb_error_t cerb_vault_update_entry(cerb_vault_t *vault, const cerb_entry_t *entry); +cerb_error_t cerb_vault_update_entry_basic(cerb_vault_t *vault, const cerb_entry_basic_t *entry); + +// Delete an entry by ID +cerb_error_t cerb_vault_delete_entry(cerb_vault_t *vault, const char *entry_id); + +// Get an entry by ID +cerb_error_t cerb_vault_get_entry(cerb_vault_t *vault, const char *entry_id, cerb_entry_t *entry); +cerb_error_t cerb_vault_get_entry_basic(cerb_vault_t *vault, const char *entry_id, cerb_entry_basic_t *entry); + +// Get all entries +cerb_error_t cerb_vault_get_entries(cerb_vault_t *vault, cerb_entry_t **entries, size_t *count); +cerb_error_t cerb_vault_get_entries_basic(cerb_vault_t *vault, cerb_entry_basic_t **entries, size_t *count); + +// Search entries by query string +cerb_error_t cerb_vault_search(cerb_vault_t *vault, const char *query, cerb_entry_t **results, size_t *count); +cerb_error_t cerb_vault_search_basic(cerb_vault_t *vault, const char *query, cerb_entry_basic_t **results, size_t *count); + +// Generate a secure random password +cerb_error_t cerb_generate_password( + uint32_t length, + bool use_upper, + bool use_lower, + bool use_digits, + bool use_special, + char *buffer, + size_t buffer_size +); + +// Import from other password managers +cerb_error_t cerb_import_bitwarden_json(cerb_vault_t *vault, const char *json_path); +cerb_error_t cerb_import_lastpass_csv(cerb_vault_t *vault, const char *csv_path); +cerb_error_t cerb_import_chrome_csv(cerb_vault_t *vault, const char *csv_path); + +// Export to various formats +cerb_error_t cerb_export_json(cerb_vault_t *vault, const char *json_path); +cerb_error_t cerb_export_csv(cerb_vault_t *vault, const char *csv_path); + +// Utility functions +void cerb_generate_uuid(char *uuid); +time_t cerb_current_timestamp(void); + +#endif // CERBERUS_CORE_H diff --git a/src/cerberus/core/cerberus.so b/src/cerberus/core/cerberus.so Binary files differnew file mode 100755 index 0000000..2cdd65a --- /dev/null +++ b/src/cerberus/core/cerberus.so diff --git a/src/cerberus/core/models.py b/src/cerberus/core/models.py new file mode 100644 index 0000000..ecdcb3c --- /dev/null +++ b/src/cerberus/core/models.py @@ -0,0 +1,58 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Optional, List, Dict, Any +import json + +@dataclass +class PasswordEntry: + """Represents a single password entry in the password manager.""" + id: str + website: str + username: str + password: str + url: str = "" + notes: str = "" + tags: List[str] = field(default_factory=list) + created_at: datetime = field(default_factory=datetime.utcnow) + updated_at: datetime = field(default_factory=datetime.utcnow) + last_used: Optional[datetime] = None + password_strength: Optional[float] = None + compromised: bool = False + custom_fields: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert the password entry to a dictionary for serialization.""" + return { + 'id': self.id, + 'website': self.website, + 'username': self.username, + 'password': self.password, + 'url': self.url, + 'notes': self.notes, + 'tags': self.tags, + 'created_at': self.created_at.isoformat(), + 'updated_at': self.updated_at.isoformat(), + 'last_used': self.last_used.isoformat() if self.last_used else None, + 'password_strength': self.password_strength, + 'compromised': self.compromised, + 'custom_fields': self.custom_fields + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'PasswordEntry': + """Create a PasswordEntry from a dictionary.""" + return cls( + id=data.get('id', ''), + website=data['website'], + username=data['username'], + password=data['password'], + url=data.get('url', ''), + notes=data.get('notes', ''), + tags=data.get('tags', []), + created_at=datetime.fromisoformat(data['created_at']), + updated_at=datetime.fromisoformat(data['updated_at']), + last_used=datetime.fromisoformat(data['last_used']) if data.get('last_used') else None, + password_strength=data.get('password_strength'), + compromised=data.get('compromised', False), + custom_fields=data.get('custom_fields', {}) + ) diff --git a/src/cerberus/core/password_manager.py b/src/cerberus/core/password_manager.py new file mode 100644 index 0000000..018c727 --- /dev/null +++ b/src/cerberus/core/password_manager.py @@ -0,0 +1,462 @@ +import os +import json +import logging +from pathlib import Path +from typing import Dict, List, Optional, Union, Any +from datetime import datetime + +logger = logging.getLogger(__name__) + +# Import the C core from our package +try: + from . import ( + cerb_crypto_init, cerb_crypto_cleanup, + cerb_vault_create, cerb_vault_open, cerb_vault_save, cerb_vault_close, + cerb_vault_add_entry_basic, cerb_vault_update_entry_basic, cerb_vault_delete_entry, + cerb_vault_get_entry_basic, cerb_vault_get_entries_basic, cerb_vault_search_basic, + cerb_generate_password, cerb_generate_uuid, cerb_current_timestamp, + CERB_OK, CERB_ERROR, ffi + ) + CORE_AVAILABLE = True +except (ImportError, OSError) as e: + CORE_AVAILABLE = False + logger.warning("Cerberus C core not available. Using Python fallback.") + +from .models import PasswordEntry +from ..integrations import get_integration, IntegrationError + +class VaultError(Exception): + """Base exception for vault-related errors.""" + pass + +class CoreNotAvailableError(VaultError): + """Raised when the C core is not available.""" + pass + +class PasswordManager: + """Core password manager with C-based encryption and integration support.""" + + def __init__(self, data_dir: str = None, master_password: str = None): + """Initialize the password manager. + + Args: + data_dir: Directory to store password data + master_password: Master password for encryption + """ + if not CORE_AVAILABLE: + raise CoreNotAvailableError( + "Cerberus C core not available. Please compile it first." + ) + + self.data_dir = Path(data_dir or os.path.expanduser("~/.cerberus_pm")) + self.data_dir.mkdir(parents=True, exist_ok=True) + + self.master_password = master_password + self.vault_file = self.data_dir / "vault.cerb" + self._vault = ffi.NULL + + # Initialize crypto + if cerb_crypto_init() != CERB_OK: + raise VaultError("Failed to initialize crypto") + + # Create or load vault if master password is provided + if master_password: + if self.vault_file.exists(): + self._open_vault() + else: + self._create_vault() + + def _create_vault(self): + """Create a new vault.""" + if not self.master_password: + raise VaultError("Master password is required to create a vault") + + vault_ptr = ffi.new("cerb_vault_t**") + result = cerb_vault_create( + self.master_password.encode('utf-8'), + vault_ptr + ) + + if result != CERB_OK: + raise VaultError(f"Failed to create vault: error code {result}") + + self._vault = vault_ptr[0] + logger.info("Created new vault") + + def _open_vault(self): + """Open an existing vault.""" + if not self.vault_file.exists(): + raise VaultError(f"Vault file not found: {self.vault_file}") + + if not self.master_password: + raise VaultError("Master password is required to open the vault") + + # Open existing vault from file using C core + vault_ptr = ffi.new("cerb_vault_t**") + result = cerb_vault_open( + self.master_password.encode('utf-8'), + str(self.vault_file).encode('utf-8'), + vault_ptr + ) + if result != CERB_OK: + raise VaultError(f"Failed to open vault: error code {result}") + self._vault = vault_ptr[0] + + def save_vault(self): + """Save the vault to disk.""" + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + result = cerb_vault_save(self._vault, str(self.vault_file).encode('utf-8')) + if result != CERB_OK: + raise VaultError(f"Failed to save vault: error code {result}") + + def initialize(self, master_password: str) -> bool: + """Initialize the password manager with a master password. + + Args: + master_password: The master password + + Returns: + bool: True if initialization was successful + """ + self.master_password = master_password + self._create_vault() + return True + + def unlock(self, master_password: str) -> bool: + """Unlock the password manager with the master password. + + Args: + master_password: The master password + + Returns: + bool: True if unlock was successful + """ + try: + self.master_password = master_password + self._open_vault() + return True + except VaultError as e: + logger.error(f"Failed to unlock password manager: {e}") + return False + + def _find_entries(self, query: str = None) -> List[PasswordEntry]: + """Find entries matching the query. + + Args: + query: Search query (e.g., 'website:example.com', 'tag:work') + + Returns: + List of matching PasswordEntry objects + """ + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + entries_ptr = ffi.new("cerb_entry_basic_t**") + count_ptr = ffi.new("size_t*") + if query: + result = cerb_vault_search_basic(self._vault, query.encode('utf-8'), entries_ptr, count_ptr) + else: + result = cerb_vault_get_entries_basic(self._vault, entries_ptr, count_ptr) + if result != CERB_OK: + raise VaultError(f"Search failed: error code {result}") + + results: List[PasswordEntry] = [] + count = int(count_ptr[0]) + if count == 0: + return results + + entries_array = entries_ptr[0] + for i in range(count): + c_entry = entries_array[i] + results.append(PasswordEntry( + id=ffi.string(c_entry.id).decode('utf-8'), + website=ffi.string(c_entry.website).decode('utf-8'), + username=ffi.string(c_entry.username).decode('utf-8'), + password=ffi.string(c_entry.password).decode('utf-8'), + url=ffi.string(c_entry.url).decode('utf-8'), + notes=ffi.string(c_entry.notes).decode('utf-8'), + )) + + # Caller-owned memory: free via C if a free function exists; else rely on C API contract + return results + + def add_password(self, entry: PasswordEntry) -> str: + """Add a new password entry to the vault. + + Args: + entry: The password entry to add + + Returns: + str: The ID of the new entry + + Raises: + VaultError: If the vault is not open or an error occurs + """ + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + # Create a new C entry + c_entry = ffi.new("cerb_entry_basic_t*") + + # Generate a new UUID if not provided + if entry.id: + entry_id = entry.id + else: + uuid_buf = ffi.new("char[37]") + cerb_generate_uuid(uuid_buf) + entry_id = ffi.string(uuid_buf).decode('utf-8') + + # Set fields + ffi.memmove(c_entry.id, entry_id.encode('utf-8'), len(entry_id)) + c_entry.id[len(entry_id)] = b'\0' + for field_name in ["website", "username", "password", "notes", "url"]: + val = (getattr(entry, field_name) or '').encode('utf-8') + buf = getattr(c_entry, field_name) + ffi.memmove(buf, val, len(val)) + buf[len(val)] = b'\0' + c_entry.created_at = int(entry.created_at.timestamp()) if entry.created_at else cerb_current_timestamp() + c_entry.updated_at = int(entry.updated_at.timestamp()) if entry.updated_at else c_entry.created_at + + # Add to vault + result = cerb_vault_add_entry_basic(self._vault, c_entry) + if result != CERB_OK: + raise VaultError(f"Failed to add entry: error code {result}") + + # Save the vault + self.save_vault() + + return entry_id + + def update_password(self, entry_id: str, **updates) -> bool: + """Update an existing password entry. + + Args: + entry_id: The ID of the entry to update + **updates: Fields to update + + Returns: + bool: True if the update was successful + + Raises: + VaultError: If the vault is not open or an error occurs + ValueError: If the entry is not found + """ + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + # Fetch, modify, send to C update + c_existing = ffi.new("cerb_entry_basic_t*") + res = cerb_vault_get_entry_basic(self._vault, entry_id.encode('utf-8'), c_existing) + if res != CERB_OK: + raise ValueError(f"Entry with ID {entry_id} not found") + + # Apply updates + def _set_field(dst, value: str, max_len: int): + data = (value or '').encode('utf-8') + ln = min(len(data), max_len - 1) + ffi.memmove(dst, data, ln) + dst[ln] = b'\0' + + for key, value in updates.items(): + if key == 'website': _set_field(c_existing.website, value, 256) + elif key == 'username': _set_field(c_existing.username, value, 256) + elif key == 'password': _set_field(c_existing.password, value, 1024) + elif key == 'notes': _set_field(c_existing.notes, value, 4096) + elif key == 'url': _set_field(c_existing.url, value, 1024) + + c_existing.updated_at = int(datetime.utcnow().timestamp()) + + res = cerb_vault_update_entry_basic(self._vault, c_existing) + if res != CERB_OK: + raise VaultError(f"Failed to update entry: error code {res}") + self.save_vault() + return True + + def delete_password(self, entry_id: str) -> bool: + """Delete a password entry. + + Args: + entry_id: The ID of the entry to delete + + Returns: + bool: True if the deletion was successful + + Raises: + VaultError: If the vault is not open or an error occurs + """ + if self._vault == ffi.NULL: + raise VaultError("No vault is open") + + result = cerb_vault_delete_entry(self._vault, entry_id.encode('utf-8')) + if result != CERB_OK: + raise VaultError(f"Failed to delete entry: error code {result}") + + # Save the vault + self.save_vault() + return True + + def get_password(self, entry_id: str) -> Optional[PasswordEntry]: + """Get a password entry by ID. + + Args: + entry_id: The ID of the entry to retrieve + + Returns: + PasswordEntry if found, None otherwise + """ + c_entry = ffi.new("cerb_entry_basic_t*") + res = cerb_vault_get_entry_basic(self._vault, entry_id.encode('utf-8'), c_entry) + if res != CERB_OK: + return None + return PasswordEntry( + id=ffi.string(c_entry.id).decode('utf-8'), + website=ffi.string(c_entry.website).decode('utf-8'), + username=ffi.string(c_entry.username).decode('utf-8'), + password=ffi.string(c_entry.password).decode('utf-8'), + url=ffi.string(c_entry.url).decode('utf-8'), + notes=ffi.string(c_entry.notes).decode('utf-8'), + ) + + def list_passwords(self) -> List[PasswordEntry]: + """List all password entries. + + Returns: + List of all PasswordEntry objects + """ + return self._find_entries() + + def search_passwords(self, query: str) -> List[PasswordEntry]: + """Search password entries by website, username, or tags. + + Args: + query: Search query + + Returns: + List of matching PasswordEntry objects + """ + return self._find_entries(query) + + @staticmethod + def generate_password( + length: int = 16, + use_upper: bool = True, + use_lower: bool = True, + use_digits: bool = True, + use_special: bool = True + ) -> str: + """Generate a secure random password using the C core. + + Args: + length: Length of the password (8-1024) + use_upper: Include uppercase letters + use_lower: Include lowercase letters + use_digits: Include digits + use_special: Include special characters + + Returns: + The generated password + + Raises: + VaultError: If password generation fails + """ + if not CORE_AVAILABLE: + raise CoreNotAvailableError("C core not available for password generation") + + # Validate length + if length < 8 or length > 1024: + raise ValueError("Password length must be between 8 and 1024 characters") + + # At least one character set must be selected + if not (use_upper or use_lower or use_digits or use_special): + raise ValueError("At least one character set must be selected") + + # Allocate buffer for the password (+1 for null terminator) + buffer = ffi.new(f"char[{length + 1}]") # +1 for null terminator + + # Generate the password + result = cerb_generate_password( + length, + use_upper, + use_lower, + use_digits, + use_special, + buffer, + length + 1 # Include space for null terminator + ) + + if result != CERB_OK: + raise VaultError(f"Failed to generate password: error code {result}") + + # Convert from C string to Python string + password = ffi.string(buffer).decode('utf-8') + return password + + # ---- Convenience methods for CLI/TUI compatibility ---- + def generate_id(self) -> str: + """Generate a UUID string using the C core.""" + if not CORE_AVAILABLE: + raise CoreNotAvailableError("C core not available for ID generation") + uuid_buf = ffi.new("char[37]") + cerb_generate_uuid(uuid_buf) + return ffi.string(uuid_buf).decode("utf-8") + + def get_entries(self) -> List[PasswordEntry]: + """Alias for list_passwords().""" + return self.list_passwords() + + def add_entry(self, entry: PasswordEntry) -> str: + """Alias for add_password(entry).""" + return self.add_password(entry) + + def update_entry(self, entry: PasswordEntry) -> bool: + """Update an existing entry using the entry object.""" + fields = { + "website": entry.website, + "username": entry.username, + "password": entry.password, + "notes": entry.notes, + "url": entry.url, + } + return self.update_password(entry.id, **fields) + + def delete_entry(self, entry_id: str) -> bool: + """Alias for delete_password(entry_id).""" + return self.delete_password(entry_id) + + def get_entry(self, identifier: str) -> PasswordEntry: + """Get an entry by ID, or by exact website match as a fallback.""" + entry = self.get_password(identifier) + if entry is not None: + return entry + # Fallback: search by exact website name + matches = [e for e in self.list_passwords() if (e.website or "").lower() == (identifier or "").lower()] + if not matches: + raise ValueError(f"Entry not found: {identifier}") + if len(matches) > 1: + raise ValueError(f"Multiple entries found for website '{identifier}'. Use the entry ID instead.") + return matches[0] + + def generate_password_easy( + self, + length: int = 16, + special: bool = True, + upper: Optional[bool] = None, + lower: Optional[bool] = None, + digits: Optional[bool] = None, + ) -> str: + """Friendly wrapper for password generation used by new CLI/TUI. + + Args: + length: desired length + special: include special characters (maps to use_special) + upper, lower, digits: if None use defaults (True). If bool provided, override. + """ + return PasswordManager.generate_password( + length=length, + use_upper=True if upper is None else upper, + use_lower=True if lower is None else lower, + use_digits=True if digits is None else digits, + use_special=special, + ) diff --git a/src/cerberus/gui/__init__.py b/src/cerberus/gui/__init__.py new file mode 100644 index 0000000..5cc77e9 --- /dev/null +++ b/src/cerberus/gui/__init__.py @@ -0,0 +1,6 @@ +""" +Graphical User Interface (GUI) for Cerberus Password Manager. +""" +from .main_window import run_app + +__all__ = ["run_app"] diff --git a/src/cerberus/gui/main_window.py b/src/cerberus/gui/main_window.py new file mode 100644 index 0000000..a7cac07 --- /dev/null +++ b/src/cerberus/gui/main_window.py @@ -0,0 +1,269 @@ +""" +PyQt6-based GUI for Cerberus Password Manager. +""" +from __future__ import annotations + +import sys +from dataclasses import asdict +from typing import Optional, List + +from PyQt6.QtCore import Qt +from PyQt6.QtWidgets import ( + QApplication, + QMainWindow, + QWidget, + QVBoxLayout, + QHBoxLayout, + QListWidget, + QListWidgetItem, + QPushButton, + QLabel, + QLineEdit, + QTextEdit, + QMessageBox, + QInputDialog, + QFileDialog, +) + +from ..core.password_manager import PasswordManager, VaultError +from ..core.models import PasswordEntry + + +class MainWindow(QMainWindow): + def __init__(self, pm: PasswordManager): + super().__init__() + self.pm = pm + self.setWindowTitle("Cerberus Password Manager") + self.resize(900, 600) + + # Root container + root = QWidget() + layout = QHBoxLayout() + root.setLayout(layout) + self.setCentralWidget(root) + + # Left: list + left = QWidget() + left_layout = QVBoxLayout() + left.setLayout(left_layout) + layout.addWidget(left, 1) + + self.search_input = QLineEdit() + self.search_input.setPlaceholderText("Search website, username, tags...") + self.search_input.textChanged.connect(self.refresh_list) + left_layout.addWidget(self.search_input) + + self.list_widget = QListWidget() + self.list_widget.itemSelectionChanged.connect(self.on_selection_changed) + left_layout.addWidget(self.list_widget, 1) + + btn_bar = QHBoxLayout() + self.btn_add = QPushButton("Add") + self.btn_edit = QPushButton("Edit") + self.btn_rotate = QPushButton("Rotate") + self.btn_delete = QPushButton("Delete") + self.btn_export = QPushButton("Export") + self.btn_import = QPushButton("Import") + btn_bar.addWidget(self.btn_add) + btn_bar.addWidget(self.btn_edit) + btn_bar.addWidget(self.btn_rotate) + btn_bar.addWidget(self.btn_delete) + btn_bar.addWidget(self.btn_export) + btn_bar.addWidget(self.btn_import) + left_layout.addLayout(btn_bar) + + # Right: detail + right = QWidget() + right_layout = QVBoxLayout() + right.setLayout(right_layout) + layout.addWidget(right, 2) + + self.lbl_website = QLineEdit() + self.lbl_username = QLineEdit() + self.lbl_password = QLineEdit() + self.lbl_password.setEchoMode(QLineEdit.EchoMode.Password) + self.lbl_url = QLineEdit() + self.txt_notes = QTextEdit() + + right_layout.addWidget(QLabel("Website")) + right_layout.addWidget(self.lbl_website) + right_layout.addWidget(QLabel("Username")) + right_layout.addWidget(self.lbl_username) + right_layout.addWidget(QLabel("Password")) + right_layout.addWidget(self.lbl_password) + right_layout.addWidget(QLabel("URL")) + right_layout.addWidget(self.lbl_url) + right_layout.addWidget(QLabel("Notes")) + right_layout.addWidget(self.txt_notes, 1) + + act_bar = QHBoxLayout() + self.btn_save = QPushButton("Save") + self.btn_copy_user = QPushButton("Copy User") + self.btn_copy_pass = QPushButton("Copy Pass") + act_bar.addWidget(self.btn_save) + act_bar.addWidget(self.btn_copy_user) + act_bar.addWidget(self.btn_copy_pass) + right_layout.addLayout(act_bar) + + # Wire actions + self.btn_add.clicked.connect(self.on_add) + self.btn_edit.clicked.connect(self.on_edit) + self.btn_rotate.clicked.connect(self.on_rotate) + self.btn_delete.clicked.connect(self.on_delete) + self.btn_export.clicked.connect(self.on_export) + self.btn_import.clicked.connect(self.on_import) + self.btn_save.clicked.connect(self.on_save) + self.btn_copy_user.clicked.connect(self.on_copy_user) + self.btn_copy_pass.clicked.connect(self.on_copy_pass) + + # Data + self.entries: List[PasswordEntry] = [] + self.current: Optional[PasswordEntry] = None + self.refresh_list() + + def refresh_list(self) -> None: + query = self.search_input.text().lower().strip() + try: + self.entries = self.pm.get_entries() + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to load entries: {e}") + self.entries = [] + self.list_widget.clear() + for e in self.entries: + if query and not ( + query in (e.website or '').lower() + or query in (e.username or '').lower() + or query in (e.notes or '').lower() + or any(query in t.lower() for t in (e.tags or [])) + ): + continue + item = QListWidgetItem(f"{e.website} — {e.username}") + item.setData(Qt.ItemDataRole.UserRole, e.id) + self.list_widget.addItem(item) + + def on_selection_changed(self) -> None: + item = self.list_widget.currentItem() + if not item: + self.current = None + return + entry_id = item.data(Qt.ItemDataRole.UserRole) + try: + # get_entry may accept id or website; ensure id fetch + entry = self.pm.get_entry(entry_id) + except Exception: + # fallback: find in self.entries + entry = next((x for x in self.entries if x.id == entry_id), None) + self.current = entry + if entry: + self.lbl_website.setText(entry.website) + self.lbl_username.setText(entry.username) + self.lbl_password.setText(entry.password) + self.lbl_url.setText(entry.url) + self.txt_notes.setText(entry.notes) + + def on_add(self) -> None: + website, ok = QInputDialog.getText(self, "Add Entry", "Website:") + if not ok or not website: + return + username, ok = QInputDialog.getText(self, "Add Entry", "Username:") + if not ok or not username: + return + # Generate or prompt for password + choice = QMessageBox.question(self, "Password", "Generate a strong password?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No) + if choice == QMessageBox.StandardButton.Yes: + password = self.pm.generate_password() + else: + password, ok = QInputDialog.getText(self, "Add Entry", "Password:") + if not ok or not password: + return + try: + entry = PasswordEntry( + id=self.pm.generate_id(), + website=website, + username=username, + password=password, + ) + self.pm.add_entry(entry) + self.refresh_list() + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to add entry: {e}") + + def on_edit(self) -> None: + if not self.current: + return + self.lbl_website.setFocus() + + def on_save(self) -> None: + if not self.current: + return + try: + self.current.website = self.lbl_website.text() + self.current.username = self.lbl_username.text() + self.current.password = self.lbl_password.text() + self.current.url = self.lbl_url.text() + self.current.notes = self.txt_notes.toPlainText() + self.pm.update_entry(self.current) + self.refresh_list() + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to save entry: {e}") + + def on_rotate(self) -> None: + if not self.current: + return + try: + new_password = self.pm.generate_password(length=24) + self.current.password = new_password + self.pm.update_entry(self.current) + self.lbl_password.setText(new_password) + QMessageBox.information(self, "Rotated", "Password rotated and saved.") + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to rotate: {e}") + + def on_delete(self) -> None: + if not self.current: + return + if QMessageBox.question(self, "Delete", f"Delete entry for {self.current.website}?") == QMessageBox.StandardButton.Yes: + try: + self.pm.delete_entry(self.current.id) + self.current = None + self.refresh_list() + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to delete: {e}") + + def on_copy_user(self) -> None: + if not self.current: + return + try: + import pyperclip + pyperclip.copy(self.current.username) + QMessageBox.information(self, "Copied", "Username copied to clipboard.") + except Exception as e: + QMessageBox.critical(self, "Error", f"Clipboard failed: {e}") + + def on_copy_pass(self) -> None: + if not self.current: + return + try: + import pyperclip + pyperclip.copy(self.current.password) + QMessageBox.information(self, "Copied", "Password copied to clipboard.") + except Exception as e: + QMessageBox.critical(self, "Error", f"Clipboard failed: {e}") + + +def run_app() -> None: + # Prompt for master password + app = QApplication(sys.argv) + from PyQt6.QtWidgets import QInputDialog + + pw, ok = QInputDialog.getText(None, "Cerberus", "Master password:") + if not ok or not pw: + return + try: + pm = PasswordManager(master_password=pw) + except VaultError as e: + QMessageBox.critical(None, "Error", f"Failed to unlock vault: {e}") + return + win = MainWindow(pm) + win.show() + sys.exit(app.exec()) diff --git a/src/cerberus/integrations/__init__.py b/src/cerberus/integrations/__init__.py new file mode 100644 index 0000000..2d81122 --- /dev/null +++ b/src/cerberus/integrations/__init__.py @@ -0,0 +1,111 @@ +"""Password manager integrations for Cerberus.""" + +from typing import Dict, Type, List, Optional, Any +from pathlib import Path +import importlib +import json + +from ..core.models import PasswordEntry + +class IntegrationError(Exception): + """Base exception for integration errors.""" + pass + +class BaseIntegration: + """Base class for password manager integrations.""" + + def __init__(self, **kwargs): + """Initialize the integration with any required parameters.""" + self.connected = False + + def connect(self, **kwargs) -> bool: + """Connect to the password manager. + + Returns: + bool: True if connection was successful + """ + raise NotImplementedError + + def disconnect(self): + """Disconnect from the password manager.""" + self.connected = False + + def list_entries(self) -> List[PasswordEntry]: + """List all password entries. + + Returns: + List of PasswordEntry objects + """ + raise NotImplementedError + + def export_entries(self, output_path: Path) -> bool: + """Export entries to a file. + + Args: + output_path: Path to save the exported data + + Returns: + bool: True if export was successful + """ + raise NotImplementedError + + def import_entries(self, input_path: Path) -> List[PasswordEntry]: + """Import entries from a file. + + Args: + input_path: Path to the file to import from + + Returns: + List of imported PasswordEntry objects + """ + raise NotImplementedError + +# Dictionary of available integrations +INTEGRATIONS: Dict[str, Type[BaseIntegration]] = {} + +def register_integration(name: str): + """Decorator to register an integration class.""" + def decorator(cls: Type[BaseIntegration]) -> Type[BaseIntegration]: + INTEGRATIONS[name.lower()] = cls + return cls + return decorator + +def get_integration(name: str, **kwargs) -> BaseIntegration: + """Get an instance of the specified integration. + + Args: + name: Name of the integration + **kwargs: Additional arguments to pass to the integration + + Returns: + An instance of the specified integration + + Raises: + IntegrationError: If the integration is not found + """ + name = name.lower() + if name not in INTEGRATIONS: + raise IntegrationError(f"Integration '{name}' not found") + + return INTEGRATIONS[name](**kwargs) + +def list_available_integrations() -> List[str]: + """List all available integrations. + + Returns: + List of integration names + """ + return list(INTEGRATIONS.keys()) + +# Import all integration modules to register them +# This will be populated by the individual integration modules +# that use the @register_integration decorator + +try: + from . import bitwarden # noqa + from . import lastpass # noqa + from . import keepass # noqa + from . import chrome # noqa +except ImportError as e: + # Some integrations may have additional dependencies + pass diff --git a/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc b/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..fbceef5 --- /dev/null +++ b/src/cerberus/integrations/__pycache__/__init__.cpython-313.pyc diff --git a/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc b/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..cb25ecf --- /dev/null +++ b/src/cerberus/integrations/__pycache__/bitwarden.cpython-313.pyc diff --git a/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc b/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..02a7447 --- /dev/null +++ b/src/cerberus/integrations/__pycache__/chrome.cpython-313.pyc diff --git a/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc b/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..ca99f53 --- /dev/null +++ b/src/cerberus/integrations/__pycache__/keepass.cpython-313.pyc diff --git a/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc b/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..fac83f5 --- /dev/null +++ b/src/cerberus/integrations/__pycache__/lastpass.cpython-313.pyc diff --git a/src/cerberus/integrations/bitwarden.py b/src/cerberus/integrations/bitwarden.py new file mode 100644 index 0000000..7a4cd73 --- /dev/null +++ b/src/cerberus/integrations/bitwarden.py @@ -0,0 +1,268 @@ +import subprocess +import json +import logging +from typing import List, Dict, Optional, Any +from pathlib import Path +import os + +from ..core.models import PasswordEntry + +logger = logging.getLogger(__name__) + +class BitwardenCLIError(Exception): + """Exception raised for errors in the Bitwarden CLI.""" + pass + +class BitwardenIntegration: + """Integration with Bitwarden password manager.""" + + def __init__(self, email: str = None, password: str = None, session: str = None): + """Initialize the Bitwarden integration. + + Args: + email: Bitwarden account email + password: Bitwarden master password + session: Existing Bitwarden session key + """ + self.email = email + self.password = password + self.session = session + self.bw_path = self._find_bw() + + def _run_command(self, command: List[str], input_data: str = None) -> Dict: + """Run a Bitwarden CLI command and return the result.""" + try: + env = os.environ.copy() + if self.session: + env['BW_SESSION'] = self.session + + result = subprocess.run( + [self.bw_path] + command, + input=input_data.encode() if input_data else None, + capture_output=True, + check=True, + env=env + ) + + if result.stdout: + try: + return json.loads(result.stdout) + except json.JSONDecodeError: + return result.stdout.decode().strip() + return {} + + except subprocess.CalledProcessError as e: + error_msg = e.stderr.decode().strip() if e.stderr else str(e) + logger.error(f"Bitwarden CLI error: {error_msg}") + raise BitwardenCLIError(f"Bitwarden command failed: {error_msg}") + + @staticmethod + def _find_bw() -> str: + """Find the Bitwarden CLI executable.""" + # Check common locations + possible_paths = [ + '/usr/local/bin/bw', + '/usr/bin/bw', + 'bw' # Try PATH + ] + + for path in possible_paths: + try: + result = subprocess.run( + [path, '--version'], + capture_output=True, + text=True + ) + if result.returncode == 0: + logger.info(f"Found Bitwarden CLI at {path}") + return path + except (FileNotFoundError, subprocess.CalledProcessError): + continue + + raise BitwardenCLIError( + "Bitwarden CLI not found. Please install it from " + "https://bitwarden.com/help/cli/" + ) + + def login(self) -> bool: + """Log in to Bitwarden and get a session key.""" + if not self.email or not self.password: + raise BitwardenCLIError("Email and password are required for login") + + try: + # Log in and get the session key + result = subprocess.run( + [self.bw_path, 'login', self.email, self.password, '--raw'], + capture_output=True, + check=True, + text=True + ) + + self.session = result.stdout.strip() + return True + + except subprocess.CalledProcessError as e: + error_msg = e.stderr.decode().strip() if e.stderr else str(e) + logger.error(f"Bitwarden login failed: {error_msg}") + return False + + def logout(self) -> bool: + """Log out of Bitwarden.""" + try: + self._run_command(['logout']) + self.session = None + return True + except BitwardenCLIError: + return False + + def sync(self) -> bool: + """Sync with Bitwarden server.""" + try: + self._run_command(['sync']) + return True + except BitwardenCLIError: + return False + + def export_vault(self, output_file: str, format: str = 'encrypted_json') -> bool: + """Export the Bitwarden vault. + + Args: + output_file: Path to save the exported file + format: Export format ('encrypted_json', 'json', 'csv', 'encrypted_json') + + Returns: + bool: True if export was successful + """ + try: + result = self._run_command(['export', '--format', format, '--output', output_file]) + return True + except BitwardenCLIError: + return False + + def get_items(self, search: str = None) -> List[Dict]: + """Get items from the vault, optionally filtered by search term.""" + try: + if search: + return self._run_command(['list', 'items', '--search', search]) + return self._run_command(['list', 'items']) + except BitwardenCLIError: + return [] + + def get_item(self, item_id: str) -> Optional[Dict]: + """Get a specific item by ID.""" + try: + return self._run_command(['get', 'item', item_id]) + except BitwardenCLIError: + return None + + def create_item(self, item_data: Dict) -> Optional[Dict]: + """Create a new item in the vault.""" + try: + return self._run_command( + ['create', 'item'], + input_data=json.dumps(item_data) + ) + except BitwardenCLIError: + return None + + def update_item(self, item_id: str, item_data: Dict) -> Optional[Dict]: + """Update an existing item.""" + try: + return self._run_command( + ['edit', 'item', item_id], + input_data=json.dumps(item_data) + ) + except BitwardenCLIError: + return None + + def delete_item(self, item_id: str) -> bool: + """Delete an item from the vault.""" + try: + self._run_command(['delete', 'item', item_id]) + return True + except BitwardenCLIError: + return False + + def import_from_bitwarden(self) -> List[PasswordEntry]: + """Import passwords from Bitwarden to Cerberus format.""" + try: + items = self.get_items() + entries = [] + + for item in items: + try: + entry = PasswordEntry( + id=item.get('id'), + website=item.get('name', ''), + username=next( + (field['value'] for field in item.get('login', {}).get('uris', [{}]) + if field.get('name', '').lower() == 'username'), + '' + ), + password=item.get('login', {}).get('password', ''), + url=next( + (uri.get('uri', '') for uri in item.get('login', {}).get('uris', []) + if uri.get('uri')), + '' + ), + notes=item.get('notes', ''), + tags=item.get('collectionIds', []), + custom_fields={ + 'folderId': item.get('folderId'), + 'organizationId': item.get('organizationId'), + 'favorite': item.get('favorite', False), + 'reprompt': item.get('reprompt', 0), + 'revisionDate': item.get('revisionDate') + } + ) + entries.append(entry) + except Exception as e: + logger.error(f"Error converting Bitwarden item to PasswordEntry: {e}") + + return entries + + except Exception as e: + logger.error(f"Error importing from Bitwarden: {e}") + return [] + + def export_to_bitwarden(self, entries: List[PasswordEntry], folder_id: str = None) -> List[str]: + """Export passwords from Cerberus format to Bitwarden. + + Args: + entries: List of PasswordEntry objects to export + folder_id: Optional Bitwarden folder ID to place items in + + Returns: + List of Bitwarden item IDs that were created + """ + created_ids = [] + + for entry in entries: + try: + item_data = { + 'type': 1, # Login type + 'name': entry.website, + 'notes': entry.notes, + 'favorite': entry.custom_fields.get('favorite', False), + 'login': { + 'username': entry.username, + 'password': entry.password, + 'uris': [ + { + 'match': None, + 'uri': entry.url + } + ] if entry.url else [] + }, + 'collectionIds': entry.tags, + 'folderId': folder_id or entry.custom_fields.get('folderId') + } + + result = self.create_item(item_data) + if result and 'id' in result: + created_ids.append(result['id']) + + except Exception as e: + logger.error(f"Error exporting entry to Bitwarden: {e}") + + return created_ids diff --git a/src/cerberus/integrations/chrome.py b/src/cerberus/integrations/chrome.py new file mode 100644 index 0000000..ddc6acc --- /dev/null +++ b/src/cerberus/integrations/chrome.py @@ -0,0 +1,135 @@ +"""Chrome/Chromium password export integration for Cerberus.""" + +import csv +import json +import sqlite3 +import shutil +import tempfile +from pathlib import Path +from typing import List, Optional, Dict, Any + +from ..core.models import PasswordEntry +from . import BaseIntegration, register_integration, IntegrationError + +class ChromeIntegration(BaseIntegration): + """Integration with Chrome/Chromium password exports.""" + + def __init__(self, export_path: Optional[Path] = None): + """Initialize the Chrome integration. + + Args: + export_path: Path to Chrome passwords CSV export + """ + super().__init__() + self.export_path = export_path + + def connect(self, export_path: Optional[Path] = None, **kwargs) -> bool: + """Load the Chrome passwords export file. + + Args: + export_path: Path to the Chrome passwords CSV export + + Returns: + bool: True if the export file exists and is accessible + """ + if export_path: + self.export_path = Path(export_path) + + if not self.export_path or not self.export_path.exists(): + raise IntegrationError("Chrome passwords export file not found") + + self.connected = True + return True + + def list_entries(self) -> List[PasswordEntry]: + """List all password entries from the Chrome export. + + Returns: + List of PasswordEntry objects + + Raises: + IntegrationError: If not connected or error reading the export file + """ + if not self.connected: + raise IntegrationError("Not connected to Chrome passwords") + + entries: List[PasswordEntry] = [] + + try: + with open(self.export_path, 'r', encoding='utf-8') as f: + # Chrome CSV format: name,url,username,password + reader = csv.reader(f) + + # Skip header if it exists + header = next(reader, None) + if not header or len(header) < 4: + # Try without skipping header + f.seek(0) + + for row in reader: + if len(row) < 4: # Ensure we have enough columns + continue + + name, url, username, password = row[:4] + + # Create a PasswordEntry + entry = PasswordEntry( + website=name or url or "Unknown", + username=username, + password=password, + url=url + ) + + entries.append(entry) + + except Exception as e: + raise IntegrationError(f"Error reading Chrome passwords export: {e}") + + return entries + + def import_entries(self, input_path: Optional[Path] = None) -> List[PasswordEntry]: + """Import entries from a Chrome passwords export. + + Args: + input_path: Path to the Chrome passwords export file + + Returns: + List of imported PasswordEntry objects + """ + if input_path: + self.export_path = Path(input_path) + + if not self.export_path or not self.export_path.exists(): + raise IntegrationError("Chrome passwords export file not found") + + return self.list_entries() + + def export_entries(self, output_path: Path) -> bool: + """Export entries to a Chrome-compatible CSV file. + + Args: + output_path: Path to save the exported data + + Returns: + bool: True if export was successful + """ + # This would require converting from our format to Chrome's format + # For now, we'll just raise a NotImplementedError + raise NotImplementedError("Export to Chrome format is not yet implemented") + + @classmethod + def export_help(cls) -> str: + """Get instructions for exporting from Chrome. + + Returns: + str: Instructions for exporting from Chrome + """ + return """To export passwords from Chrome: + 1. Open Chrome and go to: chrome://settings/passwords + 2. Click the three dots menu next to 'Saved Passwords' + 3. Select 'Export passwords...' + 4. Follow the prompts to save the passwords to a CSV file + """ + +# Register the integration with the name 'chrome' +register_integration("chrome")(ChromeIntegration) diff --git a/src/cerberus/integrations/keepass.py b/src/cerberus/integrations/keepass.py new file mode 100644 index 0000000..f445826 --- /dev/null +++ b/src/cerberus/integrations/keepass.py @@ -0,0 +1,208 @@ +"""KeePass integration for Cerberus.""" + +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import List, Optional, Dict, Any + +from ..core.models import PasswordEntry +from . import BaseIntegration, register_integration, IntegrationError + +try: + import pykeepass + KEEPASS_AVAILABLE = True +except ImportError: + KEEPASS_AVAILABLE = False + +@register_integration("keepass") +class KeePassIntegration(BaseIntegration): + """Integration with KeePass password manager.""" + + def __init__(self, database_path: Optional[Path] = None, keyfile: Optional[Path] = None): + """Initialize the KeePass integration. + + Args: + database_path: Path to the KeePass database file (.kdbx) + keyfile: Path to the keyfile (if used) + """ + super().__init__() + if not KEEPASS_AVAILABLE: + raise IntegrationError( + "pykeepass package is required for KeePass integration. " + "Install with: pip install pykeepass" + ) + + self.database_path = database_path + self.keyfile = keyfile + self.kp = None + + def connect(self, password: str, database_path: Optional[Path] = None, + keyfile: Optional[Path] = None, **kwargs) -> bool: + """Connect to a KeePass database. + + Args: + password: Database password + database_path: Path to the KeePass database file + keyfile: Path to the keyfile (if used) + + Returns: + bool: True if connection was successful + """ + if database_path: + self.database_path = Path(database_path) + if keyfile: + self.keyfile = Path(keyfile) + + if not self.database_path or not self.database_path.exists(): + raise IntegrationError("KeePass database file not found") + + try: + self.kp = pykeepass.PyKeePass( + self.database_path, + password=password, + keyfile=str(self.keyfile) if self.keyfile and self.keyfile.exists() else None + ) + self.connected = True + return True + except Exception as e: + raise IntegrationError(f"Failed to open KeePass database: {e}") + + def disconnect(self): + """Close the KeePass database.""" + self.kp = None + self.connected = False + + def list_entries(self) -> List[PasswordEntry]: + """List all password entries from the KeePass database. + + Returns: + List of PasswordEntry objects + + Raises: + IntegrationError: If not connected or error reading the database + """ + if not self.connected or not self.kp: + raise IntegrationError("Not connected to KeePass database") + + entries: List[PasswordEntry] = [] + + try: + for entry in self.kp.entries: + # Skip entries without URLs or usernames + if not (entry.url or entry.title) or not entry.username: + continue + + # Get entry notes and custom fields + notes = entry.notes or "" + custom_fields = {} + + # Add any custom fields + for key, value in entry.custom_properties.items(): + if key and value and key.lower() not in ['notes', 'password']: + custom_fields[key] = value + + # Create a PasswordEntry + entry_obj = PasswordEntry( + website=entry.url or entry.title, + username=entry.username, + password=entry.password, + notes=notes, + url=entry.url, + tags=[entry.group.name] if entry.group else [], + custom_fields=custom_fields if custom_fields else None + ) + + entries.append(entry_obj) + + except Exception as e: + raise IntegrationError(f"Error reading KeePass database: {e}") + + return entries + + def import_entries(self, input_path: Optional[Path] = None, password: str = None, + keyfile: Optional[Path] = None) -> List[PasswordEntry]: + """Import entries from a KeePass database. + + Args: + input_path: Path to the KeePass database file + password: Database password + keyfile: Path to the keyfile (if used) + + Returns: + List of imported PasswordEntry objects + """ + if input_path: + self.database_path = Path(input_path) + if keyfile: + self.keyfile = Path(keyfile) + + if not password: + raise IntegrationError("Password is required to open KeePass database") + + self.connect(password=password, database_path=self.database_path, keyfile=self.keyfile) + return self.list_entries() + + def export_entries(self, output_path: Path, entries: List[PasswordEntry] = None) -> bool: + """Export entries to a new KeePass database. + + Args: + output_path: Path to save the new KeePass database + entries: List of PasswordEntry objects to export + + Returns: + bool: True if export was successful + """ + if not self.connected or not self.kp: + raise IntegrationError("Not connected to KeePass database") + + try: + # Create a new KeePass database + new_kp = pykeepass.create_database(str(output_path)) + + # Add a group for the imported entries + imported_group = new_kp.add_group(new_kp.root_group, 'Imported') + + # Add each entry to the database + for entry in (entries or self.list_entries()): + # Skip entries without URLs or usernames + if not (entry.website or entry.url) or not entry.username: + continue + + # Add the entry to the database + new_entry = new_kp.add_entry( + imported_group, + title=entry.website or entry.url, + username=entry.username, + password=entry.password, + url=entry.url or entry.website, + notes=entry.notes, + tags=','.join(entry.tags) if entry.tags else None + ) + + # Add custom fields + if entry.custom_fields: + for key, value in entry.custom_fields.items(): + if key and value and key.lower() not in ['notes', 'password']: + new_entry.set_custom_property(key, str(value)) + + # Save the new database + new_kp.save() + return True + + except Exception as e: + raise IntegrationError(f"Error exporting to KeePass database: {e}") + + @classmethod + def export_help(cls) -> str: + """Get instructions for exporting from KeePass. + + Returns: + str: Instructions for exporting from KeePass + """ + return """To export from KeePass: + 1. Open your KeePass database + 2. Go to 'File' > 'Export' + 3. Choose a format (e.g., XML) + 4. Save the exported file + + Note: For better security, use the direct database import method. + """ diff --git a/src/cerberus/integrations/lastpass.py b/src/cerberus/integrations/lastpass.py new file mode 100644 index 0000000..9c587d2 --- /dev/null +++ b/src/cerberus/integrations/lastpass.py @@ -0,0 +1,126 @@ +"""LastPass integration for Cerberus.""" + +import csv +from pathlib import Path +from typing import List, Optional, Dict, Any + +from ..core.models import PasswordEntry +from . import BaseIntegration, register_integration, IntegrationError + +@register_integration("lastpass") +class LastPassIntegration(BaseIntegration): + """Integration with LastPass password manager.""" + + def __init__(self, export_path: Optional[Path] = None): + """Initialize the LastPass integration. + + Args: + export_path: Path to LastPass export file + """ + super().__init__() + self.export_path = export_path + + def connect(self, export_path: Optional[Path] = None, **kwargs) -> bool: + """Connect to LastPass (loads the export file). + + Args: + export_path: Path to LastPass export file + + Returns: + bool: True if the export file exists and is accessible + """ + if export_path: + self.export_path = Path(export_path) + + if not self.export_path or not self.export_path.exists(): + raise IntegrationError("LastPass export file not found") + + self.connected = True + return True + + def list_entries(self) -> List[PasswordEntry]: + """List all password entries from the LastPass export. + + Returns: + List of PasswordEntry objects + + Raises: + IntegrationError: If not connected or error reading the export file + """ + if not self.connected: + raise IntegrationError("Not connected to LastPass") + + entries: List[PasswordEntry] = [] + + try: + with open(self.export_path, 'r', encoding='utf-8') as f: + # Skip the first line (header) + next(f) + + reader = csv.reader(f) + for row in reader: + if len(row) < 7: # Ensure we have enough columns + continue + + url, username, password, extra, name, grouping, fav = row[:7] + + # Create a PasswordEntry + entry = PasswordEntry( + website=url or name or "Unknown", + username=username, + password=password, + notes=extra, + tags=[grouping] if grouping else [] + ) + + entries.append(entry) + + except Exception as e: + raise IntegrationError(f"Error reading LastPass export: {e}") + + return entries + + def import_entries(self, input_path: Optional[Path] = None) -> List[PasswordEntry]: + """Import entries from a LastPass export file. + + Args: + input_path: Path to the LastPass export file + + Returns: + List of imported PasswordEntry objects + """ + if input_path: + self.export_path = Path(input_path) + + if not self.export_path or not self.export_path.exists(): + raise IntegrationError("LastPass export file not found") + + return self.list_entries() + + def export_entries(self, output_path: Path) -> bool: + """Export entries to a LastPass-compatible CSV file. + + Args: + output_path: Path to save the exported data + + Returns: + bool: True if export was successful + """ + # This would require converting from our format to LastPass format + # For now, we'll just raise a NotImplementedError + raise NotImplementedError("Export to LastPass format is not yet implemented") + + @classmethod + def export_help(cls) -> str: + """Get instructions for exporting from LastPass. + + Returns: + str: Instructions for exporting from LastPass + """ + return """To export from LastPass: + 1. Log in to your LastPass account + 2. Click on your email in the bottom left + 3. Select 'Advanced' > 'Export' + 4. Enter your master password + 5. Save the exported file + """ diff --git a/src/cerberus/native/host.py b/src/cerberus/native/host.py new file mode 100644 index 0000000..75459b1 --- /dev/null +++ b/src/cerberus/native/host.py @@ -0,0 +1,138 @@ +""" +Native Messaging host for Cerberus Password Manager (development scaffold). + +WARNING: Development-only. For testing with the Firefox/Chrome extension. +Authentication: expects master password via CERB_MASTER environment variable. + +Protocol: newline-free JSON messages with 4-byte little-endian length prefix. +Messages: + {"type":"ping"} + {"type":"list_entries"} + {"type":"get_for_origin", "origin":"https://example.com", "include_password": false} + +Responses: + {"ok": true, "result": ...} or {"ok": false, "error": "..."} +""" +from __future__ import annotations + +import json +import os +import struct +import sys +from typing import Any, Dict, List +from urllib.parse import urlparse + +from cerberus.core.password_manager import PasswordManager, VaultError + + +def _read_msg() -> Dict[str, Any] | None: + raw_len = sys.stdin.buffer.read(4) + if not raw_len: + return None + msg_len = struct.unpack("<I", raw_len)[0] + data = sys.stdin.buffer.read(msg_len) + if not data: + return None + return json.loads(data.decode("utf-8")) + + +essential_fields = ["id", "website", "username", "url", "notes", "tags", "updated_at", "last_used"] + + +def _write_msg(obj: Dict[str, Any]) -> None: + data = json.dumps(obj, default=str, separators=(",", ":")).encode("utf-8") + sys.stdout.buffer.write(struct.pack("<I", len(data))) + sys.stdout.buffer.write(data) + sys.stdout.buffer.flush() + + +def _origin_host(origin: str) -> str: + try: + u = urlparse(origin) + return u.hostname or origin + except Exception: + return origin + + +def main() -> None: + master = os.environ.get("CERB_MASTER") + data_dir = os.environ.get("CERB_DATA_DIR") + pm: PasswordManager | None = None + + if master: + try: + pm = PasswordManager(data_dir=data_dir, master_password=master) + except VaultError as e: + _write_msg({"ok": False, "error": f"unlock_failed:{e}"}) + return + else: + # Defer unlock until first request arrives + pm = None + + allow_remote_unlock = os.environ.get("CERB_ALLOW_REMOTE_UNLOCK") == "1" + + while True: + msg = _read_msg() + if msg is None: + break + try: + mtype = msg.get("type") + if mtype == "ping": + _write_msg({"ok": True, "result": "pong"}) + continue + if mtype == "unlock": + if not allow_remote_unlock: + _write_msg({"ok": False, "error": "remote_unlock_disabled"}) + else: + master = msg.get("master") + if not master: + _write_msg({"ok": False, "error": "missing_master"}) + continue + try: + pm = PasswordManager(data_dir=data_dir, master_password=master) + _write_msg({"ok": True, "result": "unlocked"}) + except VaultError as e: + _write_msg({"ok": False, "error": f"unlock_failed:{e}"}) + continue + if pm is None: + _write_msg({"ok": False, "error": "locked"}) + continue + if mtype == "list_entries": + entries = pm.get_entries() + slim = [ + { + "id": e.id, + "website": e.website, + "username": e.username, + "url": e.url, + } + for e in entries + ] + _write_msg({"ok": True, "result": slim}) + continue + if mtype == "get_for_origin": + origin = msg.get("origin") or "" + include_password = bool(msg.get("include_password", False)) + host = _origin_host(origin).lower() + matches = [] + for e in pm.get_entries(): + target = (e.url or e.website or "").lower() + if host and host in target: + item = { + "id": e.id, + "website": e.website, + "username": e.username, + "url": e.url, + } + if include_password: + item["password"] = e.password + matches.append(item) + _write_msg({"ok": True, "result": matches}) + continue + _write_msg({"ok": False, "error": f"unknown_type:{mtype}"}) + except Exception as e: + _write_msg({"ok": False, "error": f"exception:{e}"}) + + +if __name__ == "__main__": + main() diff --git a/src/cerberus/native/manifests/chrome_com.cerberus.pm.json b/src/cerberus/native/manifests/chrome_com.cerberus.pm.json new file mode 100644 index 0000000..7eec080 --- /dev/null +++ b/src/cerberus/native/manifests/chrome_com.cerberus.pm.json @@ -0,0 +1,9 @@ +{ + "name": "com.cerberus.pm", + "description": "Cerberus Password Manager Native Messaging Host (dev)", + "path": "/usr/local/bin/cerberus-native-host", + "type": "stdio", + "allowed_origins": [ + "chrome-extension://REPLACE_WITH_EXTENSION_ID/" + ] +} diff --git a/src/cerberus/native/manifests/firefox_com.cerberus.pm.json b/src/cerberus/native/manifests/firefox_com.cerberus.pm.json new file mode 100644 index 0000000..fda6d6e --- /dev/null +++ b/src/cerberus/native/manifests/firefox_com.cerberus.pm.json @@ -0,0 +1,9 @@ +{ + "name": "com.cerberus.pm", + "description": "Cerberus Password Manager Native Messaging Host (dev)", + "path": "/usr/local/bin/cerberus-native-host", + "type": "stdio", + "allowed_extensions": [ + "cerberus@example.com" + ] +} diff --git a/src/cerberus/tui/__init__.py b/src/cerberus/tui/__init__.py new file mode 100644 index 0000000..75461d6 --- /dev/null +++ b/src/cerberus/tui/__init__.py @@ -0,0 +1,16 @@ +""" +Terminal User Interface (TUI) for Cerberus Password Manager. + +This module provides a rich, interactive terminal interface for managing passwords. +""" + +__all__ = ["main"] + +def main(): + """Launch the Cerberus TUI.""" + from .app import CerberusTUI + app = CerberusTUI() + app.run() + +if __name__ == "__main__": + main() diff --git a/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc b/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc Binary files differnew file mode 100644 index 0000000..bb0f474 --- /dev/null +++ b/src/cerberus/tui/__pycache__/__init__.cpython-313.pyc diff --git a/src/cerberus/tui/app.py b/src/cerberus/tui/app.py new file mode 100644 index 0000000..6e3d513 --- /dev/null +++ b/src/cerberus/tui/app.py @@ -0,0 +1,243 @@ +"" +Main TUI application for Cerberus Password Manager. +""" +from typing import Optional, List, Dict, Any +from pathlib import Path +import logging + +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.prompt import Prompt, Confirm +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.traceback import install as install_rich_traceback +from textual.app import App, ComposeResult +from textual.containers import Container, VerticalScroll +from textual.widgets import ( + Header, Footer, Button, Static, Input, Label, + DataTable, Select, Switch, LoadingIndicator +) + +from ..core.password_manager import PasswordManager, VaultError +from ..core.models import PasswordEntry + +# Set up rich traceback for better error messages +install_rich_traceback(show_locals=True) +logger = logging.getLogger(__name__) + +class CerberusTUI(App): + """Main TUI application for Cerberus Password Manager.""" + + CSS = """ + Screen { + layout: vertical; + } + + #login-screen { + width: 100%; + height: 100%; + align: center middle; + } + + #main-screen { + width: 100%; + height: 100%; + layout: horizontal; + } + + #sidebar { + width: 30%; + height: 100%; + border: solid $accent; + } + + #content { + width: 70%; + height: 100%; + border: solid $accent; + } + + .entry-list { + width: 100%; + height: 100%; + } + + .entry-detail { + width: 100%; + height: 100%; + padding: 1; + } + """ + + BINDINGS = [ + ("q", "quit", "Quit"), + ("n", "new_password", "New Password"), + ("f", "find", "Find Password"), + ("r", "refresh", "Refresh"), + ("c", "copy_password", "Copy Password"), + ("u", "copy_username", "Copy Username"), + ("d", "delete", "Delete Entry"), + ] + + def __init__(self, data_dir: Optional[str] = None, **kwargs): + super().__init__(**kwargs) + self.console = Console() + self.pm: Optional[PasswordManager] = None + self.data_dir = data_dir + self.current_entry: Optional[PasswordEntry] = None + self.entries: List[PasswordEntry] = [] + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + if not self.pm: + yield Container( + VerticalScroll( + Label("🔒 Cerberus Password Manager", id="login-title"), + Input(placeholder="Master Password", password=True, id="master-password"), + Button("Unlock", variant="primary", id="unlock-button"), + id="login-screen" + ) + ) + else: + with Container(id="main-screen"): + with Container(id="sidebar"): + yield DataTable(id="entry-list", cursor_type="row") + with Container(id="content"): + yield Static("Select an entry to view details", id="entry-detail") + + def on_mount(self) -> None: + """Initialize the UI after mounting.""" + if not self.pm: + self.query_one("#master-password", Input).focus() + else: + self.load_entries() + + def on_button_pressed(self, event: Button.Pressed) -> None: + """Handle button press events.""" + if event.button.id == "unlock-button": + self.unlock_vault() + + def on_input_submitted(self, event: Input.Submitted) -> None: + """Handle input submission.""" + if event.input.id == "master-password": + self.unlock_vault() + + def unlock_vault(self) -> None: + """Attempt to unlock the password vault.""" + password_input = self.query_one("#master-password", Input) + password = password_input.value + + if not password: + self.notify("Please enter a master password", severity="error") + return + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + transient=True, + console=self.console + ) as progress: + task = progress.add_task("Unlocking vault...", total=None) + self.pm = PasswordManager(data_dir=self.data_dir, master_password=password) + progress.update(task, completed=1) + + # Clear the login screen and show the main interface + self.query("#login-screen").remove() + self.compose() + self.load_entries() + + except VaultError as e: + self.notify(f"Failed to unlock vault: {e}", severity="error") + except Exception as e: + logger.exception("Error unlocking vault") + self.notify(f"An error occurred: {e}", severity="error") + + def load_entries(self) -> None: + """Load password entries into the UI.""" + if not self.pm: + return + + try: + table = self.query_one("#entry-list", DataTable) + table.clear() + table.add_columns("Website", "Username", "Last Used") + + self.entries = self.pm.get_entries() + for entry in self.entries: + table.add_row( + entry.website, + entry.username, + entry.last_used.strftime("%Y-%m-%d") if entry.last_used else "Never" + ) + + except Exception as e: + logger.exception("Error loading entries") + self.notify(f"Failed to load entries: {e}", severity="error") + + def action_new_password(self) -> None: + """Create a new password entry.""" + self.notify("New password functionality coming soon!", severity="information") + + def action_find(self) -> None: + """Find a password entry.""" + self.notify("Find functionality coming soon!", severity="information") + + def action_refresh(self) -> None: + """Refresh the entry list.""" + self.load_entries() + self.notify("Entries refreshed", severity="information") + + def action_copy_password(self) -> None: + """Copy the current entry's password to clipboard.""" + if not self.current_entry: + self.notify("No entry selected", severity="warning") + return + + try: + # Use platform-specific clipboard handling + import pyperclip + pyperclip.copy(self.current_entry.password) + self.notify("Password copied to clipboard", severity="information") + except Exception as e: + logger.exception("Error copying to clipboard") + self.notify(f"Failed to copy to clipboard: {e}", severity="error") + + def action_copy_username(self) -> None: + """Copy the current entry's username to clipboard.""" + if not self.current_entry: + self.notify("No entry selected", severity="warning") + return + + try: + import pyperclip + pyperclip.copy(self.current_entry.username) + self.notify("Username copied to clipboard", severity="information") + except Exception as e: + logger.exception("Error copying username") + self.notify(f"Failed to copy username: {e}", severity="error") + + def action_delete(self) -> None: + """Delete the current entry.""" + if not self.current_entry: + self.notify("No entry selected", severity="warning") + return + + if Confirm.ask(f"Delete entry for {self.current_entry.website}?"): + try: + self.pm.delete_entry(self.current_entry.id) + self.load_entries() + self.notify("Entry deleted", severity="information") + self.current_entry = None + self.query_one("#entry-detail", Static).update("Select an entry to view details") + except Exception as e: + logger.exception("Error deleting entry") + self.notify(f"Failed to delete entry: {e}", severity="error") + +def main(): + """Run the Cerberus TUI.""" + app = CerberusTUI() + app.run() + +if __name__ == "__main__": + main() diff --git a/webext/chrome/background.js b/webext/chrome/background.js new file mode 100644 index 0000000..faa201f --- /dev/null +++ b/webext/chrome/background.js @@ -0,0 +1,83 @@ +// Background service worker for Cerberus Chrome/Edge extension (MV3) + +function connectNative() { + try { + return chrome.runtime.connectNative('com.cerberus.pm'); + } catch (e) { + console.error('Native host connection failed', e); + return null; + } +} + +function nativeRequest(payload) { + return new Promise((resolve, reject) => { + const port = connectNative(); + if (!port) { + reject(new Error('no_native_host')); + return; + } + const onMessage = (resp) => { + port.onMessage.removeListener(onMessage); + try { port.disconnect(); } catch {} + resolve(resp); + }; + const onDisconnect = () => { + port.onMessage.removeListener(onMessage); + reject(new Error('disconnected')); + }; + port.onMessage.addListener(onMessage); + port.onDisconnect.addListener(onDisconnect); + port.postMessage(payload); + }); +} + +async function getActiveTab() { + const tabs = await chrome.tabs.query({ active: true, currentWindow: true }); + return tabs[0]; +} + +async function getOriginForActiveTab() { + const tab = await getActiveTab(); + try { + const url = new URL(tab.url); + return url.origin; + } catch (e) { + return tab.url; + } +} + +chrome.runtime.onMessage.addListener((message, sender, sendResponse) => { + (async () => { + if (!message || !message.type) return; + if (message.type === 'GET_PAGE_FORMS') { + const tab = await getActiveTab(); + chrome.tabs.sendMessage(tab.id, { type: 'SCAN_FORMS' }, sendResponse); + return true; + } + if (message.type === 'FILL_CREDENTIALS') { + const tab = await getActiveTab(); + chrome.tabs.sendMessage(tab.id, { type: 'FILL_CREDENTIALS', payload: message.payload }, sendResponse); + return true; + } + if (message.type === 'GET_CREDENTIALS_FOR_TAB') { + const origin = await getOriginForActiveTab(); + try { + const resp = await nativeRequest({ type: 'get_for_origin', origin, include_password: true }); + sendResponse(resp); + } catch (e) { + sendResponse({ ok: false, error: String(e) }); + } + return true; + } + if (message.type === 'PING_NATIVE') { + try { + const resp = await nativeRequest({ type: 'ping' }); + sendResponse(resp); + } catch (e) { + sendResponse({ ok: false, error: String(e) }); + } + return true; + } + })(); + return true; // keep channel open for async +}); diff --git a/webext/chrome/manifest.json b/webext/chrome/manifest.json new file mode 100644 index 0000000..977b83c --- /dev/null +++ b/webext/chrome/manifest.json @@ -0,0 +1,30 @@ +{ + "manifest_version": 3, + "name": "Cerberus Password Manager", + "version": "0.1.0", + "description": "Auto-fill and manage passwords with Cerberus.", + "permissions": [ + "activeTab", + "storage", + "tabs", + "scripting", + "nativeMessaging" + ], + "host_permissions": [ + "<all_urls>" + ], + "action": { + "default_title": "Cerberus", + "default_popup": "popup.html" + }, + "background": { + "service_worker": "background.js" + }, + "content_scripts": [ + { + "matches": ["<all_urls>"], + "js": ["content.js"], + "run_at": "document_end" + } + ] +} diff --git a/webext/chrome/popup.html b/webext/chrome/popup.html new file mode 100644 index 0000000..b3482ed --- /dev/null +++ b/webext/chrome/popup.html @@ -0,0 +1,56 @@ +<!DOCTYPE html> +<html> +<head> + <meta charset="utf-8" /> + <title>Cerberus</title> + <style> + body { font-family: sans-serif; width: 280px; margin: 10px; } + button { width: 100%; padding: 8px; margin: 6px 0; } + input { width: 100%; padding: 6px; margin: 6px 0; } + </style> +</head> +<body> + <h3>Cerberus</h3> + <input id="username" placeholder="Username" /> + <input id="password" placeholder="Password" type="password" /> + <button id="fetch">Fetch from Vault</button> + <button id="fill">Fill on Page</button> + <div id="status" style="font-size: 12px; color: #666; margin-top: 6px;"></div> + <script> + async function getActiveTab() { + const tabs = await chrome.tabs.query({ active: true, currentWindow: true }); + return tabs[0]; + } + function sendToBackground(msg) { + return new Promise((resolve) => { + chrome.runtime.sendMessage(msg, resolve); + }); + } + async function fill(username, password) { + await sendToBackground({ type: 'FILL_CREDENTIALS', payload: { username, password } }); + } + document.getElementById('fill').addEventListener('click', async () => { + const username = document.getElementById('username').value; + const password = document.getElementById('password').value; + try { + await fill(username, password); + window.close(); + } catch (e) { console.error(e); } + }); + document.getElementById('fetch').addEventListener('click', async () => { + const status = document.getElementById('status'); + status.textContent = 'Fetching credentials from vault...'; + try { + const resp = await sendToBackground({ type: 'GET_CREDENTIALS_FOR_TAB' }); + if (!resp || !resp.ok) { status.textContent = 'Failed to fetch (native host?)'; return; } + const results = resp.result || []; + if (results.length === 0) { status.textContent = 'No matching entries'; return; } + const { username, password } = results[0]; + if (username) document.getElementById('username').value = username; + if (password) document.getElementById('password').value = password; + status.textContent = 'Fetched from vault'; + } catch (e) { console.error(e); status.textContent = 'Error'; } + }); + </script> +</body> +</html> diff --git a/webext/firefox/background.js b/webext/firefox/background.js new file mode 100644 index 0000000..d05cd37 --- /dev/null +++ b/webext/firefox/background.js @@ -0,0 +1,69 @@ +// Basic background script for Cerberus Firefox extension + +browser.runtime.onInstalled.addListener(() => { + console.log("Cerberus extension installed"); +}); + +function connectNative() { + try { + return browser.runtime.connectNative('com.cerberus.pm'); + } catch (e) { + console.error('Native host connection failed', e); + return null; + } +} + +function nativeRequest(payload) { + return new Promise((resolve, reject) => { + const port = connectNative(); + if (!port) { + reject(new Error('no_native_host')); + return; + } + const onMessage = (resp) => { + port.onMessage.removeListener(onMessage); + port.disconnect(); + resolve(resp); + }; + const onDisconnect = () => { + port.onMessage.removeListener(onMessage); + reject(new Error('disconnected')); + }; + port.onMessage.addListener(onMessage); + port.onDisconnect.addListener(onDisconnect); + port.postMessage(payload); + }); +} + +async function getOriginForTab(tabId) { + const tab = await browser.tabs.get(tabId); + try { + const url = new URL(tab.url); + return url.origin; + } catch (e) { + return tab.url; + } +} + +// Message router between popup/content and native host +browser.runtime.onMessage.addListener(async (message, sender) => { + if (!message || !message.type) return; + if (message.type === 'GET_PAGE_FORMS') { + return browser.tabs.sendMessage(sender.tab.id, { type: 'SCAN_FORMS' }); + } + if (message.type === 'FILL_CREDENTIALS') { + return browser.tabs.sendMessage(sender.tab.id, { + type: 'FILL_CREDENTIALS', + payload: message.payload, + }); + } + if (message.type === 'GET_CREDENTIALS_FOR_TAB') { + const origin = await getOriginForTab(sender.tab.id); + const resp = await nativeRequest({ type: 'get_for_origin', origin, include_password: true }).catch(err => ({ ok: false, error: String(err) })); + return resp; + } + if (message.type === 'PING_NATIVE') { + const resp = await nativeRequest({ type: 'ping' }).catch(err => ({ ok: false, error: String(err) })); + return resp; + } +}); diff --git a/webext/firefox/content.js b/webext/firefox/content.js new file mode 100644 index 0000000..9db4005 --- /dev/null +++ b/webext/firefox/content.js @@ -0,0 +1,57 @@ +// Content script: detects login/change-password forms and can fill them + +function findLoginForms() { + const forms = Array.from(document.querySelectorAll('form')); + const results = []; + for (const f of forms) { + const inputs = Array.from(f.querySelectorAll('input')); + const hasPassword = inputs.some(i => (i.type || '').toLowerCase() === 'password'); + const username = inputs.find(i => ['text','email','tel','username'].includes((i.type || '').toLowerCase()) || /user|email|login/i.test(i.name || i.id || '')); + const password = inputs.find(i => (i.type || '').toLowerCase() === 'password'); + if (hasPassword && (username || password)) { + results.push({ + action: f.getAttribute('action') || location.href, + usernameName: username && (username.name || username.id) || null, + passwordName: password && (password.name || password.id) || null, + }); + } + } + return results; +} + +function fillCredentials(payload) { + const { username, password } = payload || {}; + if (!username && !password) return false; + // Try to fill the first reasonable form + const forms = Array.from(document.querySelectorAll('form')); + for (const f of forms) { + const inputs = Array.from(f.querySelectorAll('input')); + const u = inputs.find(i => ['text','email','tel','username'].includes((i.type || '').toLowerCase()) || /user|email|login/i.test(i.name || i.id || '')); + const p = inputs.find(i => (i.type || '').toLowerCase() === 'password'); + if (u || p) { + if (u && username) { + u.focus(); + u.value = username; + u.dispatchEvent(new Event('input', { bubbles: true })); + } + if (p && password) { + p.focus(); + p.value = password; + p.dispatchEvent(new Event('input', { bubbles: true })); + } + return true; + } + } + return false; +} + +browser.runtime.onMessage.addListener((message) => { + if (!message || !message.type) return; + if (message.type === 'SCAN_FORMS') { + return Promise.resolve({ forms: findLoginForms() }); + } + if (message.type === 'FILL_CREDENTIALS') { + const ok = fillCredentials(message.payload || {}); + return Promise.resolve({ ok }); + } +}); diff --git a/webext/firefox/manifest.json b/webext/firefox/manifest.json new file mode 100644 index 0000000..12b2e7a --- /dev/null +++ b/webext/firefox/manifest.json @@ -0,0 +1,35 @@ +{ + "manifest_version": 2, + "name": "Cerberus Password Manager", + "version": "0.1.0", + "description": "Auto-fill and manage passwords with Cerberus.", + "permissions": [ + "activeTab", + "storage", + "contextMenus", + "tabs", + "nativeMessaging", + "<all_urls>" + ], + "background": { + "scripts": ["background.js"], + "persistent": false + }, + "content_scripts": [ + { + "matches": ["<all_urls>"], + "js": ["content.js"], + "run_at": "document_end" + } + ], + "browser_action": { + "default_title": "Cerberus", + "default_popup": "popup.html" + } + , + "browser_specific_settings": { + "gecko": { + "id": "cerberus@example.com" + } + } +} diff --git a/webext/firefox/popup.html b/webext/firefox/popup.html new file mode 100644 index 0000000..0c51b06 --- /dev/null +++ b/webext/firefox/popup.html @@ -0,0 +1,65 @@ +<!DOCTYPE html> +<html> +<head> + <meta charset="utf-8" /> + <title>Cerberus</title> + <style> + body { font-family: sans-serif; width: 280px; margin: 10px; } + button { width: 100%; padding: 8px; margin: 6px 0; } + input { width: 100%; padding: 6px; margin: 6px 0; } + </style> +</head> +<body> + <h3>Cerberus</h3> + <input id="username" placeholder="Username" /> + <input id="password" placeholder="Password" type="password" /> + <button id="fetch">Fetch from Vault</button> + <button id="fill">Fill on Page</button> + <div id="status" style="font-size: 12px; color: #666; margin-top: 6px;"></div> + <script> + async function getActiveTab() { + const tabs = await browser.tabs.query({ active: true, currentWindow: true }); + return tabs[0]; + } + + async function fill(username, password) { + const tab = await getActiveTab(); + await browser.tabs.sendMessage(tab.id, { type: 'FILL_CREDENTIALS', payload: { username, password } }); + } + + document.getElementById('fill').addEventListener('click', async () => { + const username = document.getElementById('username').value; + const password = document.getElementById('password').value; + try { + await fill(username, password); + window.close(); + } catch (e) { console.error(e); } + }); + + document.getElementById('fetch').addEventListener('click', async () => { + const status = document.getElementById('status'); + status.textContent = 'Fetching credentials from vault...'; + try { + const resp = await browser.runtime.sendMessage({ type: 'GET_CREDENTIALS_FOR_TAB' }); + if (!resp || !resp.ok) { + status.textContent = 'Failed to fetch (is native host installed and unlocked?)'; + return; + } + const results = resp.result || []; + if (results.length === 0) { + status.textContent = 'No entries matched for this origin'; + return; + } + // Choose the first match (later: add a dropdown) + const { username, password } = results[0]; + if (username) document.getElementById('username').value = username; + if (password) document.getElementById('password').value = password; + status.textContent = 'Fetched from vault'; + } catch (e) { + console.error(e); + status.textContent = 'Error fetching credentials'; + } + }); + </script> +</body> +</html> |
