...
 
Commits (2)
import os
from op_askpass import operations
from op_askpass.configuration import get_configuration_directory
def main() -> None:
item_name = os.environ["OP_ASKPASS_ITEM_NAME"]
print(operations.get_password_from_op(executable=get_configuration_directory() / "op", item_name=item_name))
import tempfile
from pathlib import Path
from typing import Optional
import click
from op_askpass import operations
from op_askpass.configuration import get_configuration_directory
from op_askpass.fingerprint_generator import SSHKeyGenFingerprintGenerator
from op_askpass.key_store import FileKeyStore
from op_askpass.key_store import get_default_key_store
@click.group()
......@@ -15,17 +18,44 @@ def main() -> None:
@main.command()
def list_keys() -> None:
""" Print already stored keys fingerprint and their 1Password uids. """
keys = operations.list_keys(key_store=FileKeyStore(file_path=Path("/home/mg/.op-askpass.json")))
keys = operations.list_keys(key_store=get_default_key_store())
for fingerprint, onepass_uid in keys:
print(fingerprint, "||", onepass_uid)
@main.command()
@click.argument("op_domain")
@click.argument("op_email")
@click.option("--install-path", type=Path, help="Where the 1Password client should be installed.")
@click.option("--verify/--no-verify", default=True, help="Should the download 1Password client binary be verified.")
def setup_op_client(op_domain: str, op_email: str, verify: bool, install_path: Optional[Path]) -> None:
""" Download the 1Password API client and login to email in domain. """
operations.setup_op_client(
download_url="https://cache.agilebits.com/dist/1P/op/pkg/v0.5.7/op_linux_amd64_v0.5.7.zip",
install_path=install_path or get_configuration_directory(),
verify=verify,
op_domain=op_domain,
op_email=op_email,
)
@main.command()
@click.argument("op_domain")
@click.option("--install-path", type=Path, help="Where the 1Password client was installed.")
def login(op_domain: str, install_path: Optional[Path]) -> None:
install_path = install_path or get_configuration_directory()
operations.login_to_op(install_path / "op", op_domain, key_store=get_default_key_store())
@main.command()
@click.argument("public_or_private_key", type=Path)
def delete_key(public_or_private_key: Path) -> None:
""" Remove given key from prompt-less store. """
operations.delete_key_from_path(path=public_or_private_key, fingerprint_generator=SSHKeyGenFingerprintGenerator(),
key_store=FileKeyStore(file_path=Path("/home/mg/.op-askpass.json")))
operations.delete_key_from_path(
path=public_or_private_key,
fingerprint_generator=SSHKeyGenFingerprintGenerator(),
key_store=get_default_key_store(),
)
@main.command()
......@@ -33,5 +63,9 @@ def delete_key(public_or_private_key: Path) -> None:
@click.argument("onepass_name_or_uid", type=str)
def add_key(public_or_private_key: Path, onepass_name_or_uid: str) -> None:
""" Add given key with password stored in 1Password. The key will be available later on for prompt-less add. """
operations.add_key_from_path(path=public_or_private_key, onepass_uid=onepass_name_or_uid, fingerprint_generator=SSHKeyGenFingerprintGenerator(),
key_store=FileKeyStore(file_path=Path("/home/mg/.op-askpass.json")))
operations.add_key_from_path(
path=public_or_private_key,
onepass_uid=onepass_name_or_uid,
fingerprint_generator=SSHKeyGenFingerprintGenerator(),
key_store=get_default_key_store(),
)
from pathlib import Path
def get_configuration_directory() -> Path:
dir = Path.home() / ".op-askpass"
dir.mkdir(exist_ok=True)
return dir
import abc
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple
from op_askpass.configuration import get_configuration_directory
@dataclass
class KeyEntry:
key_path: Path
onepass_uid: str
class AbstractKeyStore(abc.ABC):
@abc.abstractmethod
def add_fingerprint(self, fingerprint: str, onepass_uid: str) -> None:
def add_fingerprint(self, fingerprint: str, onepass_uid: str, key_path: Path) -> None:
...
@abc.abstractmethod
......@@ -14,28 +23,28 @@ class AbstractKeyStore(abc.ABC):
...
@abc.abstractmethod
def get_onepass_uid(self, fingerprint: str) -> str:
def get_key_entry(self, fingerprint: str) -> KeyEntry:
...
@abc.abstractmethod
def items(self) -> List[Tuple[str, str]]: ...
def items(self) -> List[Tuple[str, KeyEntry]]: ...
class MemoryKeyStore(AbstractKeyStore):
def delete_fingerprint(self, fingerprint: str) -> None:
self.__store.pop(fingerprint, None)
def items(self) -> List[Tuple[str, str]]:
def items(self) -> List[Tuple[str, KeyEntry]]:
return list(self.__store.items())
def get_onepass_uid(self, fingerprint: str) -> str:
def get_key_entry(self, fingerprint: str) -> KeyEntry:
return self.__store[fingerprint]
def __init__(self) -> None:
self.__store: Dict[str, str] = {}
self.__store: Dict[str, KeyEntry] = {}
def add_fingerprint(self, fingerprint: str, onepass_uid: str) -> None:
self.__store[fingerprint] = onepass_uid
def add_fingerprint(self, fingerprint: str, onepass_uid: str, key_path: Path) -> None:
self.__store[fingerprint] = KeyEntry(onepass_uid=onepass_uid, key_path=key_path)
class FileKeyStore(AbstractKeyStore):
......@@ -44,28 +53,33 @@ class FileKeyStore(AbstractKeyStore):
contents.pop(fingerprint, None)
self.__save_contents(file_path=self.__file_path, contents=contents)
def items(self) -> List[Tuple[str, str]]:
def items(self) -> List[Tuple[str, KeyEntry]]:
return list(self.__read_contents(self.__file_path).items())
def __init__(self, file_path: Path) -> None:
self.__file_path = file_path
@staticmethod
def __read_contents(file_path: Path) -> Dict[str, str]:
def __read_contents(file_path: Path) -> Dict[str, KeyEntry]:
try:
return json.loads(file_path.read_text(encoding="utf-8"))
data = json.loads(file_path.read_text(encoding="utf-8"))
return {k: KeyEntry(onepass_uid=v["onepass_uid"], key_path=Path(v["key_path"])) for k, v in data.items()}
except IOError:
return {}
@staticmethod
def __save_contents(file_path: Path, contents: Dict[str, str]) -> None:
file_path.write_text(json.dumps(contents), encoding="utf-8")
def __save_contents(file_path: Path, contents: Dict[str, KeyEntry]) -> None:
file_path.write_text(json.dumps({k: {"onepass_uid": v.onepass_uid, "key_path": str(v.key_path)} for k, v in contents.items()}), encoding="utf-8")
def add_fingerprint(self, fingerprint: str, onepass_uid: str) -> None:
def add_fingerprint(self, fingerprint: str, onepass_uid: str, key_path: Path) -> None:
contents = self.__read_contents(self.__file_path)
contents[fingerprint] = onepass_uid
contents[fingerprint] = KeyEntry(onepass_uid=onepass_uid, key_path=key_path)
self.__save_contents(file_path=self.__file_path, contents=contents)
def get_onepass_uid(self, fingerprint: str) -> str:
def get_key_entry(self, fingerprint: str) -> KeyEntry:
return self.__read_contents(self.__file_path)[fingerprint]
def get_default_key_store() -> AbstractKeyStore:
return FileKeyStore(file_path=get_configuration_directory() / "op-askpass.json")
import abc
import uuid
from pathlib import Path
import pytest
from op_askpass.key_store import AbstractKeyStore, MemoryKeyStore, FileKeyStore
from op_askpass.key_store import AbstractKeyStore, MemoryKeyStore, FileKeyStore, KeyEntry
class AbstractKeyStoreTests(abc.ABC):
......@@ -11,29 +12,32 @@ class AbstractKeyStoreTests(abc.ABC):
key_store: AbstractKeyStore = self.key_store_factory()
onepass_uid = "XXXXXX"
fingerprint = "SSSSSSSSS"
path = Path("x")
expected_entry = KeyEntry(onepass_uid=onepass_uid, key_path=path)
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=onepass_uid)
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=onepass_uid, key_path=path)
assert key_store.get_onepass_uid(fingerprint=fingerprint) == onepass_uid
assert key_store.items() == [(fingerprint, onepass_uid)]
assert key_store.get_key_entry(fingerprint=fingerprint) == expected_entry
assert key_store.items() == [(fingerprint, expected_entry)]
def test_delete_key_fingerprint_should_correctly_remove_a_fingerprint(self) -> None:
key_store: AbstractKeyStore = self.key_store_factory()
onepass_uid = "XXXXXX"
fingerprint = "SSSSSSSSS"
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=onepass_uid)
path = Path("x")
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=onepass_uid, key_path=path)
key_store.delete_fingerprint(fingerprint=fingerprint)
with pytest.raises(KeyError, match=fingerprint):
key_store.get_onepass_uid(fingerprint=fingerprint)
key_store.get_key_entry(fingerprint=fingerprint)
assert key_store.items() == []
def test_get_onepass_uid_should_raise_key_error_on_missing_fingerprint(self) -> None:
key_store: AbstractKeyStore = self.key_store_factory()
with pytest.raises(KeyError, match="not-existing"):
key_store.get_onepass_uid(fingerprint="not-existing")
key_store.get_key_entry(fingerprint="not-existing")
@abc.abstractmethod
def key_store_factory(self) -> AbstractKeyStore:
......
from op_askpass.operations.list_keys import list_keys
from op_askpass.operations.add_key import add_key_from_path
from op_askpass.operations.delete_key import delete_key_from_path
from op_askpass.operations.op_client import setup_op_client, login_to_op, get_password_from_op
......@@ -6,4 +6,4 @@ from op_askpass.key_store import AbstractKeyStore
def add_key_from_path(path: Path, onepass_uid: str, fingerprint_generator: AbstractFingerprintGenerator, key_store: AbstractKeyStore) -> None:
fingerprint = fingerprint_generator.for_path(path=path)
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=onepass_uid)
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=onepass_uid, key_path=path.absolute())
from pathlib import Path
from op_askpass.fingerprint_generator import MD5FingerprintGenerator
from op_askpass.key_store import MemoryKeyStore
from op_askpass.key_store import MemoryKeyStore, KeyEntry
from op_askpass.operations.add_key import add_key_from_path
......@@ -14,4 +14,4 @@ def test_add_key_should_add_key_from_path(tmp_path: Path) -> None:
add_key_from_path(path=key_path, onepass_uid=uid, fingerprint_generator=fingerprint_generator, key_store=key_store)
assert key_store.get_onepass_uid(fingerprint=fingerprint_generator.for_path(key_path)) == uid
assert key_store.get_key_entry(fingerprint=fingerprint_generator.for_path(key_path)) == KeyEntry(onepass_uid=uid, key_path=key_path)
......@@ -12,7 +12,7 @@ def test_delete_key_from_path_should_remove_the_key(tmp_path: Path) -> None:
fingerprint_generator = MD5FingerprintGenerator()
uid = "some-uid"
fingerprint = fingerprint_generator.for_path(key_path)
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=uid)
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=uid, key_path=None)
delete_key_from_path(path=key_path, fingerprint_generator=fingerprint_generator, key_store=key_store)
......
from pathlib import Path
from op_askpass.key_store import MemoryKeyStore
from op_askpass.key_store import MemoryKeyStore, KeyEntry
from op_askpass.operations.list_keys import list_keys
......@@ -8,8 +8,9 @@ def test_list_keys_should_return_list_of_added_keys(tmp_path: Path) -> None:
key_store = MemoryKeyStore()
fingerprint = "some-fingerprint"
uid = "some-uid"
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=uid)
path = Path("x")
key_store.add_fingerprint(fingerprint=fingerprint, onepass_uid=uid, key_path=path)
keys = list_keys(key_store=key_store)
assert keys == [(fingerprint, uid)]
assert keys == [(fingerprint, KeyEntry(onepass_uid=uid, key_path=path))]
import json
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from subprocess import DEVNULL
from op_askpass.key_store import AbstractKeyStore
def download_op_client(download_url: str, download_dir: Path) -> None:
subprocess.check_call(["wget", download_url], cwd=str(download_dir))
def unzip_op_client_archive(archive_path: Path, extract_dir: Path) -> None:
subprocess.check_call(["unzip", str(archive_path)], cwd=str(extract_dir))
def verify_signature(client_path: Path, signature_file: Path) -> None:
subprocess.check_call(["gpg", "--verify", str(signature_file), str(client_path)])
return
def signin_to_op(executable: Path, domain: str, email: str) -> None:
subprocess.check_call([str(executable), "signin", domain, email, "--output=raw"])
def get_password_from_op(executable: Path, item_name: str) -> str:
output = subprocess.check_output([str(executable), "get", "item", item_name]).decode("utf-8").strip()
obj = json.loads(output)
return obj["details"]["password"]
def login_to_op(executable: Path, op_domain: str, key_store: AbstractKeyStore) -> None:
session_key = subprocess.check_output([str(executable), "signin", op_domain, "--output=raw"]).decode("utf-8").strip()
env = {**os.environ, f"OP_SESSION_{op_domain}": session_key}
for fingerprint, key_entry in key_store.items():
subprocess.check_call(
["ssh-add", str(key_entry.key_path)],
env={**env, "OP_ASKPASS_ITEM_NAME": key_entry.onepass_uid, "SSH_ASKPASS": "op-askpass-get-password"},
stdin=DEVNULL,
)
def setup_op_client(download_url: str, install_path: Path, op_domain: str, op_email: str, verify: bool= True) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(str(tmpdir))
download_dir = tmpdir / "downloaded_client"
download_dir.mkdir()
download_op_client(download_url=download_url, download_dir=download_dir)
extract_dir = tmpdir / "extracted_client"
extract_dir.mkdir()
unzip_op_client_archive(archive_path=download_dir / "op_linux_amd64_v0.5.7.zip", extract_dir=extract_dir)
if verify:
verify_signature(client_path=extract_dir / "op", signature_file=extract_dir / "op.sig")
shutil.copy(src=str(extract_dir / "op"), dst=str(install_path / "op"))
signin_to_op(executable=install_path / "op", domain=op_domain, email=op_email)
......@@ -6,6 +6,9 @@ with open("README.md", "r") as fh:
with open("requirements.txt", "r") as fh:
requirements = fh.read().splitlines()
with open("dev_requirements.txt", "r") as fh:
dev_requirements = fh.read().splitlines()
setup(
name="op_askpass",
version="0.0.1",
......@@ -17,6 +20,9 @@ setup(
url="https://github.com/pypa/sampleproject",
packages=["op_askpass"],
install_requires=requirements,
extras_require={
"dev": dev_requirements,
},
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
......@@ -25,6 +31,7 @@ setup(
entry_points={
'console_scripts': [
'op-askpass=op_askpass.cli.main:main',
'op-askpass-get-password=op_askpass.cli.get_password:main',
],
},
)