Add batch snapshot creation

This commit is contained in:
Heliguy
2024-09-22 22:20:23 -04:00
parent 9b6405a73c
commit f05a0170d2
6 changed files with 104 additions and 11 deletions

View File

@@ -229,6 +229,7 @@ class HostInfo:
home = home
clipboard = Gdk.Display.get_default().get_clipboard()
main_window = None
snapshots_path = f"{home}/.var/app/io.github.flattool.Warehouse/data/Snapshots/"
# Get all possible installation icon theme dirs
output = subprocess.run(

View File

@@ -86,6 +86,7 @@ warehouse_sources = [
'remotes_page/remotes_page.py',
'remotes_page/remote_row.py',
'remotes_page/add_remote_dialog.py',
'snapshot_page/tar_worker.py',
'snapshot_page/snapshot_page.py',
'snapshot_page/snapshots_list_page.py',
'snapshot_page/snapshot_box.py',

View File

@@ -3,7 +3,8 @@ from .host_info import HostInfo
from .error_toast import ErrorToast
from .loading_status import LoadingStatus
from .app_row import AppRow
import subprocess, os
from .tar_worker import TarWorker
import subprocess, os, time
@Gtk.Template(resource_path="/io/github/flattool/Warehouse/snapshot_page/new_snapshot_dialog.ui")
class NewSnapshotDialog(Adw.Dialog):
@@ -29,10 +30,10 @@ class NewSnapshotDialog(Adw.Dialog):
else:
self.selected_rows.remove(row)
self.valid_checker()
total = len(self.selected_rows)
self.total_selected_label.set_label(_("{} Selected").format(total))
self.total_selected_label.set_visible(total > 0)
self.valid_checker()
def generate_list(self, *args):
for package in HostInfo.flatpaks:
@@ -63,6 +64,33 @@ class NewSnapshotDialog(Adw.Dialog):
valid = len(self.selected_rows) > 0 and len(self.name_entry.get_text().strip()) > 0
self.create_button.set_sensitive(valid)
def get_total_fraction(self):
total = 0
stopped_workers_amount = 0
for worker in self.workers:
total += worker.fraction
if worker.stop:
stopped_workers_amount += 1
if stopped_workers_amount == len(self.workers):
return False
print(f"{total / len(self.workers):.2f}")
return True
def on_create(self, button):
self.workers.clear()
for row in self.selected_rows:
package = row.package
worker = TarWorker(
existing_path=package.data_path,
new_path=f"{HostInfo.snapshots_path}{package.info['id']}",
file_name=f"{int(time.time())}_{package.info["version"]}",
name=self.name_entry.get_text(),
)
self.workers.append(worker)
worker.compress()
def on_invalidate(self, search_entry):
self.listbox.invalidate_filter()
@@ -88,9 +116,11 @@ class NewSnapshotDialog(Adw.Dialog):
# Extra Object Creations
self.rows = []
self.selected_rows = []
self.workers = []
# Connections
self.connect("closed", self.on_close)
self.create_button.connect("clicked", self.on_create)
self.search_entry.connect("search-changed", self.on_invalidate)
self.list_cancel_button.connect("clicked", lambda *_: self.close())
self.name_entry.connect("changed", lambda *_: self.valid_checker())

View File

@@ -55,28 +55,26 @@ class SnapshotPage(Adw.BreakpointBin):
# This must be set to the created object from within the class's __init__ method
instance = None
page_name = "snapshots"
snapshots_path = f"{HostInfo.home}/.var/app/io.github.flattool.Warehouse/data/Snapshots/"
def sort_snapshots(self, *args):
self.active_snapshot_paks.clear()
self.leftover_snapshots.clear()
bad_folders = []
if not os.path.exists(self.snapshots_path):
if not os.path.exists(HostInfo.snapshots_path):
try:
os.makedirs(self.snapshots_path)
os.makedirs(HostInfo.snapshots_path)
except Exception as e:
self.toast_overlay.add_toast(ErrorToast(_("Could not load Snapshots"), str(e)).toast)
return
for folder in os.listdir(self.snapshots_path):
for folder in os.listdir(HostInfo.snapshots_path):
if folder.count('.') < 2 or ' ' in folder:
bad_folders.append(folder)
continue
has_tar = False
for file in os.listdir(f"{self.snapshots_path}{folder}"):
for file in os.listdir(f"{HostInfo.snapshots_path}{folder}"):
if file.endswith(".tar.zst"):
has_tar = True
break
@@ -93,7 +91,7 @@ class SnapshotPage(Adw.BreakpointBin):
for folder in bad_folders:
try:
subprocess.run(['gio', 'trash', f'{self.snapshots_path}{folder}'])
subprocess.run(['gio', 'trash', f'{HostInfo.snapshots_path}{folder}'])
except Exception:
pass
@@ -164,7 +162,7 @@ class SnapshotPage(Adw.BreakpointBin):
def open_snapshots_folder(self, button, overlay):
try:
Gio.AppInfo.launch_default_for_uri(f"file://{self.snapshots_path}", None)
Gio.AppInfo.launch_default_for_uri(f"file://{HostInfo.snapshots_path}", None)
overlay.add_toast(Adw.Toast.new(_("Opened snapshots folder")))
except Exception as e:
overlay.add_toast(ErrorToast(_("Could not open folder"), str(e)).toast)

View File

@@ -62,7 +62,7 @@ class SnapshotsListPage(Adw.NavigationPage):
# Extra Object Creation
self.parent_page = parent_page
self.snapshots_path = parent_page.snapshots_path
self.snapshots_path = HostInfo.snapshots_path
self.current_folder = None
self.current_package = None
self.snapshots_rows = []

View File

@@ -0,0 +1,63 @@
from gi.repository import Adw, Gtk, GLib, Gio
from .host_info import HostInfo
from .error_toast import ErrorToast
import os, tarfile, subprocess, json
class TarWorker:
def __init__(self, existing_path, new_path, file_name, name=""):
self.existing_path = existing_path
self.new_path = new_path
self.file_name = file_name
self.name = name
self.should_check = False
self.stop = False
self.fraction = 0.0
self.total = 0
def compress_thread(self, *args):
try:
if not os.path.exists(self.new_path):
os.makedirs(self.new_path)
self.total = int(subprocess.run(['du', '-s', self.existing_path], check=True, text=True, capture_output=True).stdout.split('\t')[0])
self.total /= 1.5 # estimate for space savings
subprocess.run(['tar', 'cafv', f'{self.new_path}/{self.file_name}.tar.zst', '-C', self.existing_path, '.'], check=True, capture_output=True)
with open(f"{self.new_path}/{self.file_name}.json", 'w') as file:
data = {
'snapshot_version': 1,
'name': self.name,
}
json.dump(data, file, indent=4)
self.stop = True # tell the check timeout to stop, because we know the file is done being made
except subprocess.CalledProcessError as cpe:
self.stop = True
print(cpe.stderr)
except Exception as e:
self.stop = True
print(f"Error during compression: {e}")
def check_size(self):
try:
output = subprocess.run(['du', '-s', f"{self.new_path}/{self.file_name}.tar.zst"], check=True, text=True, capture_output=True).stdout.split('\t')[0]
working_total = int(output)
self.fraction = working_total / self.total
if self.stop:
print("fraction: 1.00")
return False # stop the timeout
else:
print(f"fraction: {self.fraction:.2f}")
return True # continue the timeout
except subprocess.CalledProcessError as cpe:
return not self.stop # continue the timeout or stop the timeout
def compress(self):
self.compress = True
self.stop = False
Gio.Task.new(None, None, None).run_in_thread(self.compress_thread)
GLib.timeout_add(10, self.check_size)