265 lines
11 KiB
Python
Executable File
265 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import json
|
|
import sys
|
|
import time
|
|
import signal
|
|
import socket
|
|
import threading
|
|
import subprocess
|
|
|
|
class NEDMWorkspaceTracker:
    """Track the current NEDM workspace and focused window title over IPC.

    The tracker connects to the compositor's Unix-domain IPC socket, asks for
    a state dump, and then follows the event stream.  Frames on the socket
    have the form ``"cg-ipc" + <JSON> + NUL``.

    Public attributes read by callers:
        current_workspace: 1-based number of the current workspace.
        current_window_title: title of the focused window ("" if unknown).
        socket_path: path of the socket last returned by ``find_socket``.
        connected: True while a usable socket connection is believed to exist.
        sock: the connected ``socket.socket`` or None.
    """

    # Marker that precedes the JSON payload of every IPC frame.
    IPC_PREFIX = "cg-ipc"
    # Seconds to wait for the reply to a "dump" request.
    QUERY_TIMEOUT = 1.0
    # Cap on bytes kept while waiting for the rest of a frame, so a peer that
    # never completes a frame cannot grow the buffer without bound.
    MAX_BUFFERED_BYTES = 1 << 20

    def __init__(self):
        self.current_workspace = 1  # Default workspace (1-based)
        self.current_window_title = ""  # Current window title
        self.socket_path = None
        self.connected = False
        self.sock = None
        # Bytes received so far that do not yet form a complete frame.
        self._rx_buffer = b""
        # True while query_current_state() is waiting for its reply; prevents
        # events handled during that wait from starting a nested query.
        self._querying = False

    def find_socket(self):
        """Return the path of the NEDM IPC socket, or None if none is found.

        ``NEDM_SOCKET`` / ``CAGEBREAK_SOCKET`` take precedence when they name
        an existing path.  Otherwise ``XDG_RUNTIME_DIR`` (or ``/tmp`` when it
        is unset) is scanned for ``cagebreak-ipc.<uid>.*.sock``; candidates
        are tried in sorted order so the choice is deterministic.
        """
        socket_path = os.getenv('NEDM_SOCKET') or os.getenv('CAGEBREAK_SOCKET')
        if socket_path and os.path.exists(socket_path):
            return socket_path

        runtime_dir = os.getenv('XDG_RUNTIME_DIR', '/tmp')
        prefix = f'cagebreak-ipc.{os.getuid()}.'
        try:
            names = sorted(os.listdir(runtime_dir))
        except OSError:
            return None

        for name in names:
            if name.startswith(prefix) and name.endswith('.sock'):
                candidate = os.path.join(runtime_dir, name)
                if os.path.exists(candidate):
                    return candidate
        return None

    def connect_to_nedm(self):
        """Connect to the IPC socket and load the initial state.

        Any previously held socket is closed first so reconnects do not leak
        file descriptors.

        Returns:
            True when connected, False when no socket was found or the
            connection attempt failed.
        """
        self._close_socket()
        self.connected = False

        self.socket_path = self.find_socket()
        if not self.socket_path:
            return False

        new_sock = None
        try:
            new_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            new_sock.connect(self.socket_path)
        except OSError:
            if new_sock is not None:
                new_sock.close()
            return False

        self.sock = new_sock
        self._rx_buffer = b""
        self.connected = True
        # Query current state after connecting
        self.query_current_state()
        return True

    def query_current_state(self):
        """Send a ``dump`` request and apply the reply to the tracked state.

        Reads until a dump reply has been parsed, the peer closes the socket,
        or ``QUERY_TIMEOUT`` elapses.  Other events that arrive while waiting
        are passed to ``handle_event`` instead of being discarded.  The
        socket is always put back into blocking mode afterwards so a later
        blocking ``recv`` in ``listen_for_events`` is not hit by a leftover
        timeout.
        """
        if not self.connected or self.sock is None or self._querying:
            return

        conn = self.sock
        self._querying = True
        try:
            conn.sendall(b'dump\n')
            conn.settimeout(self.QUERY_TIMEOUT)
            deadline = time.monotonic() + self.QUERY_TIMEOUT
            answered = False
            while not answered and time.monotonic() < deadline:
                chunk = conn.recv(8192)
                if not chunk:
                    break
                for message in self._feed(chunk):
                    if self._is_dump(message):
                        self.parse_dump_data(message)
                        answered = True
                    else:
                        self.handle_event(message)
        except OSError:
            # Includes socket.timeout; querying is best-effort.
            pass
        finally:
            self._querying = False
            try:
                conn.settimeout(None)
            except OSError:
                pass

    def parse_dump_data(self, dump_data):
        """Update workspace number and focused-window title from a dump.

        The current output's ``curr_workspace`` (1-based) becomes
        ``current_workspace``.  The title is taken from the view whose ``id``
        equals the ``view_id`` of the tile whose ``id`` equals the top-level
        ``tiles_curr_id``.  Malformed or incomplete dumps are ignored and
        leave the previously tracked values in place.
        """
        try:
            outputs = dump_data.get('outputs', {})
            curr_output = dump_data.get('curr_output', '')
            if curr_output not in outputs:
                return
            output_data = outputs[curr_output]

            # Already 1-based in dump output; ignore missing/invalid values
            # rather than reporting a non-existent workspace 0.
            curr_workspace = output_data.get('curr_workspace', 0)
            if not isinstance(curr_workspace, int) or curr_workspace < 1:
                return
            self.current_workspace = curr_workspace

            workspaces = output_data.get('workspaces', [])
            if curr_workspace > len(workspaces):
                return
            workspace = workspaces[curr_workspace - 1]  # 0-based list index

            # Find the focused tile, then the view shown in it.
            tiles_curr_id = dump_data.get('tiles_curr_id')
            focused_tile = next(
                (tile for tile in workspace.get('tiles', [])
                 if tile.get('id') == tiles_curr_id),
                None,
            )
            if focused_tile is None:
                return

            view_id = focused_tile.get('view_id')  # NEDM uses 'view_id' not 'view'
            focused_view = next(
                (view for view in workspace.get('views', [])
                 if view.get('id') == view_id),
                None,
            )
            if focused_view is not None:
                self.current_window_title = self._title_for_view(focused_view, view_id)
        except (KeyError, TypeError, AttributeError):
            pass

    def listen_for_events(self):
        """Receive IPC events until the connection ends.

        Incoming bytes are buffered so frames split across ``recv`` calls are
        reassembled before parsing.  When the peer closes the socket or a
        socket error occurs, ``connected`` is cleared so the caller can
        reconnect, and the socket is closed.
        """
        if not self.connected or self.sock is None:
            return

        conn = self.sock
        try:
            while self.connected:
                data = conn.recv(4096)
                if not data:
                    break
                for event in self._feed(data):
                    self.handle_event(event)
        except OSError:
            pass
        finally:
            # Only flag a disconnect if no newer connection replaced ours.
            if self.sock is conn:
                self.connected = False
            try:
                conn.close()
            except OSError:
                pass

    def handle_event(self, event):
        """Apply one decoded IPC event to the tracked state.

        Non-dict values and unknown event names are ignored.
        """
        if not isinstance(event, dict):
            return

        event_name = event.get('event_name')

        if event_name == 'switch_ws':
            self.current_workspace = event.get('new_workspace', 1)  # Already 1-based
            # Clear window title when switching workspaces since we don't know the focused window
            self.current_window_title = ""
        elif event_name == 'focus_tile':
            workspace = event.get('new_workspace')  # Use new_workspace for focus_tile events
            if workspace is not None:
                self.current_workspace = workspace  # focus_tile events already use 1-based indexing
            # Get window title from focus_tile event
            self.current_window_title = event.get('view_title', '')
        elif event_name in ('view_map', 'view_unmap', 'cycle_views'):
            # Mapping, unmapping or cycling views may change the focused
            # window, so re-query the full state.
            self.query_current_state()
        elif event_name == 'close':
            # When a window is closed, clear the title
            self.current_window_title = ""
        elif self._is_dump(event):
            # Handle dump responses
            self.parse_dump_data(event)

    def get_workspace_info(self):
        """Return the status dict (``text``, ``class``, ``tooltip``) for the bar.

        When disconnected, only ``text`` and ``class`` are present.
        """
        if not self.connected:
            return {"text": "NEDM: Disconnected", "class": "disconnected"}

        full_title = self.current_window_title
        if full_title:
            # Truncate long titles to keep the bar readable
            title = full_title[:30] + "..." if len(full_title) > 30 else full_title
            display_text = f"WS: ●{self.current_workspace} | {title}"
            tooltip_text = f"Workspace {self.current_workspace}: {full_title}"
        else:
            display_text = f"WS: ●{self.current_workspace}"
            tooltip_text = f"Current workspace: {self.current_workspace}"

        return {
            "text": display_text,
            "class": "workspaces",
            "tooltip": tooltip_text,
        }

    @staticmethod
    def _is_dump(message):
        """Return True if a decoded message is a state dump."""
        event_name = message.get('event_name')
        return event_name == 'dump' or (event_name is None and 'outputs' in message)

    def _close_socket(self):
        """Close and forget the current socket, if any."""
        old_sock, self.sock = getattr(self, 'sock', None), None
        if old_sock is not None:
            try:
                old_sock.close()
            except OSError:
                pass

    def _decode_frame(self, frame):
        """Decode one frame (without its NUL) into a dict, or return None."""
        text = frame.decode('utf-8', errors='ignore')
        if not text.startswith(self.IPC_PREFIX):
            return None
        payload = text[len(self.IPC_PREFIX):]
        if not payload:
            return None
        try:
            decoded = json.loads(payload)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return None
        return decoded if isinstance(decoded, dict) else None

    def _feed(self, chunk):
        """Add received bytes to the buffer and return all complete messages.

        Frames are split on NUL.  Bytes after the last NUL are kept for the
        next call, unless they already decode as a complete message (a final
        frame without a terminator is still accepted, as before).
        """
        *frames, remainder = (self._rx_buffer + chunk).split(b'\0')
        messages = []
        for frame in frames:
            decoded = self._decode_frame(frame)
            if decoded is not None:
                messages.append(decoded)

        if remainder:
            trailing = self._decode_frame(remainder)
            if trailing is not None:
                messages.append(trailing)
                remainder = b""
            elif len(remainder) > self.MAX_BUFFERED_BYTES:
                remainder = b""

        self._rx_buffer = remainder
        return messages

    def _title_for_view(self, view, view_id):
        """Return a display title for a view.

        Uses the view's ``title`` when present (requires NEDM -b flag for
        security), else the process name looked up via ``ps`` from the view's
        ``pid``, else ``"View <view_id>"``.
        """
        title = view.get('title', '')
        if title:
            return title

        fallback = f"View {view_id}"
        pid = view.get('pid')
        if not pid:
            return fallback

        try:
            raw_name = subprocess.check_output(
                ['ps', '-p', str(pid), '-o', 'comm='],
                stderr=subprocess.DEVNULL,
                timeout=1,  # never let a stuck ps block event handling
            )
        except (subprocess.SubprocessError, OSError):
            return fallback
        return raw_name.decode(errors='replace').strip() or fallback
|
|
|
|
# Global tracker instance: the single module-level NEDMWorkspaceTracker that
# both signal_handler() and main() operate on.
tracker = NEDMWorkspaceTracker()
|
|
|
|
def signal_handler(signum, frame):
    """Handle termination signals gracefully.

    Closes the tracker's socket if it is connected, then exits the process
    with status 0.

    Args:
        signum: number of the received signal (unused).
        frame: interrupted stack frame (unused).
    """
    sock = getattr(tracker, 'sock', None)
    if tracker.connected and sock is not None:
        try:
            sock.close()
        except OSError:
            # Best-effort cleanup; we are exiting anyway.
            pass
    sys.exit(0)
|
|
|
|
def _emit(payload):
    """Print one JSON status line; exit quietly if the reader has gone away."""
    try:
        print(json.dumps(payload), flush=True)
    except BrokenPipeError:
        # The consumer closed the pipe. Point stdout at /dev/null so the
        # interpreter's final flush does not raise again, then stop.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)


def main():
    """Main loop for waybar module.

    Installs SIGTERM/SIGINT handlers, then once per second (re)connects to
    NEDM if needed, starting a daemon listener thread for each successful
    connection, and prints the current status as one JSON line.  Unexpected
    errors are reported as an "NEDM: Error" status and retried after 5 s.
    """
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    while True:
        try:
            # Connect on the first pass and reconnect whenever disconnected.
            if not tracker.connected and tracker.connect_to_nedm():
                listener = threading.Thread(target=tracker.listen_for_events, daemon=True)
                listener.start()

            _emit(tracker.get_workspace_info())
            time.sleep(1)  # Update every second
        except KeyboardInterrupt:
            break
        except Exception:
            # If something goes wrong, show error and continue
            _emit({"text": "NEDM: Error", "class": "error"})
            time.sleep(5)  # Wait longer on error
|
|
|
|
# Run the waybar module loop only when executed as a script.
if __name__ == "__main__":
    main()