Skip to content
Snippets Groups Projects
Commit 25850c86 authored by Tristan Maat's avatar Tristan Maat
Browse files

_sandboxchroot.py: Improve mount interface

parent 6a43fc34
No related branches found
No related tags found
Loading
......@@ -34,16 +34,78 @@ from . import utils, _signals
from . import Sandbox, SandboxFlags
class Mount():
def __init__(self, platform):
self.platform = platform
def _mount(self, src, dest, mount_type=None, options=None):
argv = [utils.get_host_tool('mount')]
if mount_type:
argv.extend([self.platform.switch(sunos='-F', default='-t'), mount_type])
if options:
argv.extend(['-o', options])
process = subprocess.Popen(
argv + [src, dest],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
_, err = process.communicate()
status = process.poll()
if status != 0:
raise ElementError('Mounting {} failed with exit code {}:\n{}'
.format(src, status, err.decode('utf-8')))
return dest
def _umount(self, path):
args = self.platform.switch(linux='-l', default=[])
process = subprocess.Popen(
[utils.get_host_tool('umount'), args, path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
_, err = process.communicate()
status = process.poll()
if status != 0:
raise ElementError('Unmounting {} failed with exit code {}:\n{}'
.format(path, status, err.decode('utf-8')))
@contextmanager
def mount(self, src, dest, mount_type=None, **kwargs):
options = ','.join([key for key, val in kwargs.items() if val])
yield self._mount(src, dest, mount_type, options)
self._umount(dest)
@contextmanager
def bind_mount(self, src, dest, **kwargs):
options = ','.join([key for key, val in kwargs.items() if val])
options += self.platform.switch(linux=',' if options else '' + 'rbind',
aix=',' if options else '' + 'rbind',
sunos='')
mount_type = self.platform.switch(aix='lofs', default=None)
yield self._mount(src, dest, mount_type, options)
self._umount(dest)
class SandboxChroot(Sandbox):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._mount_bind = self._get_context()._platform.switch(
sunos=[utils.get_host_tool('mount'), '-F', 'lofs'],
aix=[utils.get_host_tool('mount'), '-o', 'bind'],
linux=[utils.get_host_tool('mount'), '--rbind'],
# openbsd=[utils.get_host_tool('mount_nullfs')]
)
self.mount = Mount(self._get_context()._platform)
def run(self, command, flags, cwd=None, env=None):
......@@ -115,7 +177,7 @@ class SandboxChroot(Sandbox):
with ExitStack() as stack:
stack.enter_context(self.create_devices(flags))
stack.enter_context(self.mount(flags))
stack.enter_context(self.mount_dirs(flags))
stack.enter_context(_signals.suspendable(suspend, resume))
stack.enter_context(_signals.terminator(terminate))
......@@ -256,7 +318,7 @@ class SandboxChroot(Sandbox):
with ExitStack() as stack:
stack.enter_context(self.create_devices(flags))
stack.enter_context(self.mount(flags))
stack.enter_context(self.mount_dirs(flags))
process = subprocess.Popen(
[utils.get_host_tool('chroot'), self.get_directory()] + command,
......@@ -306,147 +368,28 @@ class SandboxChroot(Sandbox):
# flags (:class:`.SandboxFlags`): The sandbox flags
#
@contextmanager
def mount(self, flags):
mounting = []
if flags & SandboxFlags.INTERACTIVE:
mount_point = self.create_mount_point('/dev/', 'dir')
mounting.append(self.mount_bind('/dev/', mount_point))
mount_point = self.create_mount_point('/proc/', 'dir')
mounting.append(self.mount_proc(mount_point))
mount_point = self.create_mount_point('/tmp/', 'dir')
mounting.append(self.mount_bind('/tmp/', mount_point))
# User defined
# mount_map = MountMap(self, False)
# root_mount_source = mount_map.get_mount_source('/')
# mount_root_cmd = self._mount_bind + [root_mount_source, '/']
for point, process in mounting:
_, err = process.communicate()
status = process.poll()
if not status == 0:
raise ElementError('Mounting {} failed with exit code {}: {}'
.format(point, status, err.decode('utf-8')))
yield
unmounting = []
for point, _ in mounting:
unmounting.append(self.unmount(point))
for point, process in unmounting:
_, err = process.communicate()
status = process.poll()
if status != 0:
raise ElementError('Unmounting {} failed with exit code {}:\n{}'
.format(point, status, err.decode('utf-8')))
point = os.path.join(self.get_directory(), point.lstrip(os.sep))
def mount_dirs(self, flags):
# Try removing a file first, in case we mounted a file.
try:
os.remove(point)
# If the file is a directory, remove the directory.
except IsADirectoryError:
shutil.rmtree(point)
# If no file is found, the file was removed by umount.
except FileNotFoundError:
pass
# mount_bind()
#
# Create a binding mount from the given source to a target in the sandbox.
#
# Args:
# source (str): The directory or file to mount
# target (str): The directory or file inside the sandbox to mount to
# stdout (file): The stdout to report to
# stderr (file): The stderr to report to
#
# Returns:
# (tuple): (sandbox_target, process)
#
def mount_bind(self, source, point):
process = subprocess.Popen(
self._mount_bind + [source, point],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
return (point, process)
# mount_proc()
#
# Mount the proc device - do *not* use mount_bind() for this.
#
def mount_proc(self, point):
cmd = self._get_context()._platform.switch(
sunos=[utils.get_host_tool('mount'), '-F', 'proc', '/proc', point],
linux=[utils.get_host_tool('mount'), '-t', 'proc', '/proc', point],
aix=[utils.get_host_tool('mount'), '-o', 'bind', '/proc', point],
# openbsd=[utils.get_host_tool('mount_nullfs')]
)
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
return (point, process)
# unmount()
#
# Unmount a directory in the sandbox.
#
# Args:
# target (str): The directory or file inside the sandbox to unmount
# stdout (file): The stdout to report to
# stderr (file): The stderr to report to
#
# Returns:
# (tuple): (sandbox_target, process)
#
def unmount(self, point):
process = subprocess.Popen(
[utils.get_host_tool('umount'), point],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
with ExitStack() as stack:
if flags & SandboxFlags.INTERACTIVE:
point = os.path.join(self.get_directory(), '/dev/'.lstrip(os.sep))
os.makedirs(point, exist_ok=True)
stack.enter_context(self.mount.bind_mount('/dev/', point))
return (point, process)
point = os.path.join(self.get_directory(), '/proc/'.lstrip(os.sep))
os.makedirs(point, exist_ok=True)
stack.enter_context(self.mount.mount('/proc/', point, 'proc', ro=True))
# create_mount_point()
#
# Create a mount point.
#
# Args:
# target (str): The path to create
# kind ('file'|'dir'): Whether to create a file or a dir
#
# Returns:
# (str): The location of the mount point
#
def create_mount_point(self, target, kind):
mount_point = os.path.join(self.get_directory(), target.lstrip(os.sep))
point = os.path.join(self.get_directory(), '/tmp/'.lstrip(os.sep))
os.makedirs(point, exist_ok=True)
stack.enter_context(self.mount.bind_mount('/tmp/', point))
if kind == 'dir':
os.makedirs(mount_point, exist_ok=True)
elif kind == 'file':
try:
open(mount_point, mode='x')
except FileExistsError:
pass
# User defined
# mount_map = MountMap(self, False)
# root_mount_source = mount_map.get_mount_source('/')
# mount_root_cmd = self._mount_bind + [root_mount_source, '/']
return mount_point
yield
# mknod()
#
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment