Commit 5ef19a0b authored by Valentin David
Browse files

Avoid copying temporary file when adding object to CAS in server.

The file is already a temporary file and does not need to be copied.
Issue #609 reports ENOSPC being raised during that copy.

Fixes #678.
parent 227fa26d
Loading
Loading
Loading
Loading
+25 −17
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import os
import stat
import tempfile
import uuid
import contextlib
from urllib.parse import urlparse

import grpc
@@ -480,13 +481,14 @@ class CASCache():
    #     digest (Digest): An optional Digest object to populate
    #     path (str): Path to file to add
    #     buffer (bytes): Byte buffer to add
    #     link_directly (bool): Whether the file given by path can be hard-linked directly instead of being copied first
    #
    # Returns:
    #     (Digest): The digest of the added object
    #
    # Either `path` or `buffer` must be passed, but not both.
    #
    def add_object(self, *, digest=None, path=None, buffer=None):
    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
        # Exactly one of the two parameters has to be specified
        assert (path is None) != (buffer is None)

@@ -496,28 +498,34 @@ class CASCache():
        try:
            h = hashlib.sha256()
            # Always write out new file to avoid corruption if input file is modified
            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
            with contextlib.ExitStack() as stack:
                if path is not None and link_directly:
                    tmp = stack.enter_context(open(path, 'rb'))
                    for chunk in iter(lambda: tmp.read(4096), b""):
                        h.update(chunk)
                else:
                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
                    # Set mode bits to 0644
                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)

                    if path:
                        with open(path, 'rb') as f:
                            for chunk in iter(lambda: f.read(4096), b""):
                                h.update(chunk)
                            out.write(chunk)
                                tmp.write(chunk)
                    else:
                        h.update(buffer)
                    out.write(buffer)
                        tmp.write(buffer)

                out.flush()
                    tmp.flush()

                digest.hash = h.hexdigest()
                digest.size_bytes = os.fstat(out.fileno()).st_size
                digest.size_bytes = os.fstat(tmp.fileno()).st_size

                # Place file at final location
                objpath = self.objpath(digest)
                os.makedirs(os.path.dirname(objpath), exist_ok=True)
                os.link(out.name, objpath)
                os.link(tmp.name, objpath)

        except FileExistsError as e:
            # We can ignore the failed link() if the object is already in the repo.
@@ -889,7 +897,7 @@ class CASCache():
        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
            self._fetch_blob(remote, digest, f)

            added_digest = self.add_object(path=f.name)
            added_digest = self.add_object(path=f.name, link_directly=True)
            assert added_digest.hash == digest.hash

        return objpath
@@ -900,7 +908,7 @@ class CASCache():
                f.write(data)
                f.flush()

                added_digest = self.add_object(path=f.name)
                added_digest = self.add_object(path=f.name, link_directly=True)
                assert added_digest.hash == digest.hash

    # Helper function for _fetch_directory().
+1 −1
Original line number Diff line number Diff line
@@ -208,7 +208,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response
                    out.flush()
                    digest = self.cas.add_object(path=out.name)
                    digest = self.cas.add_object(path=out.name, link_directly=True)
                    if digest.hash != client_digest.hash:
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response