"""WebSocket command server for monitoring and controlling a Raspberry Pi."""

import asyncio
import os
import subprocess

import psutil
import websockets


async def handle_command(websocket, path=None):
    """Serve one client connection, replying to every command it sends.

    Args:
        websocket: Connection object supplied by ``websockets``.
        path: Request path. Older ``websockets`` releases pass it as a second
            positional argument; newer ones call the handler with the
            connection only, so it is optional here. It is not used.
    """
    loop = asyncio.get_running_loop()
    async for command in websocket:
        # process_command shells out (ping, apt, python3, ...) and may take a
        # long time. Running it in the default executor keeps the event loop
        # free to serve other clients and keepalive frames meanwhile.
        response = await loop.run_in_executor(None, process_command, command)
        await websocket.send(response)


def process_command(command):
    """Translate one client command into its textual reply.

    Recognised commands: ``status``, ``temperature``, ``diskusage``,
    ``memoryusage``, ``ls``, ``cat <file>``, ``networkinfo``, ``update``,
    ``reboot``, ``shutdown``, ``ssh`` and ``run <file>``.

    NOTE(review): ``cat`` and ``run`` use the client-supplied path as-is, so
    any client can read any readable file and execute any Python script.
    Confirm that the server is only reachable by trusted peers.

    Args:
        command: Message received from the client. Binary messages are
            decoded as UTF-8 (undecodable bytes are replaced).

    Returns:
        str: The reply to send back to the client.
    """
    if isinstance(command, (bytes, bytearray)):
        # Binary frames arrive as bytes; str.startswith-style matching below
        # would raise TypeError on them.
        command = command.decode('utf-8', errors='replace')

    if command == 'status':
        return 'Online' if pi_is_reachable() else 'Offline'
    if command == 'temperature':
        temperature = get_temperature()
        return f'Current temperature: {temperature} °C'
    if command == 'diskusage':
        disk_usage = get_disk_usage()
        return f'Disk usage:\n{disk_usage}'
    if command == 'memoryusage':
        memory_usage = get_memory_usage()
        return f'Memory usage:\n{memory_usage}'
    if command == 'ls':
        file_list = list_files()
        return f'Listing files:\n{file_list}'
    if command.startswith('cat '):
        filename = command.split(' ', 1)[1]
        file_content = read_file_content(filename)
        return f'Contents of {filename}:\n{file_content}'
    if command == 'networkinfo':
        network_info = get_network_info()
        return f'Network information:\n{network_info}'
    if command == 'update':
        return update_software()
    if command == 'reboot':
        reboot_pi()
        return 'Rebooting the Raspberry Pi...'
    if command == 'shutdown':
        shutdown_pi()
        return 'Shutting down the Raspberry Pi...'
    if command == 'ssh':
        return 'To connect via SSH, use: ssh username@raspberry_ip'
    if command.startswith('run '):
        filename = command.split(' ', 1)[1]
        return run_file(filename)
    return f'Unknown command: {command}'


def pi_is_reachable():
    """Return True when a single ping to ``localhost`` succeeds.

    NOTE(review): the ping target is ``localhost``, so this checks the local
    network stack rather than reachability from another machine.
    """
    try:
        subprocess.check_output(['ping', '-c', '1', 'localhost'])
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable ``ping`` binary.
        return False
    return True


def get_temperature():
    """Return the CPU temperature reported by ``vcgencmd measure_temp``.

    Returns:
        str: The reading with the ``temp=`` prefix and ``'C`` suffix removed,
        or ``'N/A'`` when the command fails or is not installed.
    """
    try:
        raw = subprocess.check_output(['vcgencmd', 'measure_temp']).decode('utf-8')
    except (subprocess.CalledProcessError, OSError):
        # OSError covers hosts where ``vcgencmd`` does not exist.
        return 'N/A'
    return raw.strip().replace('temp=', '').replace("'C", '')


def get_disk_usage():
    """Return total, used and free bytes of the ``/`` filesystem as text."""
    usage = psutil.disk_usage('/')
    return f'Total: {usage.total} bytes\nUsed: {usage.used} bytes\nFree: {usage.free} bytes'


def get_memory_usage():
    """Return total and available bytes of virtual memory as text."""
    virtual_memory = psutil.virtual_memory()
    return f'Total: {virtual_memory.total} bytes\nAvailable: {virtual_memory.available} bytes'


def list_files():
    """Return the entries of the current working directory, one per line."""
    return '\n'.join(os.listdir('.'))


def read_file_content(filename):
    """Return the text content of ``filename`` or a message describing why not.

    Args:
        filename: Path of the file to read.

    Returns:
        str: The file content, ``'File <filename> not found'`` when the file
        does not exist, or ``'Could not read <filename>: <reason>'`` for any
        other read failure (permissions, directory, undecodable bytes, ...).
    """
    try:
        with open(filename, 'r') as file:
            return file.read()
    except FileNotFoundError:
        return f'File {filename} not found'
    except (OSError, ValueError) as exc:
        # ValueError includes UnicodeDecodeError and paths with a null byte.
        return f'Could not read {filename}: {exc}'


def get_network_info():
    """Return every network interface with its addresses as text."""
    lines = []
    for iface, addrs in psutil.net_if_addrs().items():
        lines.append(f'Interface: {iface}\n')
        lines.extend(f' {addr.family.name}: {addr.address}\n' for addr in addrs)
    return ''.join(lines)


def update_software():
    """Run ``sudo apt update`` followed by ``sudo apt upgrade -y``.

    Returns:
        str: ``'Software updated successfully.'`` when both commands succeed,
        otherwise ``'Failed to update software.'``.
    """
    try:
        subprocess.check_output(['sudo', 'apt', 'update'])
        subprocess.check_output(['sudo', 'apt', 'upgrade', '-y'])
    except (subprocess.CalledProcessError, OSError):
        # OSError covers hosts where ``sudo`` is not available.
        return 'Failed to update software.'
    return 'Software updated successfully.'
def reboot_pi():
    """Reboot the Raspberry Pi by running ``sudo reboot``.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    subprocess.check_output(['sudo', 'reboot'])


def shutdown_pi():
    """Shut the Raspberry Pi down by running ``sudo shutdown -h now``.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    subprocess.check_output(['sudo', 'shutdown', '-h', 'now'])


def run_file(filename):
    """Run ``filename`` with ``python3`` and report what it printed.

    Args:
        filename: Path of the Python script to execute.

    Returns:
        str: ``'Output of <filename>:'`` followed by the combined
        stdout/stderr when the script exits with status 0, otherwise
        ``'Error running <filename>:'`` followed by the script output or, if
        the interpreter could not be started, the operating-system error.
    """
    try:
        output = subprocess.check_output(
            ['python3', filename],
            stderr=subprocess.STDOUT,  # fold stderr into the captured output
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as e:
        return f'Error running {filename}:\n{e.output}'
    except OSError as e:
        # ``python3`` is missing or cannot be executed.
        return f'Error running {filename}:\n{e}'
    return f'Output of {filename}:\n{output}'


async def main():
    """Serve ``handle_command`` on every interface, port 8765, until cancelled.

    NOTE(review): the server binds to ``0.0.0.0`` and no authentication is
    visible in this file, so any host that can reach the port can issue
    commands. Confirm that this exposure is intended.
    """
    async with websockets.serve(handle_command, "0.0.0.0", 8765):
        # This future is never resolved; awaiting it keeps the server alive.
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())