Commit 016ecc15 authored by Blaise Thompson's avatar Blaise Thompson Committed by Kyle Sunden
Browse files

avro

parent aa168305
......@@ -13,12 +13,13 @@ before_script:
- pip install -U virtualenv
- virtualenv venv
- source venv/bin/activate
- pip install -U twine
- pip install -U flit
- pip install -U pytest
- pip install -U mypy
- pip install -U yaqd-core
- pip install -U aiohttp
- pip install -e .
- pip install -U yaq-traits
- pip install .
# test stage
mypy:
......@@ -30,13 +31,17 @@ entry:
script:
- yaqd-gdrive --version
- yaqd-gdrive -h
traits:
stage: test
script:
- yaq-traits check yaqd_gdrive/gdrive.avpr
# deploy stage
twine:
stage: deploy
script:
- python setup.py sdist bdist_wheel
- twine upload dist/*
- flit publish
artifacts:
paths:
- dist/*
......
include LICENSE
include README.md
[build-system]
requires = ["flit_core >=2,<4"]
build-backend = "flit_core.buildapi"
[tool.flit.metadata]
module = "yaqd_gdrive"
author = "yaq developers"
author-email = "git@ksunden.space"
home-page = "https://yaq.fyi"
description-file = "README.md"
requires-python = ">=3.7"
requires = ["yaqd-core>=2020.06.3", "aiohttp"]
classifiers = [
"Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)",
"Natural Language :: English",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Topic :: Scientific/Engineering",
]
[tool.flit.metadata.urls]
Source = "https://gitlab.com/yaq/yaqd-gdrive"
Issues = "https://gitlab.com/yaq/yaqd-gdrive/issues"
[tool.flit.metadata.requires-extra]
dev = ["black", "pre-commit"]
[tool.flit.scripts]
yaqd-gdrive = "yaqd_gdrive._gdrive:GDrive.main"
[tool.black]
line-length = 99
target-version = ['py36', 'py37', 'py38']
target-version = ['py37', 'py38']
include = '\.pyi?$'
exclude = '''
/(
......@@ -15,5 +48,3 @@ exclude = '''
| dist
)/
'''
#! /usr/bin/env python3
import os
from setuptools import setup, find_packages
here = os.path.abspath(os.path.dirname(__file__))
def read(fname):
return open(os.path.join(here, fname)).read()
with open(os.path.join(here, "yaqd_gdrive", "VERSION")) as version_file:
version = version_file.read().strip()
extra_files = {"yaqd_gdrive": ["VERSION"]}
setup(
name="yaqd-gdrive",
packages=find_packages(exclude=("tests", "tests.*")),
package_data=extra_files,
python_requires=">=3.7",
install_requires=["yaqd-core>=2020.05.1", "aiohttp"],
extras_require={
"docs": ["sphinx", "sphinx-gallery>=0.3.0", "sphinx-rtd-theme"],
"dev": ["black", "pre-commit", "pydocstyle"],
},
version=version,
description="Google Drive yaq daemon",
long_description=read("README.md"),
long_description_content_type="text/markdown",
author="yaq Developers",
license="LGPL v3",
url="http://gitlab.com/yaq/yaqd-gdrive",
project_urls={
"Source": "https://gitlab.com/yaq/yaqd-gdrive",
"Documentation": "https://yaq.fyi",
"Issue Tracker": "https://gitlab.com/yaq/yaqd-gdrive/issues",
},
entry_points={"console_scripts": ["yaqd-gdrive=yaqd_gdrive._gdrive:GDrive.main"]},
keywords="spectroscopy science multidimensional hardware",
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)",
"Natural Language :: English",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Topic :: Scientific/Engineering",
],
)
"""Daemon interface for writing files to Google Drive"""
from .__version__ import *
from ._gdrive import *
......@@ -14,8 +14,6 @@ import aiohttp.web # type: ignore
import appdirs # type: ignore
import yaqd_core
from .__version__ import __branch__
UploadItem = collections.namedtuple(
"UploadItem", "kind name path parent client_id", defaults=[None]
......@@ -28,7 +26,7 @@ def refresh_oauth(func):
res = await func(self, *args, **kwargs)
if res.status != 401:
return res
self._access_token = None
self._state["access_token"] = None
try:
await self._use_refresh_token()
except:
......@@ -41,27 +39,20 @@ def refresh_oauth(func):
class GDrive(yaqd_core.Base):
_kind = "gdrive"
_version = "1.0.0" + f"+{__branch__}" if __branch__ else ""
defaults = {
"scopes": ["https://www.googleapis.com/auth/drive.file"],
"authorization_url": "https://accounts.google.com/o/oauth2/v2/auth",
"token_url": "https://www.googleapis.com/oauth2/v4/token",
"create_file_url": "https://www.googleapis.com/upload/drive/v3/files",
"generate_ids_url": "https://www.googleapis.com/drive/v3/files/generateIds",
"update_file_url": "https://www.googleapis.com/upload/drive/v3/files/{fileId}", # Needs fileId
"download_url": "https://drive.google.com/uc?id={fileId}", # Needs fileId
"open_url": "https://drive.google.com/open?id={fileId}", # Needs fileId
}
def __init__(self, name, config, config_filepath):
print(config, yaqd_core)
super().__init__(name, config, config_filepath)
self._http_session = aiohttp.ClientSession()
self._upload_queue = []
self._copy_queue = []
self._id_mapping = {}
self._state["upload_queue"] = [
UploadItem(*[None if i == "None" else i for i in item])
for item in self._state["upload_queue"]
]
self._state["copy_queue"] = [
UploadItem(*[None if i == "None" else i for i in item])
for item in self._state["copy_queue"]
]
self._free_ids = []
self._access_token = None
self._refresh_token = None
super().__init__(name, config, config_filepath)
self._client_secret = config["client_secret"]
self._client_id = config["client_id"]
self._root_folder_id = config["root_folder_id"]
......@@ -81,7 +72,7 @@ class GDrive(yaqd_core.Base):
@property
def _auth_header(self):
return {"Authorization": f"Bearer {self._access_token}"}
return {"Authorization": f"Bearer {self._state['access_token']}"}
async def _authorize(self):
code = asyncio.Future()
......@@ -117,7 +108,7 @@ class GDrive(yaqd_core.Base):
json={
"code": code,
"client_id": self._client_id,
"client_secret": self._client_secret,
"client_secret": self._config["client_secret"],
"redirect_uri": "http://127.0.0.1:39202",
"grant_type": "authorization_code",
},
......@@ -125,31 +116,31 @@ class GDrive(yaqd_core.Base):
self.logger.debug(await res.text())
res.raise_for_status()
json = await res.json()
self._access_token = json["access_token"]
self._refresh_token = json["refresh_token"]
self._state["access_token"] = json["access_token"]
self._state["refresh_token"] = json["refresh_token"]
async def _use_refresh_token(self):
self.logger.info("Refreshing token")
async with self._http_session.post(
self._token_url,
json={
"refresh_token": self._refresh_token,
"refresh_token": self._state["refresh_token"],
"client_id": self._client_id,
"client_secret": self._client_secret,
"grant_type": "refresh_token",
},
) as res:
res.raise_for_status()
self._access_token = (await res.json())["access_token"]
self._state["access_token"] = (await res.json())["access_token"]
return res
@refresh_oauth
async def _create_file(self, name, parent, file_=None, *, id_=None, mime_type=None):
async def _create_file(self, name, parent, file_=None, *, id=None, mime_type=None):
# TODO: investigate using resumable uploads instead of multipart
# May be more reliable
with aiohttp.MultipartWriter("related") as mpwriter:
mpwriter.append_json(
{"name": name, "parents": [parent], "id": id_, "mimeType": mime_type}
{"name": name, "parents": [parent], "id": id, "mimeType": mime_type}
)
if file_ is not None:
with open(file_, "rb") as f:
......@@ -164,19 +155,19 @@ class GDrive(yaqd_core.Base):
self.logger.debug(await res.text())
return res
async def _create_folder(self, name, parent, *, id_=None):
async def _create_folder(self, name, parent, *, id=None):
await self._create_file(
name, parent, id_=id_, mime_type="application/vnd.google-apps.folder"
name, parent, id=id, mime_type="application/vnd.google-apps.folder"
)
@refresh_oauth
async def _update_file(self, file_, id_):
async def _update_file(self, file_, id):
with aiohttp.MultipartWriter("related") as mpwriter:
mpwriter.append_json({"mimeType": None})
with open(file_, "rb") as f:
mpwriter.append(f.read())
async with self._http_session.patch(
self._update_file_url.format(fileId=id_),
self._update_file_url.format(fileId=id),
headers=self._auth_header,
params={"uploadType": "multipart"},
data=mpwriter,
......@@ -196,28 +187,6 @@ class GDrive(yaqd_core.Base):
self._free_ids += ids
return res
def get_state(self):
return {
"access_token": self._access_token,
"refresh_token": self._refresh_token,
"upload_queue": self._upload_queue,
"copy_queue": self._copy_queue,
"id_mapping": self._id_mapping,
}
def _load_state(self, state):
self._access_token = state.get("access_token")
self._refresh_token = state.get("refresh_token")
upload_queue = state.get("upload_queue", [])
self._upload_queue = [
UploadItem(*[None if i == "None" else i for i in item]) for item in upload_queue
]
copy_queue = state.get("copy_queue", [])
self._copy_queue = [
UploadItem(*[None if i == "None" else i for i in item]) for item in copy_queue
]
self._id_mapping = state.get("id_mapping", {})
async def _stock_ids(self):
while True:
if len(self._free_ids) < 32:
......@@ -225,15 +194,15 @@ class GDrive(yaqd_core.Base):
await asyncio.sleep(0.1)
async def _get_id(self, client_id=None):
id_ = self._id_mapping.get(client_id)
id = self._state["id_mapping"].get(client_id)
# Avoid popping if id is already reserved
if id_ is None:
if id is None:
while not self._free_ids:
await asyncio.sleep(0.01)
id_ = self._free_ids.pop(0)
id = self._free_ids.pop(0)
if client_id is not None:
self._id_mapping[client_id] = id_
return id_
self._state["id_mapping"][client_id] = id
return id
def _dir_enqueue(self, path, queue, parent_id):
for child in path.iterdir():
......@@ -244,25 +213,25 @@ class GDrive(yaqd_core.Base):
async def _upload(self):
while True:
self.logger.debug("_upload", len(self._upload_queue))
while self._upload_queue:
self.logger.debug("_upload", len(self._state["upload_queue"]))
while self._state["upload_queue"]:
self._busy = True
try:
item = self._upload_queue[0]
item = self._state["upload_queue"][0]
path = pathlib.Path(item.path)
id_ = await self._get_id(item.client_id)
id = await self._get_id(item.client_id)
parent = item.parent if item.parent else self._root_folder_id
if item.kind == "folder_create":
await self._create_folder(item.name, parent, id_=id_)
await self._create_folder(item.name, parent, id=id)
elif item.kind == "file_create":
await self._create_file(item.name, parent, path, id_=id_)
await self._create_file(item.name, parent, path, id=id)
elif item.kind == "file_update":
await self._update_file(path, id_)
await self._update_file(path, id)
except FileNotFoundError:
self._upload_queue.pop(0)
self._state["upload_queue"].pop(0)
except BaseException as e:
self.logger.error(e)
self._upload_queue.append(self._upload_queue.pop(0))
self._state["upload_queue"].append(self._state["upload_queue"].pop(0))
else:
try:
if str(path).startswith(str(self._cache_dir)):
......@@ -270,16 +239,16 @@ class GDrive(yaqd_core.Base):
except FileNotFoundError:
pass
self._upload_queue.pop(0)
self._state["upload_queue"].pop(0)
self._busy = False
await asyncio.sleep(0.01)
await asyncio.sleep(1)
async def _copy(self):
while True:
while self._copy_queue:
while self._state["copy_queue"]:
try:
item = self._copy_queue[0]._asdict()
item = self._state["copy_queue"][0]._asdict()
path = pathlib.Path(item["path"])
if path.is_file():
fd, tmp = tempfile.mkstemp(
......@@ -288,86 +257,86 @@ class GDrive(yaqd_core.Base):
os.close(fd)
shutil.copy(path, tmp)
item["path"] = tmp
self._upload_queue.append(UploadItem(**item))
self._state["upload_queue"].append(UploadItem(**item))
elif item["kind"] == "folder_upload":
item["kind"] = "folder_create"
id_ = await self._get_id(item["client_id"])
id = await self._get_id(item["client_id"])
if item["client_id"] is None:
self._id_mapping[id_] = id_
item["client_id"] = id_
self._upload_queue.append(UploadItem(**item))
self._dir_enqueue(path, self._copy_queue, id_)
self._state["id_mapping"][id] = id
item["client_id"] = id
self._state["upload_queue"].append(UploadItem(**item))
self._dir_enqueue(path, self._state["copy_queue"], id)
else:
self._upload_queue.append(UploadItem(**item))
self._state["upload_queue"].append(UploadItem(**item))
except BaseException as e:
self.logger.error(e)
self._copy_queue.append(self._copy_queue.pop(0))
self._state["copy_queue"].append(self._state["copy_queue"].pop(0))
else:
self._copy_queue.pop(0)
self._state["copy_queue"].pop(0)
await asyncio.sleep(0.01)
await asyncio.sleep(1)
def reserve_id(self, client_id, drive_id=None):
client_id = str(client_id)
if drive_id is None:
drive_id = self._id_mapping.get(client_id)
drive_id = self._state["id_mapping"].get(client_id)
if drive_id is None:
drive_id = self._free_ids.pop(0)
self._id_mapping[client_id] = drive_id
self._state["id_mapping"][client_id] = drive_id
return drive_id
def id_to_open_url(self, id_):
return self._open_url.format(fileId=self._id_mapping.get(id_, id_))
def id_to_open_url(self, id):
return self._open_url.format(fileId=self._state["id_mapping"].get(id, id))
def id_to_download_url(self, id_):
return self._download_url.format(fileId=self._id_mapping.get(id_, id_))
def id_to_download_url(self, id):
return self._download_url.format(fileId=self._state["id_mapping"].get(id, id))
def create_folder(self, path, parent_id=None, id_=None):
def create_folder(self, path, parent_id=None, id=None):
path = pathlib.Path(path)
self._upload_queue.append(
self._state["upload_queue"].append(
UploadItem(
"folder_create",
path.name,
str(path),
self._id_mapping.get(parent_id, parent_id),
id_,
self._state["id_mapping"].get(parent_id, parent_id),
id,
)
)
def upload_folder(self, path, parent_id=None, id_=None):
def upload_folder(self, path, parent_id=None, id=None):
path = pathlib.Path(path)
self._copy_queue.append(
self._state["copy_queue"].append(
UploadItem(
"folder_upload",
path.name,
str(path),
self._id_mapping.get(parent_id, parent_id),
id_,
self._state["id_mapping"].get(parent_id, parent_id),
id,
)
)
def create_file(self, path, parent_id=None, id_=None):
def create_file(self, path, parent_id=None, id=None):
path = pathlib.Path(path)
self._copy_queue.append(
self._state["copy_queue"].append(
UploadItem(
"file_create",
path.name,
str(path),
self._id_mapping.get(parent_id, parent_id),
id_,
self._state["id_mapping"].get(parent_id, parent_id),
id,
)
)
def update_file(self, path, id_=None):
def update_file(self, path, id=None):
path = pathlib.Path(path)
self._copy_queue.append(UploadItem("file_update", path.name, str(path), None, id_))
self._state["copy_queue"].append(UploadItem("file_update", path.name, str(path), None, id))
def is_uploaded(self, id_):
for item in self._copy_queue:
if item.client_id == id_:
def is_uploaded(self, id):
for item in self._state["copy_queue"]:
if item.client_id == id:
return False
for item in self._upload_queue:
if item.client_id == id_:
for item in self._state["upload_queue"]:
if item.client_id == id:
return False
return True
......@@ -375,7 +344,3 @@ class GDrive(yaqd_core.Base):
loop = asyncio.get_event_loop()
loop.create_task(self._http_session.close())
super().close()
if __name__ == "__main__":
GDrive.main()
{
"config": {
"authorization_url": {
"default": "https://accounts.google.com/o/oauth2/v2/auth",
"type": "string"
},
"client_id": {
"type": "string"
},
"client_secret": {
"type": "string"
},
"create_file_url": {
"default": "https://www.googleapis.com/upload/drive/v3/files",
"type": "string"
},
"download_url": {
"default": "https://drive.google.com/uc?id={file_id}",
"type": "string"
},
"generate_ids_url": {
"default": "https://www.googleapis.com/drive/v3/files/generateIds",
"type": "string"
},
"make": {
"default": null,
"type": [
"null",
"string"
]
},
"model": {
"default": null,
"type": [
"null",
"string"
]
},
"open_url": {
"default": "https://drive.google.com/open?id={file_id}",
"type": "string"
},
"port": {
"doc": "TCP port for daemon to occupy.",
"type": "int"
},
"root_folder_id": {
"type": "string"
},
"scopes": {
"default": [
"https://www.googleapis.com/auth/drive.file"
],
"items": "string",
"type": "array"
},
"serial": {
"default": null,
"doc": "Serial number for the particular device represented by the daemon",
"type": [
"null",
"string"
]
},
"token_url": {
"default": "https://www.googleapis.com/oauth2/v4/token",
"type": "string"
},
"update_file_url": {
"default": "https://www.googleapis.com/upload/drive/v3/files/{file_id}",
"type": "string"
}
},
"doc": "Upload files to a specified google drive folder",
"installation": {
"PyPI": "https://pypi.org/project/yaqd-gdrive"
},
"links": {
"bugtracker": "https://gitlab.com/yaq/yaqd-gdrive/issues",
"source": "https://gitlab.com/yaq/yaqd-gdrive"
},
"messages": {
"busy": {
"doc": "Returns true if daemon is currently busy.",
"request": [],
"response": "boolean"
},
"create_file": {
"doc": "Create a new file on the remote drive",
"request": [