diff options
| author | Brett Curran <brettjcurran@gmail.com> | 2026-02-12 14:10:59 +1100 |
|---|---|---|
| committer | Brett Curran <brettjcurran@gmail.com> | 2026-04-15 16:35:04 +1000 |
| commit | 521b5c2b678099c8c6f7845a3584bd1cbdd87a67 (patch) | |
| tree | 32dbfea41305008cac989bb08b2651d78921ce39 | |
| parent | 09d90f48afecc6337d1eb247bc3ecb70e59e3fee (diff) | |
add publish, refresh functionality
* pressing 'p' will publish selected files, and save those successfully published into a history file
* pressing 'r' will re-scan the current directory
| -rw-r--r-- | tui.py | 148 | ||||
| -rw-r--r-- | uv.lock | 2 |
2 files changed, 144 insertions, 6 deletions
@@ -6,6 +6,24 @@ from textual import on from pathlib import Path import os import subprocess +import json +import time + + +HISTORY_FILE = ".publish_history.json" + + +CONFIG = { + "remote_user": "user", + "remote_host": "192.168.1.1", + "remote_base_path": "/var/www/vtt/assets", + "category_map": { + "Scenes": "scenes", + "Tokens": "tokens", + "Maps": "maps", + "Handouts": "handouts" + } +} # --- MOCK DATA GENERATOR (Creates fake files for testing) --- @@ -222,11 +240,31 @@ class PublisherApp(App): yield Footer() + def load_history(self): + """Load publication history from JSON.""" + if not os.path.exists(HISTORY_FILE): + return {} + try: + with open(HISTORY_FILE, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {} + + def save_history(self): + """Save publication history to JSON.""" + try: + with open(HISTORY_FILE, 'w') as f: + json.dump(self.history, f, indent=4) + except IOError as e: + self.log_message(f"Warning: Could not save history. {e}") + def on_mount(self) -> None: """Called when app starts.""" version_str = self.get_git_version() self.title = f"FVTT PUBLISH {version_str}" + self.history = self.load_history() + table = self.query_one(FileTable) table.cursor_type = "row" @@ -268,6 +306,8 @@ class PublisherApp(App): def populate_table(self, root_path): """Clears and refills the table with .webp files from the path.""" + self.current_scan_path = root_path + table = self.query_one(FileTable) table.clear() @@ -282,20 +322,118 @@ class PublisherApp(App): if "handouts" in str(path): cat = "Handouts" + full_path_str = str(path.resolve()) + current_mtime = path.stat().st_mtime + + status = "[NEW]" + should_check = False + + if full_path_str in self.history: + last_record = self.history[full_path_str] + last_mtime = last_record.get("mtime", 0) + + if current_mtime > last_mtime: + status = "[MOD]" + should_check = False + else: + status = "[OK]" + try: display_location = str(path.parent.relative_to(root)) except ValueError: display_location = str(path.parent) + initial_icon = table.ICON_UNCHECKED + # Add row table.add_row( - table.ICON_UNCHECKED, # Select - "[NEW]", # Status - cat, # Category - path.name, # Filename - display_location # Location + initial_icon, + status, + cat, + path.name, + display_location, + key=str(path) ) + def action_refresh(self): + """ + Rescan the current directory. + """ + if hasattr(self, "current_scan_path") and self.current_scan_path: + self.log_message(f"Refreshing: {self.current_scan_path}...") + self.populate_table(self.current_scan_path) + else: + self.log_message("Nothing to refresh. Select a directory first.") + + def action_publish(self): + """Publish selected files using rsync.""" + table = self.query_one(FileTable) + selected_rows = [] + + # 1. Identify what to publish + for row_key in table.rows: + # Check the "Select" column (Column 0) + if table.get_cell(row_key, "Select") == table.ICON_CHECKED: + selected_rows.append(row_key) + + if not selected_rows: + self.log_message("No files selected.") + return + + self.log_message(f"Starting publish for {len(selected_rows)} files...") + + # 2. Process each file + for row_key in selected_rows: + # Recover the full path from the key we saved earlier + full_path = Path(row_key.value) + filename = full_path.name + + # Get the category specific to this file + category_display = table.get_cell(row_key, "Category") + + # Map "Scenes" -> "scenes", etc. + remote_folder = CONFIG["category_map"].get(category_display, "misc") + + destination = ( + f"{CONFIG['remote_user']}@{CONFIG['remote_host']}:" + f"{CONFIG['remote_base_path']}/{remote_folder}/" + ) + + cmd = ["rsync", "-avz", str(full_path), destination] + + # 4. Execute + try: + self.log_message(f"Run: {' '.join(cmd)}") # Debug print + + # # Run actual command + # result = subprocess.run( + # cmd, + # capture_output=True, + # text=True, + # check=True + # ) + + # 5. Update UI on success + self.log_message(f"SUCCESS: {filename}") + table.update_cell(row_key, "Status", "[DONE]") + table.update_cell(row_key, "Select", table.ICON_UNCHECKED) + + full_path_str = str(full_path.resolve()) + self.history[full_path_str] = { + "mtime": full_path.stat().st_mtime, + "category": category_display, + "published_at": time.time() + } + + except subprocess.CalledProcessError as e: + self.log_message(f"FAIL: {filename}") + self.log_message(f"Rsync Error: {e.stderr}") + except Exception as e: + self.log_message(f"Error: {e}") + + self.save_history() + self.log_message("History updated.") + if __name__ == "__main__": app = PublisherApp() @@ -120,7 +120,7 @@ wheels = [ [[package]] name = "vtt-publish" -version = "0.1.0" +version = "0.1.1" source = { virtual = "." } dependencies = [ { name = "textual" }, |
