"""Terminal UI for publishing campaign artwork (.webp) to a remote VTT host.

The left pane browses directories, the right pane lists the ``.webp`` files
found below the selected directory, and the bottom pane shows a log.  Selected
files are published with ``rsync`` into a per-category folder on the remote
host, and a JSON history file remembers the mtime of everything published so
later scans can mark files as new, modified or unchanged.
"""

from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Header, Footer, DirectoryTree, DataTable, Log, Static
from textual.widgets.data_table import CellDoesNotExist
from textual.binding import Binding
from textual import on
from pathlib import Path
import os
import subprocess
import json
import time

# JSON file (relative to the working directory) holding the publish history.
HISTORY_FILE = ".publish_history.json"

CONFIG = {
    "remote_user": "user",
    "remote_host": "192.168.1.1",
    "remote_base_path": "/var/www/vtt/assets",
    # Display category (table column) -> folder name on the remote host.
    "category_map": {
        "Scenes": "scenes",
        "Tokens": "tokens",
        "Maps": "maps",
        "Handouts": "handouts",
    },
    # When True the rsync command is only logged, never executed.
    # Set to False to actually transfer files.
    "dry_run": True,
}


# --- MOCK DATA GENERATOR (Creates fake files for testing) ---
def create_mock_files() -> None:
    """Create a small ``mock_campaign`` tree with empty ``.webp`` files.

    Existing directories and files are left in place, so calling this more
    than once is harmless.
    """
    root = Path("mock_campaign")
    root.mkdir(exist_ok=True)

    # Directory layout matching the mockup.
    dirs = [
        "artwork/scenes/4_belcorras_retreat/lasdas_lament",
        "artwork/scenes/4_belcorras_retreat/affinity",
        "artwork/tokens/creatures",
        "artwork/handouts/letters",
    ]

    for d in dirs:
        full_path = root / d
        full_path.mkdir(parents=True, exist_ok=True)

        # Create fake webp files
        if "lasdas_lament" in str(full_path):
            (full_path / "lasdas_lament.webp").touch()
            (full_path / "lasdas_lament_map_bg.webp").touch()
        if "creatures" in str(full_path):
            (full_path / "nhakazarin.webp").touch()


class CampaignTree(DirectoryTree):
    """A DirectoryTree with Vim-like navigation."""

    BINDINGS = [
        # Map Vim keys to standard Tree actions
        Binding("j", "cursor_down", "Down", show=False),
        Binding("k", "cursor_up", "Up", show=False),
        Binding("h", "cursor_left", "Collapse / Parent", show=True),
        Binding("l", "cursor_right", "Expand / Child", show=True),
    ]


class FileTable(DataTable):
    """A DataTable with custom file selection actions.

    The actions address cells through the column keys ``"Select"`` and
    ``"Category"``, so the table must be given columns with those keys.
    """

    # Icons shown in the "Select" column.
    ICON_CHECKED = "✔"
    ICON_UNCHECKED = "✘"

    BINDINGS = [
        Binding("space", "toggle_select", "Select/Deselect"),
        Binding("a", "toggle_all", "Select All"),
        Binding("j", "cursor_down", "Down", show=False),
        Binding("k", "cursor_up", "Up", show=False),
        # Category Shortcuts
        Binding("s", "set_category('Scenes')", "Set: Scenes"),
        Binding("t", "set_category('Tokens')", "Set: Tokens"),
        Binding("m", "set_category('Maps')", "Set: Maps"),
        # 'h' is safe here: this table does not bind it to cursor movement.
        Binding("h", "set_category('Handouts')", "Set: Handouts"),
    ]

    def _cursor_row_key(self):
        """Return the row key under the cursor, or ``None`` if there is none."""
        if not self.row_count:
            return None
        try:
            row_key, _ = self.coordinate_to_cell_key(self.cursor_coordinate)
        except CellDoesNotExist:
            # Cursor points outside the table (e.g. right after a clear()).
            return None
        return row_key

    def action_toggle_select(self) -> None:
        """Flip the "Select" icon of the row under the cursor."""
        row_key = self._cursor_row_key()
        if row_key is None:
            return
        current_val = str(self.get_cell(row_key, "Select"))
        if current_val == self.ICON_UNCHECKED:
            new_val = self.ICON_CHECKED
        else:
            new_val = self.ICON_UNCHECKED
        self.update_cell(row_key, "Select", new_val)

    def action_toggle_all(self) -> None:
        """Check every row, or uncheck every row if all are already checked."""
        all_checked = not any(
            self.get_cell(row_key, "Select") == self.ICON_UNCHECKED
            for row_key in self.rows
        )
        target_val = self.ICON_UNCHECKED if all_checked else self.ICON_CHECKED
        for row_key in self.rows:
            self.update_cell(row_key, "Select", target_val)

    def action_set_category(self, category_name: str) -> None:
        """Write ``category_name`` into the "Category" cell of the cursor row."""
        row_key = self._cursor_row_key()
        if row_key is None:
            return
        self.update_cell(row_key, "Category", category_name)


# --- THE TUI APPLICATION ---
class PublisherApp(App):
    """The Terminal Publishing Tool"""

    CSS = """
    Screen {
        layout: vertical;
        background: #0f1f18; /* Deep dark green background */
    }

    /* HEADER & FOOTER */
    Header {
        dock: top;
        background: #1a3b2e;
        color: #4caf50;
    }

    Footer {
        dock: bottom;
        background: #1a3b2e;
        color: #4caf50;
    }

    /* MAIN CONTAINER */
    #main_container {
        height: 70%;
        layout: horizontal;
        border-bottom: solid #4caf50;
    }

    /* LEFT PANE (Directory) */
    #left_pane {
        width: 30%;
        height: 100%;
        border-right: solid #4caf50;
        background: #0f1f18;
    }

    /* STYLE CLASS FOR THE STATIC HEADER */
    .pane-header {
        background: #4caf50;
        color: #0f1f18; /* Dark text on green background (Reverse look) */
        text-style: bold;
        padding-left: 1;
    }

    DirectoryTree {
        background: #0f1f18;
        color: #8bc34a; /* Light green text */
        border: hidden; /* We use the container for the border */
    }

    DirectoryTree:focus {
        background: #162d23;
    }

    /* RIGHT PANE (Table) */
    #right_pane {
        width: 70%;
        height: 100%;
        background: #0f1f18;
    }

    DataTable {
        background: #0f1f18;
        color: #a5d6a7;
        scrollbar-gutter: stable;
    }

    DataTable:focus {
        border: solid #4caf50; /* Highlight when active */
    }

    /* Highlight the selected row in the table */
    DataTable > .datatable--cursor {
        background: #2e7d32;
        color: white;
    }

    /* LOG PANE */
    #log_pane {
        height: 30%;
        background: #0a1410;
        color: #81c784;
        padding: 1;
    }
    """

    # KEY BINDINGS
    BINDINGS = [
        ("tab", "toggle_focus", "Switch Pane"),
        ("p", "publish", "Publish Selected"),
        ("r", "refresh", "Refresh Scan"),
        ("q", "quit", "Quit"),
    ]

    def __init__(self, *args, **kwargs) -> None:
        """Initialise the app and the state that is filled in later.

        All arguments are passed through unchanged to ``App.__init__``.
        """
        super().__init__(*args, **kwargs)
        # Absolute file path -> {"mtime", "category", "published_at"};
        # replaced by the content of HISTORY_FILE in on_mount().
        self.history = {}
        # Directory shown in the table; None until one has been scanned.
        self.current_scan_path = None

    def get_git_version(self) -> str:
        """Return ``git describe`` output for the working tree, or ``"dev"``.

        ``"dev"`` is returned when git is missing, cannot be executed, or
        exits with an error (for example outside a repository).
        """
        try:
            # git describe --tags --always --dirty
            # Returns things like: "v1.0.2-4-g9a2b3c" or "v1.0.2"
            output = subprocess.check_output(
                ["git", "describe", "--tags", "--always", "--dirty"],
                stderr=subprocess.DEVNULL,
            )
        except (subprocess.CalledProcessError, OSError):
            return "dev"
        return output.decode("utf-8").strip()

    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        yield Header(show_clock=True)

        # Main split view
        with Container(id="main_container"):
            # Left Pane: Directory Tree
            with Container(id="left_pane"):
                yield Static(" Directory Browser", classes="pane-header")
                # Start at the mock directory if present, else the current one
                start_path = (
                    "./mock_campaign" if os.path.exists("./mock_campaign") else "."
                )
                yield CampaignTree(start_path, id="tree_view")

            # Right Pane: Data Table
            with Container(id="right_pane"):
                yield FileTable(id="file_table")

        # Bottom Pane: Log (kept out of the focus chain)
        log_widget = Log(id="log_pane", highlight=True)
        log_widget.can_focus = False
        yield log_widget

        yield Footer()

    def load_history(self) -> dict:
        """Load publication history from JSON.

        Returns an empty dict when the file is missing, unreadable, not valid
        JSON, or does not contain a JSON object at the top level.
        """
        try:
            with open(HISTORY_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (ValueError, OSError):
            # ValueError covers JSONDecodeError and undecodable bytes;
            # OSError covers a missing or unreadable file.
            return {}
        # The rest of the app indexes the history by path string, so anything
        # other than a mapping is treated as "no history".
        return data if isinstance(data, dict) else {}

    def save_history(self) -> None:
        """Save publication history to JSON; log a warning on I/O failure."""
        try:
            with open(HISTORY_FILE, "w", encoding="utf-8") as f:
                json.dump(self.history, f, indent=4)
        except OSError as e:
            self.log_message(f"Warning: Could not save history. {e}")

    def on_mount(self) -> None:
        """Called when app starts: set the title, load history, set up columns."""
        version_str = self.get_git_version()
        self.title = f"FVTT PUBLISH {version_str}"
        self.history = self.load_history()

        table = self.query_one(FileTable)
        table.cursor_type = "row"

        # Explicit column keys let every other method address cells by name.
        for column in ("Select", "Status", "Category", "Filename", "Location"):
            table.add_column(column, key=column)

        self.log_message("System initialized.")

        if not os.path.exists("mock_campaign"):
            create_mock_files()
            # Best effort: make the freshly created files visible in the tree.
            try:
                self.query_one(CampaignTree).reload()
            except Exception as e:
                self.log_message(f"Warning: Could not reload directory tree. {e}")

    # --- ACTIONS & LOGIC ---

    def log_message(self, msg: str) -> None:
        """Write to the bottom log window."""
        log = self.query_one(Log)
        log.write_line(f"> {msg}")

    def action_toggle_focus(self) -> None:
        """Switch focus between Tree and Table."""
        if self.query_one("#tree_view").has_focus:
            self.query_one("#file_table").focus()
        else:
            self.query_one("#tree_view").focus()

    @on(DirectoryTree.DirectorySelected)
    def handle_directory_click(self, event: DirectoryTree.DirectorySelected) -> None:
        """When a directory is selected in the tree, scan it."""
        path = event.path
        self.log_message(f"Scanning directory: {path}")
        self.populate_table(path)

    def populate_table(self, root_path) -> None:
        """Clear the table and refill it with the .webp files below ``root_path``.

        Each file gets a status derived from the publish history: ``[NEW]``
        when it has no history entry, ``[MOD]`` when its mtime is newer than
        the recorded one, ``[OK]`` otherwise.  Files whose metadata cannot be
        read are logged and skipped.
        """
        self.current_scan_path = root_path
        table = self.query_one(FileTable)
        table.clear()

        root = Path(root_path)

        for path in root.rglob("*.webp"):
            # Simple heuristic: guess the category from the path text.
            path_text = str(path)
            cat = "Scenes"
            if "creatures" in path_text:
                cat = "Tokens"
            if "handouts" in path_text:
                cat = "Handouts"

            try:
                full_path_str = str(path.resolve())
                current_mtime = path.stat().st_mtime
            except OSError as e:
                # e.g. a broken symlink or a file removed during the scan
                self.log_message(f"Skipping unreadable file: {path} ({e})")
                continue

            status = "[NEW]"
            last_record = self.history.get(full_path_str)
            if last_record is not None:
                last_mtime = last_record.get("mtime", 0)
                status = "[MOD]" if current_mtime > last_mtime else "[OK]"

            try:
                display_location = str(path.parent.relative_to(root))
            except ValueError:
                display_location = str(path.parent)

            # The row key is the file path; action_publish reads it back.
            table.add_row(
                table.ICON_UNCHECKED,
                status,
                cat,
                path.name,
                display_location,
                key=str(path),
            )

    def action_refresh(self) -> None:
        """Rescan the current directory."""
        if self.current_scan_path:
            self.log_message(f"Refreshing: {self.current_scan_path}...")
            self.populate_table(self.current_scan_path)
        else:
            self.log_message("Nothing to refresh. Select a directory first.")

    def action_publish(self) -> None:
        """Publish selected files using rsync.

        For every checked row the rsync command line is logged and, unless
        ``CONFIG["dry_run"]`` is true (the default), executed.  On success the
        row is marked ``[DONE]``, unchecked, and recorded in the history.  A
        failure for one file is logged and does not stop the remaining files.
        The history file is written once at the end.
        """
        table = self.query_one(FileTable)

        # 1. Identify what to publish
        selected_rows = [
            row_key
            for row_key in table.rows
            if table.get_cell(row_key, "Select") == table.ICON_CHECKED
        ]

        if not selected_rows:
            self.log_message("No files selected.")
            return

        self.log_message(f"Starting publish for {len(selected_rows)} files...")
        dry_run = CONFIG.get("dry_run", True)

        # 2. Process each file
        for row_key in selected_rows:
            # Recover the full path from the key we saved earlier
            full_path = Path(row_key.value)
            filename = full_path.name

            # Get the category specific to this file
            category_display = table.get_cell(row_key, "Category")

            # 3. Map "Scenes" -> "scenes", etc.; unknown categories go to "misc"
            remote_folder = CONFIG["category_map"].get(category_display, "misc")
            destination = (
                f"{CONFIG['remote_user']}@{CONFIG['remote_host']}:"
                f"{CONFIG['remote_base_path']}/{remote_folder}/"
            )
            cmd = ["rsync", "-avz", str(full_path), destination]

            # 4. Execute
            try:
                self.log_message(f"Run: {' '.join(cmd)}")  # Debug print
                if not dry_run:
                    subprocess.run(cmd, capture_output=True, text=True, check=True)

                # Build the history record first so a failing stat() cannot
                # leave the row marked [DONE] without a matching entry.
                full_path_str = str(full_path.resolve())
                record = {
                    "mtime": full_path.stat().st_mtime,
                    "category": category_display,
                    "published_at": time.time(),
                }

                # 5. Update UI on success
                self.log_message(f"SUCCESS: {filename}")
                table.update_cell(row_key, "Status", "[DONE]")
                table.update_cell(row_key, "Select", table.ICON_UNCHECKED)
                self.history[full_path_str] = record
            except subprocess.CalledProcessError as e:
                self.log_message(f"FAIL: {filename}")
                self.log_message(f"Rsync Error: {e.stderr}")
            except Exception as e:
                # Per-file boundary: report the problem and keep going.
                self.log_message(f"Error: {e}")

        self.save_history()
        self.log_message("History updated.")


if __name__ == "__main__":
    app = PublisherApp()
    app.run()