#!/usr/bin/env ruby require 'json' require 'securerandom' require 'shellwords' require 'net/http' require 'uri' class OllamaClient def initialize(options = {}) @model = options[:model] || 'llama3.2' @uri = URI("http://127.0.0.1:11434/api/chat") end def chat(messages, &block) Net::HTTP.start(@uri.hostname, @uri.port, http_options) do |http| http.request(build_request(messages)) do |response| if response.is_a?(Net::HTTPSuccess) response.read_body(&block) else raise "Ollama API error: #{response.code} #{response.message}" end end end end private def http_options { open_timeout: 10, read_timeout: 3600, use_ssl: @uri.scheme == "https" } end def build_request(messages) Net::HTTP::Post.new(@uri.path).tap do |request| request["Accept"] = "application/json" request["Content-Type"] = "application/json" request["User-Agent"] = "ruby-cli/1.0.0" request.body = build_payload(messages).to_json end end def build_payload(messages) { messages: messages, model: @model, stream: true, keep_alive: "5m", options: { temperature: 0.1 } } end end class ShellParser SHELL_OPERATORS = %w[|| && ;; |& <( <<< >> >& <& & ; ( ) | < >].freeze SHELL_META_CHARS = '|&;()<> \t'.freeze def initialize @operator_pattern = build_operator_pattern end def parse_shell_command(command, env = {}) tokens = tokenize(command) return { command: nil, args: [] } if tokens.empty? processed_tokens = substitute_variables(tokens, env) { command: processed_tokens.first, args: processed_tokens[1..-1] || [] } end private def build_operator_pattern escaped = SHELL_OPERATORS.map { |op| Regexp.escape(op) } Regexp.new("^(#{escaped.join('|')})$") end def tokenize(command) tokens = [] current_token = '' in_quotes = false quote_char = nil escaped = false command.each_char.with_index do |char, i| if escaped current_token += char escaped = false elsif in_quotes if char == quote_char in_quotes = false quote_char = nil else current_token += char end elsif char == '\\' escaped = true elsif char == '"' || char == "'" in_quotes = true quote_char = char elsif char.match?(/\s/) if !current_token.empty? tokens << current_token current_token = '' end else current_token += char end end tokens << current_token unless current_token.empty? tokens end def substitute_variables(tokens, env) tokens.map do |token| token.gsub(/\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)/) do var_name = $1 || $2 env[var_name] || ENV[var_name] || '' end end end end class HttpClient def initialize(options = {}) @timeout = options[:timeout] || 30 @retries = options[:retries] || 3 end def get(url) uri = URI(url) @retries.times do |attempt| begin response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https', read_timeout: @timeout) do |http| http.get(uri.path.empty? ? '/' : uri.path) end return { status: response.code.to_i, data: parse_response_body(response), headers: response.to_hash } rescue => error raise error if attempt == @retries - 1 sleep(2 ** attempt) end end end private def parse_response_body(response) content_type = response['content-type'] if content_type&.include?('application/json') JSON.parse(response.body) else response.body end rescue JSON::ParserError response.body end end class CLIApplication BUILTIN_COMMANDS = %w[cd pwd echo export set history help version exit quit ask].freeze def initialize(options = {}) @options = { debug: options[:debug] || false, timeout: options[:timeout] || 30000 }.merge(options) @shell_parser = ShellParser.new @http_client = HttpClient.new @ollama_client = OllamaClient.new(options) @active_commands = {} @command_history = [] @conversation_history = [] @environment = detect_runtime @current_dir = Dir.pwd init end def execute_command(command, context = {}) command_id = SecureRandom.uuid start_time = Time.now begin parsed = @shell_parser.parse_shell_command(command, context[:env] || {}) raise "Invalid command: empty or malformed" if parsed[:command].nil? || parsed[:command].empty? # Check if it's a builtin command if builtin_command?(parsed[:command]) result = execute_builtin_command(parsed, context) # Check if it's an executable system command elsif system_command_exists?(parsed[:command]) @active_commands[command_id] = { command: parsed[:command], args: parsed[:args], start_time: start_time } result = execute_external_command(parsed, context) else # Command not found, send to AI return execute_ask(command.split) end add_to_history(command, result) result rescue => error handle_error(error, "command-execution", { command: command, command_id: command_id }) raise ensure @active_commands.delete(command_id) end end def start_repl puts "CLI Application Starting..." if @options[:debug] puts "Runtime: #{@environment[:name]} #{@environment[:version]}" if @options[:debug] puts "Type 'help' for available commands, 'exit' to quit.\n\n" loop do print "#{File.basename(@current_dir)} > " command = gets&.strip break if command.nil? next if command.empty? begin result = execute_command(command) puts result[:stdout] unless result[:stdout].empty? $stderr.puts result[:stderr] unless result[:stderr].empty? break if result[:should_exit] rescue => error puts "Error: #{error.message}" end end end private def init log_startup_info if @options[:debug] end def detect_runtime { name: "Ruby", version: RUBY_VERSION, platform: RUBY_PLATFORM } end def log_startup_info puts "Ruby CLI Application v1.0.0" puts "Runtime: #{@environment[:name]} #{@environment[:version]}" puts "Platform: #{@environment[:platform]}" end def builtin_command?(command) BUILTIN_COMMANDS.include?(command) end def execute_builtin_command(parsed, context) command = parsed[:command] args = parsed[:args] case command when 'echo' execute_echo(args) when 'pwd' execute_pwd when 'cd' execute_cd(args.first) when 'export' execute_export(args, context) when 'set' execute_set(args, context) when 'history' execute_history(args) when 'help' execute_help when 'version' execute_version when 'exit', 'quit' execute_exit(args) when 'ask' execute_ask(args) else raise "Unknown builtin command: #{command}" end end def execute_external_command(parsed, context) command = parsed[:command] args = parsed[:args] case command when 'ls', 'dir' execute_ls(args) when 'cat', 'type' execute_cat(args) when 'curl', 'wget' execute_curl(args) else execute_system_command(command, args) end end def execute_echo(args) output = args.join(' ') { exit_code: 0, stdout: output, stderr: "" } end def execute_pwd { exit_code: 0, stdout: @current_dir, stderr: "" } end def execute_cd(path) path = ENV['HOME'] if path.nil? || path == '~' begin expanded_path = File.expand_path(path, @current_dir) if Dir.exist?(expanded_path) @current_dir = expanded_path { exit_code: 0, stdout: "", stderr: "", cwd: @current_dir } else { exit_code: 1, stdout: "", stderr: "cd: #{path}: No such file or directory" } end rescue => error { exit_code: 1, stdout: "", stderr: "cd: #{error.message}" } end end def execute_export(args, context) if args.empty? vars = ENV.map { |key, value| "#{key}=#{value}" }.join("\n") return { exit_code: 0, stdout: vars, stderr: "" } end args.each do |arg| key, value = arg.split('=', 2) if key && !value.nil? ENV[key] = value context[:env] ||= {} context[:env][key] = value end end { exit_code: 0, stdout: "", stderr: "" } end def execute_set(args, context) execute_export(args, context) end def execute_history(args) count = args.first ? args.first.to_i : @command_history.length history = @command_history.last(count) .each_with_index .map { |entry, index| "#{index + 1} #{entry[:command]}" } .join("\n") { exit_code: 0, stdout: history, stderr: "" } end def execute_help help_text = <<~HELP CLI Application Help Builtin Commands: cd [path] Change directory pwd Print working directory echo [args] Print arguments export [var] Set environment variable set [var] Set shell variable history [n] Show command history ask [question] Ask AI assistant a question help Show this help information version Show version information exit/quit Exit the application External Commands: System commands are executed directly. AI Assistant: Any input that isn't a valid command is sent to AI automatically. You can also use 'ask' command explicitly. Examples: cd /home/user echo "Hello World" ls -la what time is it? ask "What is Ruby?" how do I install ruby? HELP { exit_code: 0, stdout: help_text.strip, stderr: "" } end def execute_version version_text = <<~VERSION Ruby CLI Application v1.0.0 Runtime: #{@environment[:name]} #{@environment[:version]} Platform: #{@environment[:platform]} VERSION { exit_code: 0, stdout: version_text.strip, stderr: "" } end def execute_ask(args) if args.empty? return { exit_code: 1, stdout: "", stderr: "ask: missing question" } end question = args.join(' ') @conversation_history << { role: "user", content: question } begin response_content = "" print "🤖 " @ollama_client.chat(@conversation_history) do |chunk| chunk.split("\n").each do |line| next if line.strip.empty? begin data = JSON.parse(line) if data['message'] && data['message']['content'] content = data['message']['content'] print content response_content += content end rescue JSON::ParserError end end end puts @conversation_history << { role: "assistant", content: response_content } { exit_code: 0, stdout: response_content, stderr: "" } rescue => error { exit_code: 1, stdout: "", stderr: "ask: #{error.message}" } end end def execute_exit(args) exit_code = args.first ? args.first.to_i : 0 { exit_code: exit_code, stdout: "Goodbye!", stderr: "", should_exit: true } end def execute_ls(args) begin path = args.empty? ? @current_dir : File.expand_path(args.first, @current_dir) unless Dir.exist?(path) return { exit_code: 1, stdout: "", stderr: "ls: #{args.first}: No such file or directory" } end entries = Dir.entries(path) if args.include?('-a') || args.include?('-la') || args.include?('-al') files = entries else files = entries.reject { |f| f.start_with?('.') } end if args.include?('-l') || args.include?('-la') || args.include?('-al') output = files.map do |file| filepath = File.join(path, file) stat = File.lstat(filepath) permissions = format_permissions(stat.mode) size = stat.size mtime = stat.mtime.strftime('%b %d %H:%M') "#{permissions} #{stat.nlink} #{stat.uid} #{stat.gid} #{size} #{mtime} #{file}" end.join("\n") else output = files.join("\n") end { exit_code: 0, stdout: output, stderr: "" } rescue => error { exit_code: 1, stdout: "", stderr: "ls: #{error.message}" } end end def execute_cat(args) if args.empty? return { exit_code: 1, stdout: "", stderr: "cat: missing file operand" } end output = "" stderr = "" exit_code = 0 args.each do |filename| filepath = File.expand_path(filename, @current_dir) begin content = File.read(filepath) output += content rescue => error stderr += "cat: #{filename}: #{error.message}\n" exit_code = 1 end end { exit_code: exit_code, stdout: output, stderr: stderr.chomp } end def execute_curl(args) if args.empty? return { exit_code: 1, stdout: "", stderr: "curl: missing URL" } end url = args.last begin response = @http_client.get(url) { exit_code: 0, stdout: response[:data].is_a?(Hash) ? JSON.pretty_generate(response[:data]) : response[:data].to_s, stderr: "" } rescue => error { exit_code: 1, stdout: "", stderr: "curl: #{error.message}" } end end def execute_system_command(command, args) begin full_command = Shellwords.join([command] + args) output = `cd #{Shellwords.escape(@current_dir)} && #{full_command} 2>&1` exit_code = $?.exitstatus { exit_code: exit_code, stdout: output, stderr: "" } rescue => error { exit_code: 1, stdout: "", stderr: "#{command}: #{error.message}" } end end private def system_command_exists?(command) # Check if command exists in PATH ENV['PATH'].split(':').any? do |path| File.executable?(File.join(path, command)) end end def format_permissions(mode) perms = "" perms += File.directory?(mode) ? "d" : "-" perms += (mode & 0o400 != 0) ? "r" : "-" perms += (mode & 0o200 != 0) ? "w" : "-" perms += (mode & 0o100 != 0) ? "x" : "-" perms += (mode & 0o040 != 0) ? "r" : "-" perms += (mode & 0o020 != 0) ? "w" : "-" perms += (mode & 0o010 != 0) ? "x" : "-" perms += (mode & 0o004 != 0) ? "r" : "-" perms += (mode & 0o002 != 0) ? "w" : "-" perms += (mode & 0o001 != 0) ? "x" : "-" perms end def add_to_history(command, result) @command_history << { command: command, timestamp: Time.now, exit_code: result[:exit_code], duration: result[:duration] } @command_history = @command_history.last(500) if @command_history.length > 1000 end def handle_error(error, context, metadata = {}) error_info = { message: error.message, backtrace: error.backtrace&.first(5), context: context, timestamp: Time.now }.merge(metadata) if @options[:debug] puts "CLI Error: #{error_info}" end end end def create_cli(options = {}) CLIApplication.new(options) end def main options = parse_options(ARGV.dup) if options[:command] app = create_cli(options) begin result = app.execute_command(options[:command]) puts result[:stdout] $stderr.puts result[:stderr] unless result[:stderr].empty? exit(result[:exit_code]) rescue => error $stderr.puts "Command failed: #{error.message}" exit(1) end else app = create_cli(options) app.start_repl end end def parse_options(argv) options = { debug: false, host: nil, model: nil, command: nil } while arg = argv.shift case arg when '--debug' options[:debug] = true when '--host' options[:host] = argv.shift when '--model' options[:model] = argv.shift when '--help' show_usage exit(0) else options[:command] = ([arg] + argv).join(' ') break end end options end def show_usage puts <<~USAGE Usage: #{$0} [options] [command] Options: --debug Enable debug mode --host HOST Set Ollama host (default: localhost:11434) --model MODEL Set Ollama model (default: llama3.2) --help Show this help Examples: #{$0} # Start interactive REPL #{$0} --host localhost:11434 # Connect to specific Ollama instance #{$0} --model qwen2.5 # Use different model #{$0} echo "Hello World" # Execute single command #{$0} ask "What is Ruby?" # Ask AI question directly USAGE end if __FILE__ == $0 main end