Commit eb59c788 authored by Maciej Gol's avatar Maciej Gol

Add KeyFingerprint and AbstractKeyLoader

parent 545cc78f
......@@ -7,6 +7,7 @@ import click
from op_askpass import operations
from op_askpass.configuration import get_configuration_directory
from op_askpass.fingerprint_generator import SSHKeyGenFingerprintGenerator
from op_askpass.key_loader import SSHKeyLoader
from op_askpass.key_store import get_default_key_store
......@@ -44,7 +45,7 @@ def setup_op_client(op_domain: str, op_email: str, verify: bool, install_path: O
@click.option("--install-path", type=Path, help="Where the 1Password client was installed.")
def login(op_domain: str, install_path: Optional[Path]) -> None:
install_path = install_path or get_configuration_directory()
operations.login_to_op(install_path / "op", op_domain, key_store=get_default_key_store())
operations.login_to_op(install_path / "op", op_domain, key_store=get_default_key_store(), key_loader=SSHKeyLoader())
@main.command()
......
import abc
import hashlib
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass
class KeyFingerprint:
length: int
hash: str
comment: str
key_type: str
def __eq__(self, other: Any) -> bool:
if not isinstance(other, KeyFingerprint):
return NotImplemented
return self.length == other.length and self.hash == other.hash and self.key_type == other.key_type
def __hash__(self) -> int:
return hash(self.length) + hash(self.hash) + hash(self.key_type)
@classmethod
def from_str(cls, s: str) -> "KeyFingerprint":
length, hash, comment, key_type = s.split()
return KeyFingerprint(
length=int(length),
hash=hash,
comment=comment,
key_type=key_type.strip("(").strip(")"),
)
def to_str(self) -> str:
return f"{self.length} {self.hash} {self.comment} ({self.key_type})"
class AbstractFingerprintGenerator(abc.ABC):
@abc.abstractmethod
def for_path(self, path: Path) -> str:
def for_path(self, path: Path) -> KeyFingerprint:
...
class MD5FingerprintGenerator(AbstractFingerprintGenerator):
def for_path(self, path: Path) -> str:
def for_path(self, path: Path) -> KeyFingerprint:
md5 = hashlib.md5()
md5.update(path.read_bytes())
return str(md5.hexdigest())
hex = str(md5.hexdigest())
return KeyFingerprint(
length=len(hex), hash=hex, comment="some-key", key_type="md5"
)
class SSHKeyGenFingerprintGenerator(AbstractFingerprintGenerator):
def __init__(self, executable_name: str = "ssh-keygen") -> None:
self.__executable_name = executable_name
def for_path(self, path: Path) -> str:
output = subprocess.check_output([self.__executable_name, "-l", "-f", path], encoding="utf-8")
return output.strip()
def for_path(self, path: Path) -> KeyFingerprint:
output = subprocess.check_output(
[self.__executable_name, "-l", "-f", path], encoding="utf-8"
)
return KeyFingerprint.from_str(output.strip())
......@@ -5,7 +5,7 @@ from pathlib import Path
import pytest
from op_askpass.fingerprint_generator import AbstractFingerprintGenerator, MD5FingerprintGenerator, \
SSHKeyGenFingerprintGenerator
SSHKeyGenFingerprintGenerator, KeyFingerprint
HERE = Path(__file__).parent
......@@ -20,7 +20,7 @@ class AbstractFingerprintGeneratorTests(abc.ABC):
def test_for_path_should_correctly_generate_fingerprint(self, test_key_file_path: Path) -> None:
generator = self.fingerprint_generator_factory()
assert isinstance(generator.for_path(test_key_file_path), str)
assert isinstance(generator.for_path(test_key_file_path), KeyFingerprint)
@abc.abstractmethod
def fingerprint_generator_factory(self) -> AbstractFingerprintGenerator:
......@@ -36,7 +36,7 @@ class TestMD5FingerprintGenerator(AbstractFingerprintGeneratorTests):
file_path = tmp_path / "yada.txt"
file_path.write_bytes(b"xxx")
assert generator.for_path(file_path) == 'f561aaf6ef0bf14d4208bb46a4ccb3ad'
assert generator.for_path(file_path) == KeyFingerprint(length=32, hash='f561aaf6ef0bf14d4208bb46a4ccb3ad', comment="some-key", key_type="md5")
class TestSSHKeyGenFingerprintGenerator(AbstractFingerprintGeneratorTests):
......@@ -50,4 +50,4 @@ class TestSSHKeyGenFingerprintGenerator(AbstractFingerprintGeneratorTests):
def test_for_path_should_generate_correct_checksum(self, test_key_file_path: Path) -> None:
generator = self.fingerprint_generator_factory()
assert generator.for_path(test_key_file_path) == '2048 SHA256:IKJs9nCZuYQ2tND/G2kCM1/+ggJsWMnWn+iP7y4FXDc test@key (RSA)'
assert generator.for_path(test_key_file_path) == KeyFingerprint.from_str('2048 SHA256:IKJs9nCZuYQ2tND/G2kCM1/+ggJsWMnWn+iP7y4FXDc test@key (RSA)')
import abc
import os
import subprocess
from pathlib import Path
from typing import List, Set
from op_askpass.fingerprint_generator import AbstractFingerprintGenerator, MD5FingerprintGenerator, \
SSHKeyGenFingerprintGenerator, KeyFingerprint
class AbstractKeyLoader(abc.ABC):
@abc.abstractmethod
def list_loaded_keys(self) -> List[KeyFingerprint]: ...
@abc.abstractmethod
def load_key(self, key_path: Path, op_domain: str, op_session_key: str, op_uid: str) -> None: ...
@abc.abstractmethod
def get_fingerprint_generator(self) -> AbstractFingerprintGenerator: ...
class MemoryKeyLoader(AbstractKeyLoader):
def get_fingerprint_generator(self) -> AbstractFingerprintGenerator:
return MD5FingerprintGenerator()
def __init__(self) -> None:
self.__keys: Set[KeyFingerprint] = set()
def load_key(self, key_path: Path, op_domain: str, op_session_key: str, op_uid: str) -> None:
self.__keys.add(self.get_fingerprint_generator().for_path(key_path))
def list_loaded_keys(self) -> List[KeyFingerprint]:
return list(self.__keys)
class SSHKeyLoader(AbstractKeyLoader):
def get_fingerprint_generator(self) -> AbstractFingerprintGenerator:
return SSHKeyGenFingerprintGenerator()
def list_loaded_keys(self) -> List[KeyFingerprint]:
output: str = subprocess.check_output(
["ssh-add", "-l"],
encoding="utf-8"
)
return [KeyFingerprint.from_str(s) for s in output.splitlines()]
def load_key(self, key_path: Path, op_domain: str, op_session_key: str, op_uid: str) -> None:
env = {**os.environ, f"OP_SESSION_{op_domain}": op_session_key}
subprocess.check_call(
["ssh-add", str(key_path)],
env={**env, "OP_ASKPASS_ITEM_NAME": op_uid, "SSH_ASKPASS": "op-askpass-get-password"},
stdin=subprocess.DEVNULL,
)
import abc
from pathlib import Path
from op_askpass.key_loader import AbstractKeyLoader, MemoryKeyLoader
class AbstractKeyLoaderTests(abc.ABC):
@abc.abstractmethod
def key_loader_factory(self) -> AbstractKeyLoader: ...
def test_list_loaded_keys_should_return_empty_list_when_empty(self) -> None:
loader = self.key_loader_factory()
actual = loader.list_loaded_keys()
assert actual == []
def test_load_key_should_correctly_load_given_key(self) -> None:
loader = self.key_loader_factory()
fingerprint_generator = loader.get_fingerprint_generator()
key_path = Path(__file__).parent / "test_data" / "test_key"
loader.load_key(key_path=key_path, op_domain="some-domain", op_uid="some-uid", op_session_key="some-key")
assert loader.list_loaded_keys() == [fingerprint_generator.for_path(key_path)]
class TestMemoryKeyLoader(AbstractKeyLoaderTests):
def key_loader_factory(self) -> AbstractKeyLoader:
return MemoryKeyLoader()
......@@ -5,6 +5,7 @@ from pathlib import Path
from typing import Dict, List, Tuple
from op_askpass.configuration import get_configuration_directory
from op_askpass.fingerprint_generator import KeyFingerprint
@dataclass
......@@ -15,69 +16,69 @@ class KeyEntry:
class AbstractKeyStore(abc.ABC):
@abc.abstractmethod
def add_fingerprint(self, fingerprint: str, onepass_uid: str, key_path: Path) -> None:
def add_fingerprint(self, fingerprint: KeyFingerprint, onepass_uid: str, key_path: Path) -> None:
...
@abc.abstractmethod
def delete_fingerprint(self, fingerprint: str) -> None:
def delete_fingerprint(self, fingerprint: KeyFingerprint) -> None:
...
@abc.abstractmethod
def get_key_entry(self, fingerprint: str) -> KeyEntry:
def get_key_entry(self, fingerprint: KeyFingerprint) -> KeyEntry:
...
@abc.abstractmethod
def items(self) -> List[Tuple[str, KeyEntry]]: ...
def items(self) -> List[Tuple[KeyFingerprint, KeyEntry]]: ...
class MemoryKeyStore(AbstractKeyStore):
def delete_fingerprint(self, fingerprint: str) -> None:
def delete_fingerprint(self, fingerprint: KeyFingerprint) -> None:
self.__store.pop(fingerprint, None)
def items(self) -> List[Tuple[str, KeyEntry]]:
def items(self) -> List[Tuple[KeyFingerprint, KeyEntry]]:
return list(self.__store.items())
def get_key_entry(self, fingerprint: str) -> KeyEntry:
def get_key_entry(self, fingerprint: KeyFingerprint) -> KeyEntry:
return self.__store[fingerprint]
def __init__(self) -> None:
self.__store: Dict[str, KeyEntry] = {}
self.__store: Dict[KeyFingerprint, KeyEntry] = {}
def add_fingerprint(self, fingerprint: str, onepass_uid: str, key_path: Path) -> None:
def add_fingerprint(self, fingerprint: KeyFingerprint, onepass_uid: str, key_path: Path) -> None:
self.__store[fingerprint] = KeyEntry(onepass_uid=onepass_uid, key_path=key_path)
class FileKeyStore(AbstractKeyStore):
def delete_fingerprint(self, fingerprint: str) -> None:
def delete_fingerprint(self, fingerprint: KeyFingerprint) -> None:
contents = self.__read_contents(self.__file_path)
contents.pop(fingerprint, None)
self.__save_contents(file_path=self.__file_path, contents=contents)
def items(self) -> List[Tuple[str, KeyEntry]]:
def items(self) -> List[Tuple[KeyFingerprint, KeyEntry]]:
return list(self.__read_contents(self.__file_path).items())
def __init__(self, file_path: Path) -> None:
self.__file_path = file_path
@staticmethod
def __read_contents(file_path: Path) -> Dict[str, KeyEntry]:
def __read_contents(file_path: Path) -> Dict[KeyFingerprint, KeyEntry]:
try:
data = json.loads(file_path.read_text(encoding="utf-8"))
return {k: KeyEntry(onepass_uid=v["onepass_uid"], key_path=Path(v["key_path"])) for k, v in data.items()}
return {KeyFingerprint.from_str(k): KeyEntry(onepass_uid=v["onepass_uid"], key_path=Path(v["key_path"])) for k, v in data.items()}
except IOError:
return {}
@staticmethod
def __save_contents(file_path: Path, contents: Dict[str, KeyEntry]) -> None:
file_path.write_text(json.dumps({k: {"onepass_uid": v.onepass_uid, "key_path": str(v.key_path)} for k, v in contents.items()}), encoding="utf-8")
def __save_contents(file_path: Path, contents: Dict[KeyFingerprint, KeyEntry]) -> None:
file_path.write_text(json.dumps({k.to_str(): {"onepass_uid": v.onepass_uid, "key_path": str(v.key_path)} for k, v in contents.items()}), encoding="utf-8")
def add_fingerprint(self, fingerprint: str, onepass_uid: str, key_path: Path) -> None:
def add_fingerprint(self, fingerprint: KeyFingerprint, onepass_uid: str, key_path: Path) -> None:
contents = self.__read_contents(self.__file_path)
contents[fingerprint] = KeyEntry(onepass_uid=onepass_uid, key_path=key_path)
self.__save_contents(file_path=self.__file_path, contents=contents)
def get_key_entry(self, fingerprint: str) -> KeyEntry:
def get_key_entry(self, fingerprint: KeyFingerprint) -> KeyEntry:
return self.__read_contents(self.__file_path)[fingerprint]
......
......@@ -4,6 +4,7 @@ from pathlib import Path
import pytest
from op_askpass.fingerprint_generator import KeyFingerprint
from op_askpass.key_store import AbstractKeyStore, MemoryKeyStore, FileKeyStore, KeyEntry
......@@ -11,7 +12,7 @@ class AbstractKeyStoreTests(abc.ABC):
def test_add_key_fingerprint_should_correctly_add_fingerprint(self) -> None:
key_store: AbstractKeyStore = self.key_store_factory()
onepass_uid = "XXXXXX"
fingerprint = "SSSSSSSSS"
fingerprint = KeyFingerprint(length=10, hash="SSSSSSSSS", comment="x", key_type="some")
path = Path("x")
expected_entry = KeyEntry(onepass_uid=onepass_uid, key_path=path)
......@@ -23,13 +24,13 @@ class AbstractKeyStoreTests(abc.ABC):
def test_delete_key_fingerprint_should_correctly_remove_a_fingerprint(self) -> None:
key_store: AbstractKeyStore = self.key_store_factory()
onepass_uid = "XXXXXX"
fingerprint = "SSSSSSSSS"
fingerprint = KeyFingerprint(length=10, hash="SSSSSSSSS", comment="x", key_type="some")
path = Path("x")
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=onepass_uid, key_path=path)
key_store.delete_fingerprint(fingerprint=fingerprint)
with pytest.raises(KeyError, match=fingerprint):
with pytest.raises(KeyError, match=fingerprint.hash):
key_store.get_key_entry(fingerprint=fingerprint)
assert key_store.items() == []
......@@ -37,7 +38,7 @@ class AbstractKeyStoreTests(abc.ABC):
key_store: AbstractKeyStore = self.key_store_factory()
with pytest.raises(KeyError, match="not-existing"):
key_store.get_key_entry(fingerprint="not-existing")
key_store.get_key_entry(fingerprint=KeyFingerprint(length=10, hash="not-existing", comment="x", key_type="some"))
@abc.abstractmethod
def key_store_factory(self) -> AbstractKeyStore:
......
......@@ -12,7 +12,7 @@ def test_delete_key_from_path_should_remove_the_key(tmp_path: Path) -> None:
fingerprint_generator = MD5FingerprintGenerator()
uid = "some-uid"
fingerprint = fingerprint_generator.for_path(key_path)
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=uid, key_path=None)
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=uid, key_path=key_path)
delete_key_from_path(path=key_path, fingerprint_generator=fingerprint_generator, key_store=key_store)
......
import json
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from subprocess import DEVNULL
from op_askpass.key_loader import AbstractKeyLoader
from op_askpass.key_store import AbstractKeyStore
__all__ = ["get_password_from_op", "login_to_op", "setup_op_client"]
def download_op_client(download_url: str, download_dir: Path) -> None:
def __download_op_client(download_url: str, download_dir: Path) -> None:
subprocess.check_call(["wget", download_url], cwd=str(download_dir))
def unzip_op_client_archive(archive_path: Path, extract_dir: Path) -> None:
def __unzip_op_client_archive(archive_path: Path, extract_dir: Path) -> None:
subprocess.check_call(["unzip", str(archive_path)], cwd=str(extract_dir))
def verify_signature(client_path: Path, signature_file: Path) -> None:
def __verify_signature(client_path: Path, signature_file: Path) -> None:
subprocess.check_call(["gpg", "--verify", str(signature_file), str(client_path)])
return
def signin_to_op(executable: Path, domain: str, email: str) -> None:
def __signin_to_op(executable: Path, domain: str, email: str) -> None:
subprocess.check_call([str(executable), "signin", domain, email, "--output=raw"])
def get_password_from_op(executable: Path, item_name: str) -> str:
output = subprocess.check_output([str(executable), "get", "item", item_name]).decode("utf-8").strip()
output = (
subprocess.check_output([str(executable), "get", "item", item_name])
.decode("utf-8")
.strip()
)
obj = json.loads(output)
return obj["details"]["password"]
def login_to_op(executable: Path, op_domain: str, key_store: AbstractKeyStore) -> None:
session_key = subprocess.check_output([str(executable), "signin", op_domain, "--output=raw"]).decode("utf-8").strip()
env = {**os.environ, f"OP_SESSION_{op_domain}": session_key}
def login_to_op(
executable: Path,
op_domain: str,
key_store: AbstractKeyStore,
key_loader: AbstractKeyLoader,
) -> None:
session_key = (
subprocess.check_output([str(executable), "signin", op_domain, "--output=raw"])
.decode("utf-8")
.strip()
)
loaded_fingerprints = set(key_loader.list_loaded_keys())
for fingerprint, key_entry in key_store.items():
subprocess.check_call(
["ssh-add", str(key_entry.key_path)],
env={**env, "OP_ASKPASS_ITEM_NAME": key_entry.onepass_uid, "SSH_ASKPASS": "op-askpass-get-password"},
stdin=DEVNULL,
if fingerprint in loaded_fingerprints:
print(f"Skipping key {key_entry.key_path}. Already loaded.")
continue
key_loader.load_key(
key_path=key_entry.key_path,
op_domain=op_domain,
op_session_key=session_key,
op_uid=key_entry.onepass_uid,
)
def setup_op_client(download_url: str, install_path: Path, op_domain: str, op_email: str, verify: bool= True) -> None:
def setup_op_client(
download_url: str,
install_path: Path,
op_domain: str,
op_email: str,
verify: bool = True,
) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(str(tmpdir))
download_dir = tmpdir / "downloaded_client"
download_dir.mkdir()
download_op_client(download_url=download_url, download_dir=download_dir)
__download_op_client(download_url=download_url, download_dir=download_dir)
extract_dir = tmpdir / "extracted_client"
extract_dir.mkdir()
unzip_op_client_archive(archive_path=download_dir / "op_linux_amd64_v0.5.7.zip", extract_dir=extract_dir)
__unzip_op_client_archive(
archive_path=download_dir / "op_linux_amd64_v0.5.7.zip",
extract_dir=extract_dir,
)
if verify:
verify_signature(client_path=extract_dir / "op", signature_file=extract_dir / "op.sig")
__verify_signature(
client_path=extract_dir / "op", signature_file=extract_dir / "op.sig"
)
shutil.copy(src=str(extract_dir / "op"), dst=str(install_path / "op"))
signin_to_op(executable=install_path / "op", domain=op_domain, email=op_email)
__signin_to_op(executable=install_path / "op", domain=op_domain, email=op_email)
......@@ -12,26 +12,24 @@ with open("dev_requirements.txt", "r") as fh:
setup(
name="op_askpass",
version="0.0.2",
author="Example Author",
author_email="author@example.com",
description="A small example package",
author="Maciej Gol",
author_email="1kroolik1@gmail.com",
description="Add password-protected ssh keys promptless using 1Password.",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=["op_askpass"],
install_requires=requirements,
extras_require={
"dev": dev_requirements,
},
extras_require={"dev": dev_requirements},
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
entry_points={
'console_scripts': [
'op-askpass=op_askpass.cli.main:main',
'op-askpass-get-password=op_askpass.cli.get_password:main',
],
"console_scripts": [
"op-askpass=op_askpass.cli.main:main",
"op-askpass-get-password=op_askpass.cli.get_password:main",
]
},
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment