Loading falcon_helpers/contrib/storage.py +24 −3 Original line number Diff line number Diff line import io import mimetypes import os.path import pathlib import mimetypes import uuid try: # When using S3 you need to install boto import botocore import boto3 except ImportError: pass import uuid import falcon Loading Loading @@ -69,6 +69,17 @@ class S3FileStore: resp.status = falcon.HTTP_303 resp.location = self.make_download_url(doc) def _fetch(self, path): """Returns the S3 object from the service""" return self.connection.Object(self.bucket, path.lstrip('/')).get() def fetch_fp(self, doc, opener=None): opener = opener if opener else self._fetch resp = opener(doc.path) return io.BytesIO(resp['Body'].read()) # TODO: this should take a Document not a path def remove(self, path): obj = self.connection.Object(self.bucket, path.lstrip('/')) resp = obj.delete() Loading Loading @@ -164,3 +175,13 @@ class LocalFileStore: resp.content_type = content_type resp.status = falcon.HTTP_200 def fetch_fp(self, doc: Document, opener=None): """Yield a file into a file-like object You can pass `opener` to this function to change the way the file is opened. This uses the opener kwarg for open. """ return open(pathlib.Path(doc.path), mode='rb', opener=opener) falcon_helpers/testing.py 0 → 100644 +64 −0 Original line number Diff line number Diff line import mimetypes import random import string _BOUNDARY_CHARS = string.digits + string.ascii_letters def encode_multipart(fields, files, boundary=None): """Create a multi-part http request Original was found here with an MIT license: http://code.activestate.com/recipes/578668-encode-multipart-form-data-for-uploading-files-via/ Encode dict of form fields and dict of files as multipart/form-data. Return tuple of (body_string, headers_dict). Each value in files is a dict with required keys 'filename' and 'content', and optional 'mimetype' (if not specified, tries to guess mime type or uses 'application/octet-stream'). """ def escape_quote(s): return s.replace('"', '\\"') if boundary is None: boundary = ''.join(random.choice(_BOUNDARY_CHARS) for i in range(30)) lines = [] for name, value in fields.items(): lines.extend(( f'--{boundary}', f'Content-Disposition: form-data; name="{escape_quote(name)}"', '', value, )) for name, value in files.items(): filename = value['filename'] mimetype = (value.get('mimetype') or mimetypes.guess_type(filename)[0] or 'application/octet-stream') name, filename = escape_quote(name), escape_quote(filename) lines.extend(( f'--{boundary}', f'Content-Disposition: form-data; name="{name}"; filename="{filename}"', f'Content-Type: {mimetype}', '', value['content'], )) lines.extend(( f'--{boundary}--', '', )) body = '\r\n'.join(lines) headers = { 'Content-Type': f'multipart/form-data; boundary={boundary}', 'Content-Length': str(len(body)), } return (body, headers) falcon_helpers/tests/test_contrib/test_storage.py +26 −3 Original line number Diff line number Diff line import os.path import pathlib import tempfile import falcon_helpers.contrib.storage as storage def document_from_temp(temp): return storage.Document( name=temp.name, uid=os.path.basename(temp.name), storage_type='local', path=temp.name, details={} ) class TestLocalFileStore: def test_fetching_a_file_by_fp(self): with tempfile.TemporaryDirectory() as d: store = storage.LocalFileStore(d, uidgen=lambda: 'unique-name') with tempfile.NamedTemporaryFile(prefix=d) as rf: rf.write(b'test') rf.seek(0) doc = document_from_temp(rf) fp = store.fetch_fp(doc) with fp as f: assert f.read() == b'test' def test_save_does_not_require_path(self): with tempfile.TemporaryDirectory() as d: store = storage.LocalFileStore(d, uidgen=lambda: 'unique-name') with tempfile.NamedTemporaryFile() as f: with tempfile.NamedTemporaryFile(prefix=d) as f: doc = store.save(f.name, f) assert doc.uid == 'unique-name' assert doc.name == f.name assert doc.path == str(pathlib.Path(d).joinpath(doc.uid)) with tempfile.NamedTemporaryFile() as f: with tempfile.NamedTemporaryFile(prefix=d) as f: doc = store.save(f.name, f, path='other') assert doc.uid == 'unique-name' assert doc.name == f.name assert doc.path == str(pathlib.Path(d).joinpath('other', doc.uid)) Loading
falcon_helpers/contrib/storage.py +24 −3 Original line number Diff line number Diff line import io import mimetypes import os.path import pathlib import mimetypes import uuid try: # When using S3 you need to install boto import botocore import boto3 except ImportError: pass import uuid import falcon Loading Loading @@ -69,6 +69,17 @@ class S3FileStore: resp.status = falcon.HTTP_303 resp.location = self.make_download_url(doc) def _fetch(self, path): """Returns the S3 object from the service""" return self.connection.Object(self.bucket, path.lstrip('/')).get() def fetch_fp(self, doc, opener=None): opener = opener if opener else self._fetch resp = opener(doc.path) return io.BytesIO(resp['Body'].read()) # TODO: this should take a Document not a path def remove(self, path): obj = self.connection.Object(self.bucket, path.lstrip('/')) resp = obj.delete() Loading Loading @@ -164,3 +175,13 @@ class LocalFileStore: resp.content_type = content_type resp.status = falcon.HTTP_200 def fetch_fp(self, doc: Document, opener=None): """Yield a file into a file-like object You can pass `opener` to this function to change the way the file is opened. This uses the opener kwarg for open. """ return open(pathlib.Path(doc.path), mode='rb', opener=opener)
falcon_helpers/testing.py 0 → 100644 +64 −0 Original line number Diff line number Diff line import mimetypes import random import string _BOUNDARY_CHARS = string.digits + string.ascii_letters def encode_multipart(fields, files, boundary=None): """Create a multi-part http request Original was found here with an MIT license: http://code.activestate.com/recipes/578668-encode-multipart-form-data-for-uploading-files-via/ Encode dict of form fields and dict of files as multipart/form-data. Return tuple of (body_string, headers_dict). Each value in files is a dict with required keys 'filename' and 'content', and optional 'mimetype' (if not specified, tries to guess mime type or uses 'application/octet-stream'). """ def escape_quote(s): return s.replace('"', '\\"') if boundary is None: boundary = ''.join(random.choice(_BOUNDARY_CHARS) for i in range(30)) lines = [] for name, value in fields.items(): lines.extend(( f'--{boundary}', f'Content-Disposition: form-data; name="{escape_quote(name)}"', '', value, )) for name, value in files.items(): filename = value['filename'] mimetype = (value.get('mimetype') or mimetypes.guess_type(filename)[0] or 'application/octet-stream') name, filename = escape_quote(name), escape_quote(filename) lines.extend(( f'--{boundary}', f'Content-Disposition: form-data; name="{name}"; filename="{filename}"', f'Content-Type: {mimetype}', '', value['content'], )) lines.extend(( f'--{boundary}--', '', )) body = '\r\n'.join(lines) headers = { 'Content-Type': f'multipart/form-data; boundary={boundary}', 'Content-Length': str(len(body)), } return (body, headers)
falcon_helpers/tests/test_contrib/test_storage.py +26 −3 Original line number Diff line number Diff line import os.path import pathlib import tempfile import falcon_helpers.contrib.storage as storage def document_from_temp(temp): return storage.Document( name=temp.name, uid=os.path.basename(temp.name), storage_type='local', path=temp.name, details={} ) class TestLocalFileStore: def test_fetching_a_file_by_fp(self): with tempfile.TemporaryDirectory() as d: store = storage.LocalFileStore(d, uidgen=lambda: 'unique-name') with tempfile.NamedTemporaryFile(prefix=d) as rf: rf.write(b'test') rf.seek(0) doc = document_from_temp(rf) fp = store.fetch_fp(doc) with fp as f: assert f.read() == b'test' def test_save_does_not_require_path(self): with tempfile.TemporaryDirectory() as d: store = storage.LocalFileStore(d, uidgen=lambda: 'unique-name') with tempfile.NamedTemporaryFile() as f: with tempfile.NamedTemporaryFile(prefix=d) as f: doc = store.save(f.name, f) assert doc.uid == 'unique-name' assert doc.name == f.name assert doc.path == str(pathlib.Path(d).joinpath(doc.uid)) with tempfile.NamedTemporaryFile() as f: with tempfile.NamedTemporaryFile(prefix=d) as f: doc = store.save(f.name, f, path='other') assert doc.uid == 'unique-name' assert doc.name == f.name assert doc.path == str(pathlib.Path(d).joinpath('other', doc.uid))