...
 
Commits (8)
......@@ -254,6 +254,7 @@ impl Opt {
let chunk_settings = self.get_chunk_settings();
let multifile =
multifile::MultiFile::open_defaults(&self.repo, Some(chunk_settings), &key)
.await
.with_context(|| "Exeprienced an internal backend error.")?;
Ok((DynamicBackend::from(multifile), key))
}
......
......@@ -35,6 +35,7 @@ pub async fn new(options: Opt) -> Result<()> {
create_dir_all(&options.repo)?;
// Open the repository and set the key
let mut mf = MultiFile::open_defaults(&options.repo, Some(settings), &key)
.await
.with_context(|| "Unable to create MultiFile directory.")?;
mf.write_key(&encrypted_key)
.await
......
......@@ -143,14 +143,10 @@ impl Backend for DynamicBackend {
DynamicBackend::FlatFile(x) => x.read_chunk(location).await,
}
}
async fn write_chunk(
&mut self,
chunk: Chunk,
id: asuran::repository::ChunkID,
) -> Result<SegmentDescriptor> {
async fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor> {
match self {
DynamicBackend::MultiFile(x) => x.write_chunk(chunk, id).await,
DynamicBackend::FlatFile(x) => x.write_chunk(chunk, id).await,
DynamicBackend::MultiFile(x) => x.write_chunk(chunk).await,
DynamicBackend::FlatFile(x) => x.write_chunk(chunk).await,
}
}
async fn close(&mut self) {
......
......@@ -136,6 +136,21 @@ impl UnpackedChunk {
}
}
/// A split representation of a `Chunk`'s 'header' or metadata.
/// Used for on disk storage
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ChunkHeader {
compression: Compression,
encryption: Encryption,
hmac: HMAC,
mac: Vec<u8>,
id: ChunkID,
}
/// A split representation of a `Chunk`'s body, or contained data
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ChunkBody(pub Vec<u8>);
/// A binary blob, ready to be commited to storage
///
/// A `Chunk` is an arbitrary sequence of bytes, along with its associated `ChunkID`
......@@ -291,6 +306,32 @@ impl Chunk {
self.id
}
/// Splits a `Chunk` into its header and body components
pub fn split(self) -> (ChunkHeader, ChunkBody) {
let header = ChunkHeader {
compression: self.compression,
encryption: self.encryption,
hmac: self.hmac,
mac: self.mac,
id: self.id,
};
let body = ChunkBody(self.data);
(header, body)
}
/// Combines a header and a body into a `Chunk`
pub fn unsplit(header: ChunkHeader, body: ChunkBody) -> Chunk {
Chunk {
data: body.0,
compression: header.compression,
encryption: header.encryption,
hmac: header.hmac,
mac: header.mac,
id: header.id,
}
}
#[cfg(test)]
#[cfg_attr(tarpaulin, skip)]
/// Testing only function used to corrupt the data
......@@ -388,4 +429,23 @@ mod tests {
assert!(id.verify(&data1));
assert!(!id.verify(&data2));
}
#[test]
fn split_unsplit() {
let data_string = "I am but a humble test string";
let data_bytes = data_string.as_bytes().to_vec();
let compression = Compression::LZ4 { level: 1 };
let encryption = Encryption::new_aes256ctr();
let hmac = HMAC::SHA256;
let key = Key::random(32);
let packed = Chunk::pack(data_bytes, compression, encryption, hmac, &key);
let (header, body) = packed.split();
let packed = Chunk::unsplit(header, body);
let result = packed.unpack(&key);
assert!(result.is_ok());
}
}
......@@ -54,7 +54,7 @@ fn get_repo(key: Key) -> Repository<impl BackendClone> {
encryption: Encryption::new_aes256ctr(),
hmac: HMAC::Blake3,
};
let backend = Mem::new(settings);
let backend = Mem::new(settings, key.clone());
Repository::with(backend, settings, key)
}
......
......@@ -39,7 +39,7 @@ fn get_repo(key: Key) -> Repository<impl BackendClone> {
encryption: Encryption::new_aes256ctr(),
hmac: HMAC::Blake2bp,
};
let backend = Mem::new(settings);
let backend = Mem::new(settings, key.clone());
Repository::with(backend, settings, key)
}
......
......@@ -96,8 +96,8 @@ mod tests {
hmac: HMAC::Blake2b,
};
let backend = crate::repository::backend::mem::Mem::new(settings);
let key = Key::random(32);
let backend = crate::repository::backend::mem::Mem::new(settings, key.clone());
let repo = Repository::with(backend, settings, key);
let mut manifest = Manifest::load(&repo);
......@@ -110,8 +110,8 @@ mod tests {
#[tokio::test(threaded_scheduler)]
async fn new_archive_updates_time() {
let settings = ChunkSettings::lightweight();
let backend = crate::repository::backend::mem::Mem::new(settings);
let key = Key::random(32);
let backend = crate::repository::backend::mem::Mem::new(settings, key.clone());
let repo = Repository::with(backend.clone(), settings, key);
let mut manifest = Manifest::load(&repo);
......
......@@ -422,7 +422,7 @@ mod tests {
fn get_repo_mem(key: Key) -> Repository<impl BackendClone> {
let settings = ChunkSettings::lightweight();
let backend = Mem::new(settings);
let backend = Mem::new(settings, key.clone());
Repository::with(backend, settings, key)
}
......
......@@ -163,7 +163,7 @@ impl<T: BackendClone + 'static> Repository<T> {
// Get highest segment and check to see if has enough space
let backend = &mut self.backend;
let location = backend.write_chunk(chunk, id).await?;
let location = backend.write_chunk(chunk).await?;
self.backend.get_index().set_chunk(id, location).await?;
......@@ -309,7 +309,7 @@ mod tests {
hmac: HMAC::Blake2b,
encryption: Encryption::new_aes256ctr(),
};
let backend = Mem::new(settings);
let backend = Mem::new(settings, key.clone());
Repository::with(backend, settings, key)
}
......
......@@ -37,6 +37,8 @@ pub enum BackendError {
FileLockError,
#[error("Cancelled oneshot")]
CancelledOneshotError(#[from] futures::channel::oneshot::Canceled),
#[error("Chunk Unpacking Error")]
ChunkUnpackError(#[from] asuran_core::repository::chunk::ChunkError),
#[error("Unknown Error")]
Unknown(String),
}
......@@ -127,7 +129,7 @@ pub trait Backend: 'static + Send + Sync + std::fmt::Debug + 'static {
///
/// This must be passed owned data because it will be sent into a task, so the caller has no
/// control over drop time
async fn write_chunk(&mut self, chunk: Chunk, id: ChunkID) -> Result<SegmentDescriptor>;
async fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor>;
/// Consumes the current backend handle, and does any work necessary to
/// close out the backend properly
///
......
......@@ -21,7 +21,15 @@ impl LockedFile {
pub fn open_read_write<T: AsRef<Path>>(path: T) -> Result<Option<LockedFile>> {
// generate the lock file path
let path = path.as_ref().to_path_buf();
let lock_file_path = path.with_extension("lock");
let extension = if let Some(ext) = path.extension() {
// FIXME: Really need to handle this in a way that doesn't panic on non unicode
let mut ext = ext.to_str().unwrap().to_string();
ext.push_str(".lock");
ext
} else {
"lock".to_string()
};
let lock_file_path = path.with_extension(extension);
// Check to see if the lock file exists
if Path::exists(&lock_file_path) {
// Unable to return the lock, failing
......
......@@ -58,7 +58,7 @@ pub trait SyncBackend: 'static + Send + std::fmt::Debug {
fn write_key(&mut self, key: EncryptedKey) -> Result<()>;
fn read_key(&mut self) -> Result<EncryptedKey>;
fn read_chunk(&mut self, location: SegmentDescriptor) -> Result<Chunk>;
fn write_chunk(&mut self, chunk: Chunk, id: ChunkID) -> Result<SegmentDescriptor>;
fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor>;
}
enum SyncIndexCommand {
......@@ -80,7 +80,7 @@ enum SyncManifestCommand<I> {
enum SyncBackendCommand {
ReadChunk(SegmentDescriptor, oneshot::Sender<Result<Chunk>>),
WriteChunk(Chunk, ChunkID, oneshot::Sender<Result<SegmentDescriptor>>),
WriteChunk(Chunk, oneshot::Sender<Result<SegmentDescriptor>>),
ReadKey(oneshot::Sender<Result<EncryptedKey>>),
WriteKey(EncryptedKey, oneshot::Sender<Result<()>>),
Close(oneshot::Sender<()>),
......@@ -158,8 +158,8 @@ where
SyncBackendCommand::ReadChunk(location, ret) => {
ret.send(backend.read_chunk(location)).unwrap();
}
SyncBackendCommand::WriteChunk(chunk, id, ret) => {
ret.send(backend.write_chunk(chunk, id)).unwrap();
SyncBackendCommand::WriteChunk(chunk, ret) => {
ret.send(backend.write_chunk(chunk)).unwrap();
}
SyncBackendCommand::WriteKey(key, ret) => {
ret.send(backend.write_key(key)).unwrap();
......@@ -353,11 +353,11 @@ impl<B: SyncBackend> Backend for BackendHandle<B> {
.unwrap();
o.await?
}
async fn write_chunk(&mut self, chunk: Chunk, id: ChunkID) -> Result<SegmentDescriptor> {
async fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor> {
let (i, o) = oneshot::channel();
self.channel
.send(SyncCommand::Backend(SyncBackendCommand::WriteChunk(
chunk, id, i,
chunk, i,
)))
.await
.unwrap();
......
......@@ -259,8 +259,9 @@ impl SyncBackend for FlatFile {
))
}
}
fn write_chunk(&mut self, chunk: Chunk, id: ChunkID) -> Result<SegmentDescriptor> {
fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor> {
let start = self.write.seek(SeekFrom::End(0))?;
let id = chunk.get_id();
let tx = FlatFileTransaction::Insert { chunk, id };
rmps::encode::write(&mut self.write, &tx)?;
Ok(SegmentDescriptor {
......
use crate::repository::backend::common;
use crate::repository::backend::common::sync_backend::*;
use crate::repository::backend::*;
use crate::repository::{Chunk, EncryptedKey};
use crate::repository::{Chunk, EncryptedKey, Key};
use std::collections::HashMap;
use std::convert::TryInto;
......@@ -17,25 +17,34 @@ pub struct Mem {
manifest: Vec<StoredArchive>,
chunk_settings: ChunkSettings,
key: Option<EncryptedKey>,
actual_key: Key,
len: u64,
}
impl Mem {
pub fn new_raw(chunk_settings: ChunkSettings) -> Mem {
pub fn new_raw(chunk_settings: ChunkSettings, key: Key) -> Mem {
let max = usize::max_value().try_into().unwrap();
let data = common::Segment::new(Cursor::new(Vec::new()), max).unwrap();
let data = common::Segment::new(
Cursor::new(Vec::new()),
Cursor::new(Vec::new()),
max,
chunk_settings,
key.clone(),
)
.unwrap();
Mem {
data,
index: HashMap::new(),
manifest: Vec::new(),
chunk_settings,
actual_key: key,
key: None,
len: num_cpus::get() as u64,
}
}
pub fn new(chunk_settings: ChunkSettings) -> BackendHandle<Mem> {
BackendHandle::new(Self::new_raw(chunk_settings))
pub fn new(chunk_settings: ChunkSettings, key: Key) -> BackendHandle<Mem> {
BackendHandle::new(Self::new_raw(chunk_settings, key))
}
}
......@@ -116,8 +125,8 @@ impl SyncBackend for Mem {
fn read_chunk(&mut self, location: SegmentDescriptor) -> Result<Chunk> {
self.data.read_chunk(location.start)
}
fn write_chunk(&mut self, chunk: Chunk, id: ChunkID) -> Result<SegmentDescriptor> {
let start = self.data.write_chunk(chunk, id)?;
fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor> {
let start = self.data.write_chunk(chunk)?;
Ok(SegmentDescriptor {
segment_id: 0,
start,
......@@ -134,15 +143,16 @@ mod tests {
#[tokio::test(threaded_scheduler)]
#[should_panic]
async fn bad_key_access() {
let backend = Mem::new(ChunkSettings::lightweight());
let key = Key::random(32);
let backend = Mem::new(ChunkSettings::lightweight(), key);
backend.read_key().await.unwrap();
}
/// Checks to make sure setting and retriving a key works
#[tokio::test(threaded_scheduler)]
async fn key_sanity() {
let backend = Mem::new(ChunkSettings::lightweight());
let key = Key::random(32);
let backend = Mem::new(ChunkSettings::lightweight(), key.clone());
let key_key = [0_u8; 128];
let encrypted_key =
EncryptedKey::encrypt(&key, 1024, 1, Encryption::new_aes256ctr(), &key_key);
......
......@@ -31,7 +31,7 @@ impl MultiFile {
/// Will error if creating or locking any of the index or manifest files
/// fails (such as if the user does not have permissions for that
/// directory), or if any other I/O error occurs
pub fn open_defaults(
pub async fn open_defaults(
path: impl AsRef<Path>,
chunk_settings: Option<ChunkSettings>,
key: &Key,
......@@ -39,9 +39,19 @@ impl MultiFile {
let size_limit = 2_000_000_000;
let segments_per_directory = 100;
let index_handle = index::Index::open(&path)?;
let manifest_handle = manifest::Manifest::open(&path, chunk_settings, key)?;
let segment_handle =
segment::SegmentHandler::open(&path, size_limit, segments_per_directory)?;
let mut manifest_handle = manifest::Manifest::open(&path, chunk_settings, key)?;
let chunk_settings = if let Some(chunk_settings) = chunk_settings {
chunk_settings
} else {
manifest_handle.chunk_settings().await
};
let segment_handle = segment::SegmentHandler::open(
&path,
size_limit,
segments_per_directory,
chunk_settings,
key.clone(),
)?;
let path = path.as_ref().to_path_buf();
Ok(MultiFile {
index_handle,
......@@ -104,8 +114,8 @@ impl Backend for MultiFile {
}
/// Starts writing a chunk, and returns a oneshot reciever with the result of that process
async fn write_chunk(&mut self, chunk: Chunk, id: ChunkID) -> Result<SegmentDescriptor> {
self.segment_handle.write_chunk(chunk, id).await
async fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor> {
self.segment_handle.write_chunk(chunk).await
}
/// Closes out the index, segment handler, and manifest cleanly, making sure all operations are
......@@ -128,17 +138,19 @@ mod tests {
use tempfile::{tempdir, TempDir};
// Utility function, sets up a tempdir and opens a MultiFile Backend
fn setup(key: &Key) -> (TempDir, MultiFile) {
async fn setup(key: &Key) -> (TempDir, MultiFile) {
let tempdir = tempdir().unwrap();
let path = tempdir.path().to_path_buf();
let mf = MultiFile::open_defaults(path, Some(ChunkSettings::lightweight()), key).unwrap();
let mf = MultiFile::open_defaults(path, Some(ChunkSettings::lightweight()), key)
.await
.unwrap();
(tempdir, mf)
}
#[tokio::test]
async fn key_store_load() {
let key = Key::random(32);
let (tempdir, mut mf) = setup(&key);
let (tempdir, mut mf) = setup(&key).await;
// Encrypt the key and store it
let enc_key = EncryptedKey::encrypt(&key, 512, 1, Encryption::new_aes256ctr(), b"");
mf.write_key(&enc_key).await.expect("Unable to write key");
......
......@@ -98,8 +98,8 @@ impl<T: Backend> Backend for BackendWrapper<T> {
async fn read_chunk(&mut self, location: SegmentDescriptor) -> Result<Chunk> {
self.0.read_chunk(location).await
}
async fn write_chunk(&mut self, chunk: Chunk, id: ChunkID) -> Result<SegmentDescriptor> {
self.0.write_chunk(chunk, id).await
async fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor> {
self.0.write_chunk(chunk).await
}
async fn close(&mut self) {
self.0.close().await
......@@ -130,8 +130,8 @@ impl Backend for BackendObject {
async fn read_chunk(&mut self, location: SegmentDescriptor) -> Result<Chunk> {
self.read_chunk(location).await
}
async fn write_chunk(&mut self, chunk: Chunk, id: ChunkID) -> Result<SegmentDescriptor> {
self.write_chunk(chunk, id).await
async fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor> {
self.write_chunk(chunk).await
}
async fn close(&mut self) {
self.close().await
......
......@@ -18,12 +18,12 @@ pub fn get_repo_mem(key: Key) -> Repository<impl BackendClone> {
hmac: HMAC::Blake2b,
encryption: Encryption::new_aes256ctr(),
};
let backend = asuran::repository::backend::mem::Mem::new(settings);
let backend = asuran::repository::backend::mem::Mem::new(settings, key.clone());
Repository::with(backend, settings, key)
}
#[allow(dead_code)]
pub fn get_repo_bare(path: &str, key: Key) -> Repository<impl BackendClone> {
pub async fn get_repo_bare(path: &str, key: Key) -> Repository<impl BackendClone> {
let settings = ChunkSettings {
compression: Compression::ZStd { level: 1 },
hmac: HMAC::Blake2b,
......@@ -34,6 +34,7 @@ pub fn get_repo_bare(path: &str, key: Key) -> Repository<impl BackendClone> {
Some(settings),
&key,
)
.await
.unwrap();
Repository::with(backend, settings, key)
}
......
......@@ -18,7 +18,7 @@ async fn backup_restore_no_empty_dirs_filesystem() {
let repo_root = tempdir().unwrap();
let repo_root_path = repo_root.path().to_str().unwrap();
let key = Key::random(32);
let mut repo = common::get_repo_bare(repo_root_path, key);
let mut repo = common::get_repo_bare(repo_root_path, key).await;
let chunker = FastCDC::default();
let archive = ActiveArchive::new("test");
......
......@@ -12,7 +12,7 @@ async fn put_drop_get_multifile() {
let tempdir = tempdir().unwrap();
let root_path = tempdir.path().to_str().unwrap();
let key = Key::random(32);
let mut repo = common::get_repo_bare(root_path, key.clone());
let mut repo = common::get_repo_bare(root_path, key.clone()).await;
let chunker = FastCDC::default();
......@@ -47,7 +47,7 @@ async fn put_drop_get_multifile() {
println!("Manifest: \n {:?}", manifest);
}
repo.close().await;
let mut repo = common::get_repo_bare(root_path, key);
let mut repo = common::get_repo_bare(root_path, key).await;
let mut manifest = Manifest::load(&mut repo);
let archive = manifest.archives().await[0].load(&mut repo).await.unwrap();
......