# PiConsole v1.2 | kiy.li/go/piconsole1 from flask import Flask, request, jsonify import os import subprocess import psutil import time import requests app = Flask(__name__) version = 'v1.2' time.sleep(10) @app.route('/') def hello(): return f'Running PiConsole {version}' def pi_is_reachable(): try: subprocess.check_output(['ping', '-c', '1', 'localhost']) return True except subprocess.CalledProcessError: return False def get_temperature(): try: temp = subprocess.check_output(['vcgencmd', 'measure_temp']).decode('utf-8') return temp.replace('temp=', '').replace('\'C\n', '') except subprocess.CalledProcessError: return 'N/A' def get_disk_usage(): usage = psutil.disk_usage('/') return f'Total: {usage.total} bytes\nUsed: {usage.used} bytes\nFree: {usage.free} bytes' def get_memory_usage(): virtual_memory = psutil.virtual_memory() return f'Total: {virtual_memory.total} bytes\nAvailable: {virtual_memory.available} bytes' def list_files(): files = os.listdir('.') return '\n'.join(files) def read_file_content(filename): try: with open(filename, 'r') as file: content = file.read() return content except FileNotFoundError: return f'File {filename} not found' def get_network_info(): interfaces = psutil.net_if_addrs() info = '' for iface, addrs in interfaces.items(): info += f'Interface: {iface}\n' for addr in addrs: info += f' {addr.family.name}: {addr.address}\n' return info def update_software(): try: subprocess.check_output(['sudo', 'apt', 'update']) subprocess.check_output(['sudo', 'apt', 'upgrade', '-y']) return 'Software updated successfully.' except subprocess.CalledProcessError: return 'Failed to update software.' def reboot_pi(): subprocess.check_output(['sudo', 'reboot']) def shutdown_pi(): subprocess.check_output(['sudo', 'shutdown', '-h', 'now']) def run_file(filename): try: output = subprocess.check_output(['python3', filename], stderr=subprocess.STDOUT, universal_newlines=True) return f'Output of {filename}:\n{output}' except subprocess.CalledProcessError as e: return f'Error running {filename}:\n{e.output}' def get_system_uptime(): uptime_seconds = int(time.time() - psutil.boot_time()) uptime_minutes = uptime_seconds // 60 uptime_hours = uptime_minutes // 60 return f'System uptime:\nIn seconds: {uptime_seconds} seconds.\nIn minutes: {uptime_minutes} minutes.\nIn hours: {uptime_hours} hours.' def run_external_script(url): try: response = requests.get(url) if response.status_code == 200: script_content = response.text script_filename = 'external_script.py' # Save the script content to a local file with open(script_filename, 'w') as script_file: script_file.write(script_content) # Run the local script and capture output and errors try: completed_process = subprocess.run(['python3', script_filename], check=True, text=True, capture_output=True) output = completed_process.stdout error_output = completed_process.stderr # Clean up by deleting the local script file os.remove(script_filename) response_text = 'External script executed successfully.' if output: response_text += f'\nOutput:\n{output}' if error_output: response_text += f'\nErrors:\n{error_output}' return response_text except subprocess.CalledProcessError as e: return f'Error running the script: {e.output}' else: return f'Failed to fetch external script. Status code: {response.status_code}' except Exception as e: return f'Error executing external script: {str(e)}' @app.route('/piconsole', methods=['POST']) def handle_command(): data = request.json command = data.get('command') if command == 'status': response = {'response': 'Online' if pi_is_reachable() else 'Offline'} elif command == 'temperature': temperature = get_temperature() response = {'response': f'Current temperature: {temperature} °C'} return jsonify(response), 200, {'Content-Type': 'text/json; charset=utf-8'} elif command == 'diskusage': disk_usage = get_disk_usage() response = {'response': f'Disk usage:\n{disk_usage}'} elif command == 'memoryusage': memory_usage = get_memory_usage() response = {'response': f'Memory usage:\n{memory_usage}'} elif command == 'ls': file_list = list_files() response = {'response': f'Listing files:\n{file_list}'} elif command.startswith('cat '): filename = command.split(' ', 1)[1] file_content = read_file_content(filename) response = {'response': f'Contents of {filename}:\n{file_content}'} elif command == 'networkinfo': network_info = get_network_info() response = {'response': f'Network information:\n{network_info}'} elif command == 'update': update_result = update_software() response = {'response': update_result} elif command == 'reboot': reboot_pi() response = {'response': 'Rebooting the Raspberry Pi...'} elif command == 'shutdown': shutdown_pi() response = {'response': 'Shutting down the Raspberry Pi...'} elif command == 'ssh': response = {'response': 'To connect via SSH, use: ssh username@raspberry_ip'} elif command.startswith('run '): filename = command.split(' ', 1)[1] run_output = run_file(filename) response = {'response': run_output} elif command == 'uptime': uptime = get_system_uptime() response = {'response': uptime} elif command.startswith('externalscript '): url = command.split(' ', 1)[1] script_execution_result = run_external_script(url) response = {'response': script_execution_result} elif command == 'version': response = {'response': version} else: response = {'response': f'Unknown command: {command}'} return jsonify(response) if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=57988)