Select Git revision
-
Marc René Arns authored
- add tempfs - add support for closing a fs after usage - close fs after testing for readonly, ext_deleteable and ext_writeable tests - import new spectest version with support for hooks
Marc René Arns authored- add tempfs - add support for closing a fs after usage - close fs after testing for readonly, ext_deleteable and ext_writeable tests - import new spectest version with support for hooks
dbfs.go 16.61 KiB
package dbfs
import (
"bytes"
"fmt"
"io"
iofs "io/fs"
"os"
"strings"
"time"
"github.com/google/uuid"
"gitlab.com/golang-utils/fs"
"gitlab.com/golang-utils/fs/path"
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitex"
)
// FS is a filesystem whose entries are stored in two tables of a sqlite
// database: one table with a row per file or directory and one table holding
// the file contents as blobs.
type FS struct {
// conn is the sqlite connection all statements of this FS are executed on.
// It is set by newFS and closed by Close.
conn *sqlite.Conn
// local is the base location when the FS was created with a local path
// (see setBase); Abs joins relative paths onto it.
local path.Local
// remote is the base location when the FS was created with a remote path
// (see setBase); when non-nil, Abs prefers it over local.
remote *path.Remote
// fileTableName is the name of the table with one row per entry
// (newFS defaults it to "file").
fileTableName string
// blobTableName is the name of the table holding the file contents
// (newFS defaults it to "file_data").
blobTableName string
// sqliteFlags are passed to sqlite.OpenConn when the connection is opened.
sqliteFlags []sqlite.OpenFlags
// shouldCreateTables makes newFS call createTables after connecting.
shouldCreateTables bool
}
// Close closes the sqlite connection of the filesystem.
// An FS without a connection is treated as already closed and yields nil.
func (f *FS) Close() error {
	if f.conn != nil {
		return f.conn.Close()
	}
	return nil
}
// Option configures an FS. newFS applies all options after setting the
// default table names and before the sqlite connection is opened.
type Option func(*FS)
// OptCreateTables returns an Option that makes the constructor create the
// file and blob tables. NewInMemory always adds this option.
func OptCreateTables() Option {
	enable := func(target *FS) {
		target.shouldCreateTables = true
	}
	return enable
}
// OptSqliteFlags returns an Option that sets the flags used to open the
// underlying sqlite connection.
func OptSqliteFlags(flags ...sqlite.OpenFlags) Option {
	apply := func(target *FS) {
		target.sqliteFlags = flags
	}
	return apply
}
// OptFileTable returns an Option that overrides the name of the table holding
// one row per file or directory.
func OptFileTable(name string) Option {
	apply := func(target *FS) {
		target.fileTableName = name
	}
	return apply
}
// OptBlobTable returns an Option that overrides the name of the table holding
// the file contents.
func OptBlobTable(name string) Option {
	apply := func(target *FS) {
		target.blobTableName = name
	}
	return apply
}
// Compile-time check that *FS implements fs.FS.
var _ fs.FS = (*FS)(nil)
/*
PRAGMA journal_mode=DELETE;
PRAGMA synchronous=FULL;
PRAGMA foreign_keys=1;
PRAGMA busy_timeout=5000;
*/
/*
TODO: use the connection pool for concurrency, from the manual:
https://pkg.go.dev/zombiezen.com/go/sqlite#section-readme
An SQLite connection is represented by a *Conn. Connections cannot be used concurrently. A typical Go program will create a pool of connections (e.g. by using zombiezen.com/go/sqlite/sqlitex.NewPool to create a *zombiezen.com/go/sqlite/sqlitex.Pool) so goroutines can borrow a connection while they need to talk to the database.
This package assumes SQLite will be used concurrently by the process through several connections, so the build options for SQLite enable multi-threading and the shared cache.
The implementation automatically handles shared cache locking, see the documentation on Stmt.Step for details.
*/
// New creates a filesystem backed by the sqlite database stored in the local
// file given by file. A directory path is rejected with fs.ErrExpectedFile.
func New(file path.Local, opts ...Option) (*FS, error) {
	if !path.IsDir(file) {
		return newFS(file, file.ToSystem(), opts...)
	}
	return nil, fs.ErrExpectedFile.Params(file.String())
}
// NewInMemory creates a filesystem backed by an in-memory sqlite database.
// abs is the base location reported by Abs and must be a directory path,
// otherwise fs.ErrExpectedDir is returned. The tables are always created.
func NewInMemory(abs path.Absolute, opts ...Option) (*FS, error) {
	if path.IsDir(abs) {
		opts = append(opts, OptCreateTables())
		return newFS(abs, ":memory:", opts...)
	}
	return nil, fs.ErrExpectedDir.Params(abs)
}
// setBase records the base location of the filesystem: a *path.Remote goes
// into fsys.remote, anything else is asserted to be a path.Local and stored
// in fsys.local.
func (fsys *FS) setBase(abs path.Absolute) {
	switch base := abs.(type) {
	case *path.Remote:
		fsys.remote = base
	default:
		fsys.local = abs.(path.Local)
	}
}
// newFS builds an FS with the default table names, applies opts, opens the
// sqlite connection described by connectstr and, if requested, creates the
// tables.
//
// For anything but an in-memory database the base must be a local path, and
// unless the tables are to be created the database file must already exist
// and must not be a directory.
func newFS(abs path.Absolute, connectstr string, opts ...Option) (*FS, error) {
	fsys := &FS{
		fileTableName: "file",
		blobTableName: "file_data",
	}
	for _, opt := range opts {
		opt(fsys)
	}
	fsys.setBase(abs)

	if connectstr != ":memory:" {
		if fsys.remote != nil {
			return nil, fmt.Errorf("the given file is not a local file")
		}
		if !fsys.shouldCreateTables {
			// Without table creation we expect an existing database file.
			info, err := os.Stat(fsys.local.ToSystem())
			if err != nil {
				return nil, err
			}
			if info.IsDir() {
				// A directory was found where a database file is required.
				return nil, fs.ErrExpectedFile.Params(fsys.local.ToSystem())
			}
		}
	}

	conn, err := sqlite.OpenConn(connectstr, fsys.sqliteFlags...)
	if err != nil {
		return nil, err
	}
	fsys.conn = conn

	if fsys.shouldCreateTables {
		if err := fsys.createTables(); err != nil {
			// The FS is not handed to the caller, so nobody else could close
			// the connection; the creation error is the one worth reporting.
			_ = conn.Close()
			return nil, err
		}
	}
	return fsys, nil
}
// createTables creates the file table, its unique (rel_dir, name) index and
// the blob table. It stops at, and returns, the first error.
//
// The foreign key is declared on the uuid column of the blob table, so that
// deleting a file row cascades to its content when foreign key enforcement is
// enabled on the connection. The index is named after the file table so that
// several file tables can live in one database.
func (fsys *FS) createTables() error {
	statements := []string{
		fmt.Sprintf(`CREATE TABLE %s ( uuid TEXT PRIMARY KEY NOT NULL, modified INTEGER NOT NULL, name TEXT NOT NULL, rel_dir TEXT NOT NULL, is_dir INTEGER NOT NULL);`, fsys.fileTableName),
		fmt.Sprintf(`CREATE UNIQUE INDEX %s_name ON %s(rel_dir, name);`, fsys.fileTableName, fsys.fileTableName),
		fmt.Sprintf(`CREATE TABLE %s ( uuid TEXT PRIMARY KEY NOT NULL REFERENCES %s(uuid) ON DELETE CASCADE, data BLOB NOT NULL);`, fsys.blobTableName, fsys.fileTableName),
	}
	for _, sql := range statements {
		if err := sqlitex.ExecuteTransient(fsys.conn, sql, nil); err != nil {
			return err
		}
	}
	return nil
}
// Delete removes the file or directory p.
//
// Deleting the root is not supported. Deleting something that does not exist
// is not an error. A directory that still has entries is only removed when
// recursive is true, otherwise fs.ErrNotEmpty is returned.
//
// All path values are bound as statement parameters, so names containing
// quotes are handled correctly. The content rows of removed files are deleted
// explicitly instead of relying on a foreign key cascade.
func (fsys *FS) Delete(p path.Relative, recursive bool) (err error) {
	if p.IsRoot() {
		return fs.ErrNotSupported.Params(fsys, "Deleting root dir")
	}
	if !fsys.Exists(p) {
		return nil
	}

	dir, name := p.Dir().String(), path.Name(p)

	// remove deletes the content of all file rows matching where and then the
	// rows themselves. TODO: make this a transaction.
	remove := func(where string, args ...interface{}) error {
		blobSQL := fmt.Sprintf(`DELETE FROM %s WHERE uuid IN (SELECT uuid FROM %s WHERE %s);`,
			fsys.blobTableName, fsys.fileTableName, where)
		if err := sqlitex.ExecuteTransient(fsys.conn, blobSQL, &sqlitex.ExecOptions{Args: args}); err != nil {
			return err
		}
		fileSQL := fmt.Sprintf(`DELETE FROM %s WHERE %s;`, fsys.fileTableName, where)
		return sqlitex.ExecuteTransient(fsys.conn, fileSQL, &sqlitex.ExecOptions{Args: args})
	}

	if !path.IsDir(p) {
		return remove(`rel_dir = ?1 AND name = ?2 AND is_dir = 0`, dir, name)
	}

	// Exact, case-sensitive prefix match on rel_dir. LIKE is avoided because
	// it ignores ASCII case and treats % and _ inside the path as wildcards.
	const below = `substr(rel_dir, 1, length(?1)) = ?1`

	if recursive {
		return remove(`(`+below+`) OR (rel_dir = ?2 AND name = ?3 AND is_dir = 1)`,
			p.String(), dir, name)
	}

	var entries int
	countSQL := fmt.Sprintf(`SELECT count(uuid) FROM %s WHERE `+below+`;`, fsys.fileTableName)
	err = sqlitex.ExecuteTransient(fsys.conn, countSQL, &sqlitex.ExecOptions{
		Args: []interface{}{p.String()},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			entries = stmt.ColumnInt(0)
			return nil
		},
	})
	if err != nil {
		return err
	}
	if entries > 0 {
		return fs.ErrNotEmpty.Params(p.String())
	}

	return remove(`rel_dir = ?1 AND name = ?2 AND is_dir = 1`, dir, name)
}
// Write creates the directory p or stores the content of rd as file p.
//
// rd is always closed before Write returns (when it is not nil). Writing the
// root or an already existing directory is a no-op. Unless inbetween is true,
// the parent directory of p must already exist, otherwise fs.ErrNotFound is
// returned. An existing file is replaced. For files rd must not be nil.
//
// All values are bound as statement parameters, so names containing quotes
// are stored correctly.
//
// TODO: run the statements in one transaction; on failure rows written by
// earlier steps are currently left behind.
func (fsys *FS) Write(p path.Relative, rd io.ReadCloser, inbetween bool) (err error) {
	defer func() {
		if rd != nil {
			rd.Close()
		}
	}()

	// The root always exists and is never stored as a row.
	if p.IsRoot() {
		return nil
	}

	isDir := path.IsDir(p)
	if isDir && fsys.Exists(p) {
		return nil
	}

	parent := p.Dir()
	if !inbetween && !parent.IsRoot() && !fsys.Exists(parent) {
		return fs.ErrNotFound.Params(parent.String())
	}

	// TODO: optimize this into one statement.
	for _, dir := range parentDirs(p) {
		if err = fsys.Write(path.Relative(dir), nil, false); err != nil {
			return err
		}
	}

	// exec runs a statement that yields no rows, binding args positionally.
	exec := func(sql string, args ...interface{}) error {
		return sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{Args: args})
	}

	id := uuid.New().String()
	modified := time.Now().Unix()
	name := path.Name(p)
	relDir := parent.String()
	insertEntry := fmt.Sprintf(`INSERT INTO %s (uuid,modified,name,rel_dir,is_dir) VALUES(?,?,?,?,?);`, fsys.fileTableName)

	if isDir {
		return exec(insertEntry, id, modified, name, relDir, 1)
	}

	if rd == nil {
		return fs.ErrWhileReading
	}

	if fsys.Exists(p) {
		if err = fsys.Delete(p, false); err != nil {
			return err
		}
	}

	if err = exec(insertEntry, id, modified, name, relDir, 0); err != nil {
		return err
	}

	// The blob has to be allocated with its final size before it can be
	// written. Files report their size via Stat and are streamed later
	// (*os.File satisfies io/fs.File as well); any other reader is buffered
	// completely to learn its size.
	var (
		size    int64
		content []byte
	)
	file, isFile := rd.(iofs.File)
	if isFile {
		info, statErr := file.Stat()
		if statErr != nil {
			return fs.ErrWhileReading
		}
		size = info.Size()
	} else {
		content, err = io.ReadAll(rd)
		if err != nil {
			return fs.ErrWhileReading
		}
		size = int64(len(content))
	}

	insertBlob := fmt.Sprintf(`INSERT INTO %s (uuid,data) VALUES(?,zeroblob(?));`, fsys.blobTableName)
	if err = exec(insertBlob, id, size); err != nil {
		return err
	}

	var rowid int64
	selectRow := fmt.Sprintf(`SELECT rowid FROM %s WHERE uuid = ? LIMIT 1;`, fsys.blobTableName)
	err = sqlitex.ExecuteTransient(fsys.conn, selectRow, &sqlitex.ExecOptions{
		Args: []interface{}{id},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			rowid = stmt.ColumnInt64(0)
			return nil
		},
	})
	if err != nil {
		return err
	}
	if rowid == 0 {
		return fs.ErrNotFound
	}

	blob, err := fsys.conn.OpenBlob("", fsys.blobTableName, "data", rowid, true)
	if err != nil {
		return err
	}
	// Closing a blob opened for writing can fail; report that unless an
	// earlier error is already being returned.
	defer func() {
		if closeErr := blob.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	if isFile {
		_, err = io.Copy(blob, file)
	} else {
		_, err = blob.Write(content)
	}
	return err
}
// ModTime returns the modification time stored for the file p, with second
// precision. Directories are not supported (fs.ErrNotSupported); a file
// without a row yields fs.ErrNotFound.
//
// The path values are bound as statement parameters, so names containing
// quotes are looked up correctly.
func (fsys *FS) ModTime(p path.Relative) (t time.Time, err error) {
	if path.IsDir(p) {
		return t, fs.ErrNotSupported
	}

	var (
		unix  int64
		found bool
	)
	sql := fmt.Sprintf(`SELECT modified FROM %s WHERE (is_dir = 0) AND (rel_dir = ?) AND (name = ?);`, fsys.fileTableName)
	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{p.Dir().String(), path.Name(p)},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			unix = stmt.ColumnInt64(0)
			found = true
			return nil
		},
	})
	if err != nil {
		return t, err
	}
	// Decide on the presence of a row, not on the timestamp value.
	if !found {
		return t, fs.ErrNotFound.Params(p)
	}
	return time.Unix(unix, 0), nil
}
// Abs returns the absolute location of r, resolved against the base of the
// filesystem. The remote base is used when one is set, the local base
// otherwise; the root maps to the base itself.
func (m *FS) Abs(r path.Relative) path.Absolute {
	root := r.IsRoot()
	if m.remote == nil {
		if root {
			return m.local
		}
		return m.local.Join(r.String())
	}
	if root {
		return m.remote
	}
	return m.remote.Join(r.String())
}
// Exists reports whether p has a row of the matching kind (directory for
// directory paths, file otherwise). The root always exists. A failing query
// is reported as "does not exist".
//
// The path values are bound as statement parameters, so names containing
// quotes are looked up correctly.
func (fsys *FS) Exists(p path.Relative) bool {
	if p.IsRoot() {
		return true
	}

	isDir := 0
	if path.IsDir(p) {
		isDir = 1
	}

	var matches int
	sql := fmt.Sprintf(`SELECT count(uuid) FROM %s WHERE rel_dir = ? AND name = ? AND is_dir = ?;`, fsys.fileTableName)
	err := sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{p.Dir().String(), path.Name(p), isDir},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			matches = stmt.ColumnInt(0)
			return nil
		},
	})
	if err != nil {
		return false
	}
	return matches >= 1
}
// Size returns the number of bytes stored for file. It returns -2 for the
// root and for directory paths and -1 when the file does not exist or a
// query fails.
//
// The path values are bound as statement parameters, so names containing
// quotes are looked up correctly.
func (fsys *FS) Size(file path.Relative) int64 {
	if file.IsRoot() || path.IsDir(file) {
		return -2
	}
	if !fsys.Exists(file) {
		return -1
	}

	// First resolve the uuid of the file row, then measure its blob.
	var id string
	sql := fmt.Sprintf(`SELECT uuid FROM %s WHERE rel_dir = ? AND name = ? AND is_dir = 0 LIMIT 1;`, fsys.fileTableName)
	err := sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{file.Dir().String(), path.Name(file)},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			id = stmt.ColumnText(0)
			return nil
		},
	})
	if err != nil || id == "" {
		return -1
	}

	var size int64
	sql = fmt.Sprintf(`SELECT length(data) FROM %s WHERE uuid = ? LIMIT 1;`, fsys.blobTableName)
	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{id},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			size = stmt.ColumnInt64(0)
			return nil
		},
	})
	if err != nil {
		return -1
	}
	return size
}
// Reader returns a reader for p.
//
// For the root and for directories the reader yields one entry name per
// line; names of sub directories carry a trailing "/". For files the reader
// is the blob holding the content; the caller has to close it. Missing
// directories and files yield fs.ErrNotFound.
//
// The path values are bound as statement parameters, so names containing
// quotes are looked up correctly.
func (fsys *FS) Reader(p path.Relative) (io.ReadCloser, error) {
	if p.IsRoot() || path.IsDir(p) {
		// Exists is true for the root, so this only rejects missing dirs.
		if !fsys.Exists(p) {
			return nil, fs.ErrNotFound
		}

		var bf bytes.Buffer
		sql := fmt.Sprintf(`SELECT name, is_dir FROM %s WHERE rel_dir = ?;`, fsys.fileTableName)
		err := sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
			Args: []interface{}{p.String()},
			ResultFunc: func(stmt *sqlite.Stmt) error {
				name := stmt.ColumnText(0)
				if name == "" {
					return nil
				}
				bf.WriteString(name)
				if stmt.ColumnInt(1) > 0 {
					bf.WriteByte('/')
				}
				bf.WriteByte('\n')
				return nil
			},
		})
		if err != nil {
			return nil, err
		}
		return fs.ReadCloser(strings.NewReader(bf.String())), nil
	}

	// Resolve the uuid of the file row.
	var id string
	sql := fmt.Sprintf(`SELECT uuid FROM %s WHERE is_dir = 0 AND rel_dir = ? AND name = ? LIMIT 1;`, fsys.fileTableName)
	err := sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{p.Dir().String(), path.Name(p)},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			id = stmt.ColumnText(0)
			return nil
		},
	})
	if err != nil {
		return nil, err
	}
	if id == "" {
		return nil, fs.ErrNotFound.Params(p.String())
	}

	// Blobs are opened by rowid, so look up the rowid of the content row.
	var rowid int64
	sql = fmt.Sprintf(`SELECT rowid FROM %s WHERE uuid = ? LIMIT 1;`, fsys.blobTableName)
	err = sqlitex.ExecuteTransient(fsys.conn, sql, &sqlitex.ExecOptions{
		Args: []interface{}{id},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			rowid = stmt.ColumnInt64(0)
			return nil
		},
	})
	if err != nil {
		return nil, err
	}
	if rowid == 0 {
		return nil, fs.ErrNotFound
	}

	blob, err := fsys.conn.OpenBlob("", fsys.blobTableName, "data", rowid, false)
	if err != nil {
		return nil, err
	}
	return blob, nil
}