Skip to content
Snippets Groups Projects
Select Git revision
  • main default protected
  • v0.22.1
  • v0.22.0
  • v0.21.0
  • v0.20.1
  • v0.20.0
  • v0.19.0
  • v0.18.3
  • v0.18.2
  • v0.18.1
  • v0.18.0
  • v0.17.0
  • v0.16.0
  • v0.15.0
  • v0.14.0
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.0
  • v0.9.0
  • v0.8.1
21 results

dbfs.go

  • Marc René Arns's avatar
    9772ea8c
    - add option skip abs spec tests · 9772ea8c
    Marc René Arns authored
    - add tempfs
    - add support for closing a fs after usage
    - close fs after testing for readonly, ext_deleteable and ext_writeable tests
    - import new spectest version with support for hooks
    9772ea8c
    History
    - add option skip abs spec tests
    Marc René Arns authored
    - add tempfs
    - add support for closing a fs after usage
    - close fs after testing for readonly, ext_deleteable and ext_writeable tests
    - import new spectest version with support for hooks
dbfs.go 16.61 KiB
package dbfs

import (
	"bytes"
	"fmt"
	"io"
	iofs "io/fs"
	"os"
	"strings"
	"time"

	"github.com/google/uuid"
	"gitlab.com/golang-utils/fs"
	"gitlab.com/golang-utils/fs/path"
	"zombiezen.com/go/sqlite"
	"zombiezen.com/go/sqlite/sqlitex"
)

// FS is a filesystem whose entries are stored in the tables of a sqlite
// database. It is created via New or NewInMemory.
type FS struct {
	// conn is the single sqlite connection used for every operation.
	conn               *sqlite.Conn
	// local is the base location when the filesystem was created with a
	// local absolute path (see setBase).
	local              path.Local
	// remote is the base location when the filesystem was created with a
	// remote absolute path; nil otherwise (see setBase).
	remote             *path.Remote
	// fileTableName is the table holding one row per file or directory
	// (newFS defaults it to "file").
	fileTableName      string
	// blobTableName is the table holding the file contents
	// (newFS defaults it to "file_data").
	blobTableName      string
	// sqliteFlags are passed to sqlite.OpenConn when the database is opened.
	sqliteFlags        []sqlite.OpenFlags
	// shouldCreateTables makes newFS create the tables after opening the
	// database.
	shouldCreateTables bool
}

// Close closes the underlying sqlite connection, if there is one.
// It returns nil when no connection was opened.
func (fsys *FS) Close() error {
	if fsys.conn != nil {
		return fsys.conn.Close()
	}
	return nil
}

// Option configures an FS while it is being constructed. Options are applied
// in the order given, before the database is opened.
type Option func(*FS)

// OptCreateTables makes the constructor create the file and blob tables after
// opening the database. NewInMemory always adds this option.
func OptCreateTables() Option {
	return func(f *FS) { f.shouldCreateTables = true }
}

// OptSqliteFlags sets the flags that are passed to sqlite.OpenConn when the
// underlying database is opened.
func OptSqliteFlags(flags ...sqlite.OpenFlags) Option {
	return func(f *FS) { f.sqliteFlags = flags }
}

// OptFileTable overrides the name of the table that stores the file and
// directory entries. The name is inserted verbatim into the SQL statements,
// so it has to be a valid and trusted SQL identifier.
func OptFileTable(name string) Option {
	return func(f *FS) { f.fileTableName = name }
}

// OptBlobTable overrides the name of the table that stores the file
// contents. The name is inserted verbatim into the SQL statements, so it has
// to be a valid and trusted SQL identifier.
func OptBlobTable(name string) Option {
	return func(f *FS) { f.blobTableName = name }
}

// Compile-time check that *FS satisfies fs.FS.
var _ fs.FS = (*FS)(nil)

/*
	PRAGMA journal_mode=DELETE;
	PRAGMA synchronous=FULL;
	PRAGMA foreign_keys=1;
	PRAGMA busy_timeout=5000;
*/

/*
TODO: use the connection pool for concurrency, from the manual:
https://pkg.go.dev/zombiezen.com/go/sqlite#section-readme

An SQLite connection is represented by a *Conn. Connections cannot be used concurrently. A typical Go program will create a pool of connections (e.g. by using zombiezen.com/go/sqlite/sqlitex.NewPool to create a *zombiezen.com/go/sqlite/sqlitex.Pool) so goroutines can borrow a connection while they need to talk to the database.

This package assumes SQLite will be used concurrently by the process through several connections, so the build options for SQLite enable multi-threading and the shared cache.

The implementation automatically handles shared cache locking, see the documentation on Stmt.Step for details.
*/

// New creates a filesystem that is backed by the sqlite database stored in
// the given local file.
//
// file is the location of the database file; fs.ErrExpectedFile is returned
// when it denotes a directory.
func New(file path.Local, opts ...Option) (*FS, error) {
	if path.IsDir(file) {
		return nil, fs.ErrExpectedFile.Params(file.String())
	}

	dbFile := file.ToSystem()
	return newFS(file, dbFile, opts...)
}

// NewInMemory creates a filesystem that is backed by an in-memory sqlite
// database. The tables are always created.
//
// abs is the base location of the filesystem and must denote a directory,
// otherwise fs.ErrExpectedDir is returned.
func NewInMemory(abs path.Absolute, opts ...Option) (*FS, error) {
	if !path.IsDir(abs) {
		return nil, fs.ErrExpectedDir.Params(abs)
	}

	// Copy before appending: appending to the variadic slice directly could
	// write into the backing array of a slice the caller passed as opts...
	all := make([]Option, 0, len(opts)+1)
	all = append(all, opts...)
	all = append(all, OptCreateTables())

	return newFS(abs, ":memory:", all...)
}

// setBase stores abs as the base location of the filesystem: a *path.Remote
// goes into fsys.remote, anything else has to be a path.Local (the type
// assertion panics otherwise) and goes into fsys.local.
func (fsys *FS) setBase(abs path.Absolute) {
	switch base := abs.(type) {
	case *path.Remote:
		fsys.remote = base
	default:
		fsys.local = abs.(path.Local)
	}
}

// newFS builds the FS, applies the options, opens the sqlite connection and,
// if requested, creates the tables.
//
// abs is the base location of the filesystem. connectstr is handed to
// sqlite.OpenConn; the special value ":memory:" skips all checks on the local
// database file. For file based databases abs must be local, and unless the
// tables are to be created the database file must already exist and must not
// be a directory.
func newFS(abs path.Absolute, connectstr string, opts ...Option) (*FS, error) {
	fsys := &FS{
		fileTableName: "file",
		blobTableName: "file_data",
	}

	for _, opt := range opts {
		opt(fsys)
	}

	fsys.setBase(abs)

	if connectstr != ":memory:" {
		if fsys.remote != nil {
			return nil, fmt.Errorf("the given file is not a local file")
		}

		if !fsys.shouldCreateTables {
			// Without table creation we expect an existing database file.
			info, err := os.Stat(fsys.local.ToSystem())
			if err != nil {
				return nil, err
			}

			if info.IsDir() {
				// A directory was found where the database file is expected.
				return nil, fs.ErrExpectedFile.Params(fsys.local.ToSystem())
			}
		}
	}

	conn, err := sqlite.OpenConn(connectstr, fsys.sqliteFlags...)
	if err != nil {
		return nil, err
	}

	fsys.conn = conn

	if fsys.shouldCreateTables {
		if err = fsys.createTables(); err != nil {
			// Do not leak the connection; the creation error is the one the
			// caller needs, so a failure of Close is deliberately ignored.
			conn.Close()
			return nil, err
		}
	}

	return fsys, nil
}

// createTables creates the file table, the unique index on (rel_dir, name)
// and the blob table, in that order. It stops at and returns the first error.
func (fsys *FS) createTables() error {
	ignoreRows := func(*sqlite.Stmt) error { return nil }

	statements := []string{
		fmt.Sprintf(`CREATE TABLE %s ( uuid TEXT PRIMARY KEY NOT NULL, modified INTEGER NOT NULL, name TEXT NOT NULL, rel_dir TEXT NOT NULL, is_dir INTEGER NOT NULL);`, fsys.fileTableName),
		fmt.Sprintf(`CREATE UNIQUE INDEX file_name ON %s(rel_dir, name);`, fsys.fileTableName),
		fmt.Sprintf(`CREATE TABLE %s ( uuid TEXT PRIMARY KEY NOT NULL, data BLOB NOT NULL CONSTRAINT uuid REFERENCES %s(uuid) ON DELETE CASCADE);`, fsys.blobTableName, fsys.fileTableName),
	}

	for _, statement := range statements {
		opts := &sqlitex.ExecOptions{ResultFunc: ignoreRows}
		if err := sqlitex.ExecuteTransient(fsys.conn, statement, opts); err != nil {
			return err
		}
	}

	return nil
}

// Delete removes the file or directory p.
//
// Deleting the root is not supported (fs.ErrNotSupported). Deleting something
// that does not exist is not an error. A directory that still has entries
// below it is only removed when recursive is true, otherwise fs.ErrNotEmpty
// is returned.
//
// All values are passed as bound parameters, so names containing quotes are
// handled correctly. The subtree is matched with an exact, case-sensitive
// prefix comparison on rel_dir; a LIKE pattern would be case-insensitive for
// ASCII and would treat '_' and '%' inside names as wildcards.
//
// NOTE(review): only rows of the file table are deleted here. The blob rows
// go away only if the ON DELETE CASCADE declared in createTables is enforced;
// no PRAGMA foreign_keys is issued in this file - confirm.
func (fsys *FS) Delete(p path.Relative, recursive bool) (err error) {
	if p.IsRoot() {
		return fs.ErrNotSupported.Params(fsys, "Deleting root dir")
	}
	if !fsys.Exists(p) {
		return nil
	}

	relDir, name := p.Dir().String(), path.Name(p)

	if !path.IsDir(p) {
		sql := fmt.Sprintf(`DELETE FROM %s WHERE rel_dir = ?1 AND name = ?2 AND is_dir = 0;`, fsys.fileTableName)
		return sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
			Args: []interface{}{relDir, name},
		})
	}

	if recursive {
		// Remove everything below the directory and the directory entry itself.
		sql := fmt.Sprintf(
			`DELETE FROM %s WHERE (substr(rel_dir, 1, length(?1)) = ?1) OR ( (rel_dir = ?2) AND (name = ?3) AND (is_dir = 1) );`,
			fsys.fileTableName,
		)
		return sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
			Args: []interface{}{p.String(), relDir, name},
		})
	}

	// Non-recursive: refuse to delete a directory that still has entries.
	sql := fmt.Sprintf(`SELECT count(uuid) FROM %s WHERE substr(rel_dir, 1, length(?1)) = ?1;`, fsys.fileTableName)

	var entries int
	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{p.String()},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			entries = stmt.ColumnInt(0)
			return nil
		},
	})
	if err != nil {
		return err
	}

	if entries > 0 {
		return fs.ErrNotEmpty.Params(p.String())
	}

	sql = fmt.Sprintf(`DELETE FROM %s WHERE (rel_dir = ?1 AND name = ?2 AND is_dir = 1);`, fsys.fileTableName)

	return sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{relDir, name},
	})
}

// Write stores the content of rd as file p, or creates the directory p when p
// is a directory path.
//
// rd, if not nil, is always closed before Write returns. Writing the root and
// writing a directory that already exists are no-ops. Unless inbetween is
// true, the parent directory of p has to exist already, otherwise
// fs.ErrNotFound is returned. An existing file at p is replaced. Writing a
// file with a nil rd, or failing to read rd, yields fs.ErrWhileReading.
//
// If rd implements io/fs.File, the size reported by Stat is used to allocate
// the blob and the content is streamed into it; otherwise the content is
// buffered in memory first. The source is inspected/read before anything is
// changed in the database, so a read failure leaves an existing file intact.
//
// All values are passed to sqlite as bound parameters, so names containing
// quotes are stored correctly.
//
// TODO: run the statements inside one transaction, so that a failure after
// the first insert is rolled back.
func (fsys *FS) Write(p path.Relative, rd io.ReadCloser, inbetween bool) (err error) {
	defer func() {
		if rd != nil {
			rd.Close()
		}
	}()

	// the root always exists, there is nothing to write
	if p.IsRoot() {
		return nil
	}

	isDir := path.IsDir(p)

	if isDir && fsys.Exists(p) {
		return nil
	}

	if !inbetween && !p.Dir().IsRoot() && !fsys.Exists(p.Dir()) {
		return fs.ErrNotFound.Params(p.Dir().String())
	}

	// NOTE(review): parentDirs is defined elsewhere in the package; it is
	// presumably returning the ancestors of p, outermost first, since every
	// parent is written with inbetween = false - confirm.
	// TODO optimize this into one statement
	for _, parent := range parentDirs(p) {
		if err = fsys.Write(path.Relative(parent), nil, false); err != nil {
			return err
		}
	}

	id := uuid.New().String()

	// insertEntry adds the row for p to the file table.
	insertEntry := func(dirFlag int) error {
		sql := fmt.Sprintf(`INSERT INTO %s (uuid,modified,name,rel_dir,is_dir) VALUES(?1,?2,?3,?4,?5);`, fsys.fileTableName)
		return sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
			Args: []interface{}{id, time.Now().Unix(), path.Name(p), p.Dir().String(), dirFlag},
		})
	}

	if isDir {
		return insertEntry(1)
	}

	if rd == nil {
		return fs.ErrWhileReading
	}

	// Find out how many bytes are needed for the blob. *os.File implements
	// io/fs.File as well, so this single check covers it.
	var (
		size     int64
		data     []byte
		streamed bool
	)

	if file, ok := rd.(iofs.File); ok {
		info, statErr := file.Stat()
		if statErr != nil {
			return fs.ErrWhileReading
		}
		size = info.Size()
		// a negative size is treated as unknown, then we buffer instead
		streamed = size >= 0
	}

	if !streamed {
		if data, err = io.ReadAll(rd); err != nil {
			return fs.ErrWhileReading
		}
		size = int64(len(data))
	}

	if fsys.Exists(p) {
		if err = fsys.Delete(p, false); err != nil {
			return err
		}
	}

	if err = insertEntry(0); err != nil {
		return err
	}

	// Reserve a zero-filled blob of the final size; it is filled below via
	// incremental blob I/O.
	sql := fmt.Sprintf(`INSERT INTO %s (uuid,data) VALUES(?1,zeroblob(?2));`, fsys.blobTableName)

	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{id, size},
	})
	if err != nil {
		return err
	}

	sql = fmt.Sprintf(`SELECT rowid FROM %s WHERE uuid = ?1 LIMIT 1;`, fsys.blobTableName)

	var rowid int64

	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{id},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			rowid = stmt.ColumnInt64(0)
			return nil
		},
	})
	if err != nil {
		return err
	}

	if rowid == 0 {
		return fs.ErrNotFound
	}

	blob, err := fsys.conn.OpenBlob("", fsys.blobTableName, "data", rowid, true)
	if err != nil {
		return err
	}

	if streamed {
		_, err = io.Copy(blob, rd)
	} else {
		_, err = blob.Write(data)
	}

	// Always close the blob; report the close error unless writing already
	// failed, in which case the write error is the more relevant one.
	if closeErr := blob.Close(); err == nil {
		err = closeErr
	}

	return err
}

// ModTime returns the modification time that is stored for the file p, with
// a resolution of one second.
//
// Directories are not supported (fs.ErrNotSupported). fs.ErrNotFound is
// returned when there is no entry for the file.
func (fsys *FS) ModTime(p path.Relative) (t time.Time, err error) {
	if path.IsDir(p) {
		return t, fs.ErrNotSupported
	}

	// Values are bound as parameters, so names containing quotes are safe.
	sql := fmt.Sprintf(`SELECT modified FROM %s WHERE (is_dir = 0) AND (rel_dir = ?1) AND (name = ?2);`, fsys.fileTableName)

	var (
		unix  int64
		found bool
	)

	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{p.Dir().String(), path.Name(p)},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			unix = stmt.ColumnInt64(0)
			found = true
			return nil
		},
	})
	if err != nil {
		return t, err
	}

	// Decide by the presence of a row, not by the value of the timestamp.
	if !found {
		return t, fs.ErrNotFound.Params(p)
	}

	return time.Unix(unix, 0), nil
}

// Abs returns the absolute path of r, based on the remote base if there is
// one and on the local base otherwise. For the root the base itself is
// returned.
func (fsys *FS) Abs(r path.Relative) path.Absolute {
	if fsys.remote != nil {
		if r.IsRoot() {
			return fsys.remote
		}
		return fsys.remote.Join(r.String())
	}

	if r.IsRoot() {
		return fsys.local
	}
	return fsys.local.Join(r.String())
}

// Exists reports whether there is an entry for p. The root always exists.
// A directory path only matches a directory entry and a file path only
// matches a file entry. If the query fails, false is returned.
func (fsys *FS) Exists(p path.Relative) bool {
	if p.IsRoot() {
		return true
	}

	dirFlag := 0
	if path.IsDir(p) {
		dirFlag = 1
	}

	// Values are bound as parameters, so names containing quotes are safe.
	sql := fmt.Sprintf(`SELECT count(uuid) FROM %s WHERE rel_dir = ?1 AND name = ?2 AND is_dir = ?3;`, fsys.fileTableName)

	var count int

	err := sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{p.Dir().String(), path.Name(p), dirFlag},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			count = stmt.ColumnInt(0)
			return nil
		},
	})

	return err == nil && count >= 1
}

// Size returns the number of bytes stored for the given file.
//
// It returns -2 for the root and for directories, and -1 if the file does
// not exist or a query fails.
func (fsys *FS) Size(file path.Relative) int64 {
	if file.IsRoot() || path.IsDir(file) {
		return -2
	}

	if !fsys.Exists(file) {
		return -1
	}

	// Look up the id of the file entry first; values are bound as parameters,
	// so names containing quotes are safe.
	sql := fmt.Sprintf(`SELECT uuid FROM %s WHERE is_dir = 0 AND rel_dir = ?1 AND name = ?2 LIMIT 1;`, fsys.fileTableName)

	var id string

	err := sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{file.Dir().String(), path.Name(file)},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			id = stmt.ColumnText(0)
			return nil
		},
	})
	if err != nil || id == "" {
		return -1
	}

	// The content lives in the blob table under the same id.
	sql = fmt.Sprintf(`SELECT length(data) FROM %s WHERE uuid = ?1 LIMIT 1;`, fsys.blobTableName)

	var size int64

	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{id},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			size = stmt.ColumnInt64(0)
			return nil
		},
	})
	if err != nil {
		return -1
	}

	return size
}

// Reader returns a reader for p.
//
// For the root and for directories the reader yields one line per entry that
// is located directly in the directory; the names of sub directories are
// suffixed with "/". For files the reader yields the stored content; it reads
// directly from the database blob and has to be closed by the caller.
//
// fs.ErrNotFound is returned when the directory or file does not exist.
// All values are passed to sqlite as bound parameters, so names containing
// quotes are handled correctly.
func (fsys *FS) Reader(p path.Relative) (io.ReadCloser, error) {
	if p.IsRoot() || path.IsDir(p) {
		if path.IsDir(p) && !fsys.Exists(p) {
			return nil, fs.ErrNotFound
		}

		sql := fmt.Sprintf(`SELECT name, is_dir FROM %s WHERE rel_dir = ?1;`, fsys.fileTableName)

		var listing bytes.Buffer

		err := sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
			Args: []interface{}{p.String()},
			ResultFunc: func(stmt *sqlite.Stmt) error {
				name := stmt.ColumnText(0)
				if name == "" {
					return nil
				}

				listing.WriteString(name)
				if stmt.ColumnInt(1) > 0 {
					// mark directories with a trailing slash
					listing.WriteByte('/')
				}
				listing.WriteByte('\n')

				return nil
			},
		})
		if err != nil {
			return nil, err
		}

		return fs.ReadCloser(strings.NewReader(listing.String())), nil
	}

	// Look up the id of the file entry first.
	sql := fmt.Sprintf(`SELECT uuid FROM %s WHERE is_dir = 0 AND rel_dir = ?1 AND name = ?2 LIMIT 1;`, fsys.fileTableName)

	var id string

	err := sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{p.Dir().String(), path.Name(p)},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			id = stmt.ColumnText(0)
			return nil
		},
	})
	if err != nil {
		return nil, err
	}

	if id == "" {
		return nil, fs.ErrNotFound.Params(p.String())
	}

	// Incremental blob I/O needs the rowid of the blob row.
	sql = fmt.Sprintf(`SELECT rowid FROM %s WHERE uuid = ?1 LIMIT 1;`, fsys.blobTableName)

	var rowid int64

	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{id},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			rowid = stmt.ColumnInt64(0)
			return nil
		},
	})
	if err != nil {
		return nil, err
	}

	if rowid == 0 {
		return nil, fs.ErrNotFound
	}

	blob, err := fsys.conn.OpenBlob("", fsys.blobTableName, "data", rowid, false)
	if err != nil {
		return nil, err
	}

	return blob, nil
}