Commit 7f7fc01e authored by Nick Zaccardi's avatar Nick Zaccardi
Browse files

Add Fetch to Storage Contribs

parent 18845777
Loading
Loading
Loading
Loading
+24 −3
Original line number Diff line number Diff line
import io
import mimetypes
import os.path
import pathlib
import mimetypes
import uuid

try:
    # When using S3 you need to install boto
    import botocore
    import boto3
except ImportError:
    pass

import uuid

import falcon


@@ -69,6 +69,17 @@ class S3FileStore:
        resp.status = falcon.HTTP_303
        resp.location = self.make_download_url(doc)

    def _fetch(self, path):
        """Returns the S3 object from the service"""
        return self.connection.Object(self.bucket, path.lstrip('/')).get()

    def fetch_fp(self, doc, opener=None):
        opener = opener if opener else self._fetch

        resp = opener(doc.path)
        return io.BytesIO(resp['Body'].read())

    # TODO: this should take a Document not a path
    def remove(self, path):
        obj = self.connection.Object(self.bucket, path.lstrip('/'))
        resp = obj.delete()
@@ -164,3 +175,13 @@ class LocalFileStore:
        resp.content_type = content_type

        resp.status = falcon.HTTP_200

    def fetch_fp(self, doc: Document, opener=None):
        """Yield a file into a file-like object

        You can pass `opener` to this function to change the way the file is opened. This uses the
        opener kwarg for open.
        """
        return open(pathlib.Path(doc.path),
                    mode='rb',
                    opener=opener)
+64 −0
Original line number Diff line number Diff line
import mimetypes
import random
import string


_BOUNDARY_CHARS = string.digits + string.ascii_letters


def encode_multipart(fields, files, boundary=None):
    """Create a multi-part http request

    Original was found here with an MIT license:
        http://code.activestate.com/recipes/578668-encode-multipart-form-data-for-uploading-files-via/

    Encode dict of form fields and dict of files as multipart/form-data. Return tuple of
    (body_string, headers_dict).

    Each value in files is a dict with required keys 'filename' and 'content', and optional
    'mimetype' (if not specified, tries to guess mime type or uses 'application/octet-stream').
    """

    def escape_quote(s):
        return s.replace('"', '\\"')

    if boundary is None:
        boundary = ''.join(random.choice(_BOUNDARY_CHARS) for i in range(30))

    lines = []

    for name, value in fields.items():
        lines.extend((
            f'--{boundary}',
            f'Content-Disposition: form-data; name="{escape_quote(name)}"',
            '',
            value,
        ))

    for name, value in files.items():
        filename = value['filename']
        mimetype = (value.get('mimetype') or
                    mimetypes.guess_type(filename)[0] or
                    'application/octet-stream')
        name, filename = escape_quote(name), escape_quote(filename)

        lines.extend((
            f'--{boundary}',
            f'Content-Disposition: form-data; name="{name}"; filename="{filename}"',
            f'Content-Type: {mimetype}',
            '',
            value['content'],
        ))

    lines.extend((
        f'--{boundary}--',
        '',
    ))
    body = '\r\n'.join(lines)

    headers = {
        'Content-Type': f'multipart/form-data; boundary={boundary}',
        'Content-Length': str(len(body)),
    }

    return (body, headers)
+26 −3
Original line number Diff line number Diff line
import os.path
import pathlib
import tempfile
import falcon_helpers.contrib.storage as storage


def document_from_temp(temp):
    return storage.Document(
        name=temp.name,
        uid=os.path.basename(temp.name),
        storage_type='local',
        path=temp.name,
        details={}
    )


class TestLocalFileStore:
    def test_fetching_a_file_by_fp(self):
        with tempfile.TemporaryDirectory() as d:
            store = storage.LocalFileStore(d, uidgen=lambda: 'unique-name')

            with tempfile.NamedTemporaryFile(prefix=d) as rf:
                rf.write(b'test')
                rf.seek(0)

                doc = document_from_temp(rf)
                fp = store.fetch_fp(doc)
                with fp as f:
                    assert f.read() == b'test'

    def test_save_does_not_require_path(self):
        with tempfile.TemporaryDirectory() as d:
            store = storage.LocalFileStore(d, uidgen=lambda: 'unique-name')

            with tempfile.NamedTemporaryFile() as f:
            with tempfile.NamedTemporaryFile(prefix=d) as f:
                doc = store.save(f.name, f)
                assert doc.uid == 'unique-name'
                assert doc.name == f.name
                assert doc.path == str(pathlib.Path(d).joinpath(doc.uid))

            with tempfile.NamedTemporaryFile() as f:
            with tempfile.NamedTemporaryFile(prefix=d) as f:
                doc = store.save(f.name, f, path='other')
                assert doc.uid == 'unique-name'
                assert doc.name == f.name
                assert doc.path == str(pathlib.Path(d).joinpath('other', doc.uid))