from textual.app import App, ComposeResult from textual.containers import Container, Horizontal, Vertical from textual.widgets import Header, Footer, DirectoryTree, DataTable, Log, Static from textual.binding import Binding from textual import on from pathlib import Path import os import subprocess import json import time from importlib.metadata import version, PackageNotFoundError HISTORY_FILE = ".publish_history.json" DEFAULT_CONFIG = { "remote_user": "user", "remote_host": "192.168.1.1", "remote_base_path": "/var/www/vtt/assets", "category_map": { "Scenes": "scenes", "Tokens": "tokens", "Maps": "maps", "Handouts": "handouts", "Misc": "misc" }, "allowed_extentions": [ ".webp", ".dungeondraft_map" ], "ignore_patterns": [ "_wip", "wip_", "_ref", "backup", "old_version", ".git", "__pycache__" ], "category_keywords": { "token": "Tokens", "creature": "Tokens", "npc": "Tokens", "character": "Tokens", "handout": "Handouts", "letter": "Handouts", "clue": "Handouts", "paper": "Handouts", ".dungeondraft_map": "Maps", "battlemap": "Maps", "map": "Maps", "grid": "Maps", "scene": "Scenes", "bg": "Scenes", "landing": "Scenes" } } config_path = Path.home() / ".vtt_publish.json" if config_path.exists(): with open(config_path, "r") as f: user_config = json.load(f) CONFIG = DEFAULT_CONFIG | user_config else: CONFIG = DEFAULT_CONFIG def create_mock_files(): root = Path("mock_campaign") root.mkdir(exist_ok=True) dirs = [ "artwork/scenes/4_belcorras_retreat/lasdas_lament", "artwork/scenes/4_belcorras_retreat/affinity", "artwork/tokens/creatures", "artwork/handouts/letters" ] for d in dirs: full_path = root / d full_path.mkdir(parents=True, exist_ok=True) # Create fake webp files if "lasdas_lament" in str(full_path): (full_path / "lasdas_lament.webp").touch() (full_path / "lasdas_lament_map_bg.webp").touch() if "creatures" in str(full_path): (full_path / "nhakazarin.webp").touch() class CampaignTree(DirectoryTree): """A DirectoryTree with Vim-like navigation.""" BINDINGS = [ # Map Vim keys to standard Tree actions Binding("j", "cursor_down", "Down", show=False), Binding("k", "cursor_up", "Up", show=False), ] class FileTable(DataTable): """A DataTable with custom file selection actions.""" ICON_CHECKED = "✔" ICON_UNCHECKED = "✘" BINDINGS = [ Binding("space", "toggle_select", "Select/Deselect"), Binding("a", "toggle_all", "Select All"), Binding("j", "cursor_down", "Down", show=False), Binding("k", "cursor_up", "Up", show=False), Binding("l", "toggle_synced", "Hide/Show Synced"), # Category Shortcuts Binding("s", "set_category('Scenes')", "Set: Scenes"), Binding("t", "set_category('Tokens')", "Set: Tokens"), Binding("m", "set_category('Maps')", "Set: Maps"), Binding("h", "set_category('Handouts')", "Set: Handouts"), ] def action_toggle_select(self): try: row_key, _ = self.coordinate_to_cell_key(self.cursor_coordinate) current_val = str(self.get_cell(row_key, "Select")) new_val = self.ICON_CHECKED if current_val == self.ICON_UNCHECKED else self.ICON_UNCHECKED self.update_cell(row_key, "Select", new_val) except Exception: pass def action_toggle_all(self): all_checked = True for row_key in self.rows: if self.get_cell(row_key, "Select") == self.ICON_UNCHECKED: all_checked = False break target_val = self.ICON_UNCHECKED if all_checked else self.ICON_CHECKED for row_key in self.rows: self.update_cell(row_key, "Select", target_val) def action_toggle_synced(self): """ Toggle visibiity of [OK] files. """ app = self.app app.show_synced_files = not app.show_synced_files state = "Showing" if app.show_synced_files else "Hiding" app.log_message(f"{state} synced files.") if hasattr(app, "current_scan_path") and app.current_scan_path: app.populate_table(app.current_scan_path) def action_set_category(self, category_name: str): try: row_key, _ = self.coordinate_to_cell_key(self.cursor_coordinate) self.update_cell(row_key, "Category", category_name) except: pass # --- THE TUI APPLICATION --- class PublisherApp(App): """The Terminal Publishing Tool""" CSS = """ Screen { layout: vertical; background: #05100c; } Header { dock: top; background: #1a3b2e; color: #a5d6a7; } Footer { dock: bottom; # background: #1a3b2e; # color: #4caf50; } #main_container { layout: horizontal; height: 1fr; } #left_pane { width: 30%; height: 100%; border-right: solid #1a3b2e; } #right_pane { width: 70%; height: 100%; } CampaignTree, FileTable { height: 1fr; background: #0f1f18; /* Dark Green */ color: #4caf50; /* Dim Green Text */ border: heavy #1a3b2e; /* Dim Border */ padding: 0 1; } CampaignTree:focus { background: #162d23; /* Slightly Lighter */ color: #ffffff; /* Text pops to white */ border: heavy #4caf50; /* Bright Neon Border */ } FileTable:focus { background: #162d23; /* Matches Tree Focus */ border: heavy #4caf50; /* Bright Neon Border */ } FileTable > .datatable--cursor { background: #2e7d32; /* Distinct Highlighting Green */ color: white; text-style: bold; } FileTable > .datatable--header { background: #1a3b2e; color: #81c784; text-style: bold; } #log_pane { height: 20%; border: solid #4caf50; background: #0a1410; color: #81c784; text-wrap: wrap; scrollbar-size: 0 0; } """ BINDINGS = [ ("tab", "toggle_focus", "Switch Pane"), ("p", "publish", "Publish Selected"), ("r", "refresh", "Refresh Scan"), ("q", "quit", "Quit"), ] def get_app_version(self): try: return version("vtt-publish") except PackageNotFoundError: pass try: # git describe --tags --always --dirty # Returns things like: "v1.0.2-4-g9a2b3c" or "v1.0.2" return subprocess.check_output( ["git", "describe", "--tags", "--always", "--dirty"], stderr=subprocess.DEVNULL ).decode("utf-8").strip() except (subprocess.CalledProcessError, FileNotFoundError): return "dev" def compose(self) -> ComposeResult: """Create child widgets for the app.""" yield Header(show_clock=True) # Main split view with Container(id="main_container"): # Left Pane: Directory Tree with Container(id="left_pane"): # Start at current directory or mock directory start_path = "./mock_campaign" if os.path.exists("./mock_campaign") else "." yield CampaignTree(start_path, id="tree_view") # Right Pane: Data Table with Container(id="right_pane"): yield FileTable(id="file_table") # Bottom Pane: Log log_widget = Log(id="log_pane", highlight=True) log_widget.can_focus = False yield log_widget yield Footer() def load_history(self): """Load publication history from JSON.""" if not os.path.exists(HISTORY_FILE): return {} try: with open(HISTORY_FILE, 'r') as f: return json.load(f) except (json.JSONDecodeError, IOError): return {} def save_history(self): """Save publication history to JSON.""" try: with open(HISTORY_FILE, 'w') as f: json.dump(self.history, f, indent=4) except IOError as e: self.log_message(f"Warning: Could not save history. {e}") def on_mount(self) -> None: """Called when app starts.""" version_str = self.get_app_version() self.title = f"FVTT PUBLISH {version_str}" self.show_synced_files = True self.history = self.load_history() table = self.query_one(FileTable) table.cursor_type = "row" table.add_column("Select", key="Select") table.add_column("Status", key="Status") table.add_column("Category", key="Category") table.add_column("Filename", key="Filename") table.add_column("Location", key="Location") def is_ignored(self, path: Path) -> bool: """Check if path contains any ignored patterns.""" path_str = str(path).lower() for pattern in CONFIG.get("ignore_patterns", []): if pattern.lower() in path_str: return True return False def guess_category(self, path: Path) -> str: """Guess category based on path keywords.""" path_str = str(path).lower() for keyword, category in CONFIG.get("category_keywords", {}).items(): if keyword in path_str: return category return "Misc" # --- ACTIONS & LOGIC --- def log_message(self, msg: str): """Write to the bottom log window.""" log = self.query_one(Log) log.write_line(f"> {msg}") def action_toggle_focus(self): """Switch focus between Tree and Table.""" if self.query_one("#tree_view").has_focus: self.query_one("#file_table").focus() else: self.query_one("#tree_view").focus() @on(DirectoryTree.DirectorySelected) def handle_directory_click(self, event: DirectoryTree.DirectorySelected): """When a directory is selected in the tree, scan it.""" path = event.path self.log_message(f"Scanning directory: {path}") self.populate_table(path) def populate_table(self, root_path): """Clears and refills the table with .webp files from the path.""" self.current_scan_path = root_path table = self.query_one(FileTable) table.clear() root = Path(root_path) allowed_exts = CONFIG.get("allowed_extentions", []) for path in root.rglob("*"): if not path.is_file(): continue if path.suffix.lower() not in allowed_exts: continue if self.is_ignored(path): continue cat = self.guess_category(path) full_path_str = str(path.resolve()) current_mtime = path.stat().st_mtime status = "[NEW]" if full_path_str in self.history: last_record = self.history[full_path_str] cat = last_record.get("category", cat) last_mtime = last_record.get("mtime", 0) if current_mtime > last_mtime: status = "[MOD]" else: status = "[OK]" if status == "[OK]" and not self.show_synced_files: continue try: display_location = str(path.parent.relative_to(root)) except ValueError: display_location = str(path.parent) initial_icon = table.ICON_UNCHECKED table.add_row( initial_icon, status, cat, path.name, display_location, key=str(path) ) def action_refresh(self): """ Rescan the current directory. """ if hasattr(self, "current_scan_path") and self.current_scan_path: self.log_message(f"Refreshing: {self.current_scan_path}...") self.populate_table(self.current_scan_path) else: self.log_message("Nothing to refresh. Select a directory first.") def action_publish(self): """Publish selected files using rsync.""" table = self.query_one(FileTable) selected_rows = [] for row_key in table.rows: if table.get_cell(row_key, "Select") == table.ICON_CHECKED: selected_rows.append(row_key) if not selected_rows: self.log_message("No files selected.") return self.log_message(f"Starting publish for {len(selected_rows)} files...") for row_key in selected_rows: full_path = Path(row_key.value) filename = full_path.name category_display = table.get_cell(row_key, "Category") remote_folder = CONFIG["category_map"].get(category_display, "misc") destination = ( f"{CONFIG['remote_user']}@{CONFIG['remote_host']}:" f"{CONFIG['remote_base_path']}/{remote_folder}/" ) cmd = ["rsync", "-avz", str(full_path), destination] try: result = subprocess.run( cmd, capture_output=True, text=True, check=True ) self.log_message(f"SUCCESS: {filename}") table.update_cell(row_key, "Status", "[DONE]") table.update_cell(row_key, "Select", table.ICON_UNCHECKED) full_path_str = str(full_path.resolve()) self.history[full_path_str] = { "mtime": full_path.stat().st_mtime, "category": category_display, "published_at": time.time() } except subprocess.CalledProcessError as e: self.log_message(f"FAIL: {filename}") self.log_message(f"Rsync Error: {e.stderr}") except Exception as e: self.log_message(f"Error: {e}") self.save_history() self.log_message("History updated.") def main(): app = PublisherApp() app.run() if __name__ == "__main__": main()