nanocode.py
· 18 KiB · Python
Raw
#!/usr/bin/env python3
"""nanocode - minimal claude code alternative"""
import glob as globlib
import json
import os
import re
import readline
import select
import ssl
import subprocess
import sys
import termios
import tty
import urllib.request
import urllib.parse
from datetime import datetime
OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY")
API_URL = (
"http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY
else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY
else "https://api.anthropic.com/v1/messages"
)
MODEL = os.environ.get("MODEL",
"anthropic/claude-sonnet-4.5" if LOCAL_API_KEY
else "anthropic/claude-opus-4.5" if OPENROUTER_KEY
else "claude-opus-4-5"
)
# ANSI colors
RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m"
BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m"
stop_flag = False
def create_opener():
"""Create URL opener with SSL and proxy support"""
proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)]
if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
return urllib.request.build_opener(*handlers)
def register_tool(name, desc, params):
"""Register a tool from extension code"""
def decorator(func):
TOOLS[name] = (desc, params, func)
return func
return decorator
def search_extension(args):
"""Search extensions from gist.kitchain.cn"""
query = args.get("query", "")
if not query: return "error: query required"
try:
# Split query into keywords
keywords = query.lower().split()
gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}}
opener = create_opener()
# Search each keyword as a topic
for keyword in keywords:
url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}"
html = opener.open(urllib.request.Request(url), timeout=10).read().decode()
# Extract gist URLs and titles
gist_matches = re.findall(
r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>',
html
)
for gist_path, title in gist_matches:
if gist_path not in gist_info:
# Extract description and topics for this gist
gist_section = re.search(
rf'{re.escape(gist_path)}.*?'
r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>',
html, re.DOTALL
)
desc = ""
topics = []
if gist_section:
desc = gist_section.group(1).strip()
topics_section = gist_section.group(2)
topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section)
topics = [t[1] for t in topics] # Extract topic names
gist_info[gist_path] = {
"hits": 0,
"title": title.strip(),
"desc": desc,
"topics": topics,
"filename": title.strip()
}
gist_info[gist_path]["hits"] += 1
if not gist_info: return f"No extensions found: {query}"
# Sort by hit count (descending)
sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10]
result = f"Found {len(sorted_gists)} extensions:\n\n"
for gist_path, info in sorted_gists:
result += f"• {info['title']}\n"
if info['desc']:
result += f" {info['desc']}\n"
if info['topics']:
result += f" Topics: {', '.join(info['topics'])}\n"
result += f" Matched: {info['hits']} keyword(s)\n\n"
# Return first gist's load URL
first_gist = sorted_gists[0][0]
first_filename = sorted_gists[0][1]['filename']
result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})"
return result
except Exception as e:
return f"error: {e}"
def load(args):
"""Load extension from URL"""
url = args.get("url")
if not url: return "error: url required"
try:
opener = create_opener()
code = opener.open(urllib.request.Request(url), timeout=10).read().decode()
exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess})
new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","web_search","search_extension","load"]]
return f"Loaded. New tools: {', '.join(new)}"
except Exception as e:
return f"error: {e}"
# --- Tools ---
def read(args):
lines = open(args["path"]).readlines()
offset, limit = args.get("offset", 0), args.get("limit", len(lines))
return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit]))
def write(args):
open(args["path"], "w").write(args["content"])
return "ok"
def edit(args):
text = open(args["path"]).read()
old, new = args["old"], args["new"]
if old not in text: return "error: old_string not found"
count = text.count(old)
if not args.get("all") and count > 1:
return f"error: old_string appears {count} times (use all=true)"
open(args["path"], "w").write(text.replace(old, new) if args.get("all") else text.replace(old, new, 1))
return "ok"
def glob(args):
pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/")
files = sorted(globlib.glob(pattern, recursive=True),
key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True)
return "\n".join(files) or "none"
def grep(args):
pattern, hits = re.compile(args["pat"]), []
for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True):
try:
for n, l in enumerate(open(fp), 1):
if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}")
except: pass
return "\n".join(hits[:50]) or "none"
def bash(args):
proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, text=True)
lines = []
try:
if proc.stdout:
while True:
line = proc.stdout.readline()
if not line and proc.poll() is not None: break
if line:
print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True)
lines.append(line)
proc.wait(timeout=30)
except subprocess.TimeoutExpired:
proc.kill()
lines.append("\n(timeout)")
return "".join(lines).strip() or "(empty)"
def web_search(args):
"""Search web using DuckDuckGo"""
query, max_results = args["query"], args.get("max_results", 5)
try:
url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}"
headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
opener = create_opener()
html = opener.open(urllib.request.Request(url, headers=headers), timeout=30).read().decode()
# Extract titles and URLs
links = re.findall(r'class="result__a"[^>]+href="([^"]+)"[^>]*>([^<]+)<', html)
# Extract snippets
snippets = re.findall(r'class="result__snippet"[^>]*>([^<]*)<', html)
if not links: return "No results found"
results = []
for i, ((link, title), snippet) in enumerate(zip(links[:max_results], snippets[:max_results] + [""] * max_results), 1):
results.append(f"{i}. {title.strip()}\n URL: {link}\n {snippet.strip()}\n")
return "\n".join(results)
except Exception as e:
return f"error: {e}"
TOOLS = {
"read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read),
"write": ("Write content to file", {"path": "string", "content": "string"}, write),
"edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit),
"glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob),
"grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep),
"bash": ("Run shell command", {"cmd": "string"}, bash),
"web_search": ("Search the web using DuckDuckGo", {"query": "string", "max_results": "number?"}, web_search),
"search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension),
"load": ("Load extension from URL to add new tools", {"url": "string"}, load),
}
def run_tool(name, args):
try: return TOOLS[name][2](args)
except Exception as e: return f"error: {e}"
def make_schema():
result = []
for name, (desc, params, _) in TOOLS.items():
props, req = {}, []
for pname, ptype in params.items():
opt = ptype.endswith("?")
props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")}
if not opt: req.append(pname)
result.append({"name": name, "description": desc,
"input_schema": {"type": "object", "properties": props, "required": req}})
return result
def call_api(messages, system_prompt):
headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}"
elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "")
data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt,
"messages": messages, "tools": make_schema(), "stream": True}
if os.environ.get("THINKING"):
data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))}
req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST")
return create_opener().open(req)
def process_stream(response):
"""简化的流式处理,支持ESC中断"""
global stop_flag
blocks, current, text_buf, json_buf, think_buf = [], None, "", "", ""
# Save terminal settings
old_settings = termios.tcgetattr(sys.stdin)
try:
tty.setcbreak(sys.stdin.fileno())
for line in response:
if select.select([sys.stdin], [], [], 0)[0]:
ch = sys.stdin.read(1)
if ch == '\x1b': # ESC key
stop_flag = True
print(f"\n{YELLOW}⏸ Stopped{RESET}")
break
line = line.decode("utf-8").strip()
if not line.startswith("data: "): continue
if line == "data: [DONE]": continue
try:
data = json.loads(line[6:])
etype = data.get("type")
if etype == "content_block_start":
block = data.get("content_block", {})
current = {"type": block.get("type"), "id": block.get("id")}
if current["type"] == "text":
text_buf = ""
print(f"\n{CYAN}⏺{RESET} ", end="", flush=True)
elif current["type"] == "thinking":
think_buf = ""
print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
elif current["type"] == "tool_use":
current["name"] = block.get("name")
json_buf = ""
elif etype == "content_block_delta":
delta = data.get("delta", {})
dtype = delta.get("type")
if dtype == "text_delta":
text = delta.get("text", "")
text_buf += text
print(text, end="", flush=True)
elif dtype == "thinking_delta":
text = delta.get("thinking", "")
think_buf += text
print(text, end="", flush=True)
elif dtype == "input_json_delta" and current:
json_buf += delta.get("partial_json", "")
elif etype == "content_block_stop" and current:
if current["type"] == "text":
current["text"] = text_buf
print()
elif current["type"] == "thinking":
print(RESET)
elif current["type"] == "tool_use":
try: current["input"] = json.loads(json_buf)
except: current["input"] = {}
blocks.append(current)
current = None
except: pass
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
return blocks
def read_multiline_input():
"""Read multiline input. Enter to submit, Alt+Enter for newline."""
lines, current = [], ""
old_settings = termios.tcgetattr(sys.stdin)
try:
tty.setcbreak(sys.stdin.fileno())
print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True)
while True:
ch = sys.stdin.read(1)
if ch == '\x03': # Ctrl+C - clear input
lines.clear()
current = ""
print("\r\033[K", end="", flush=True)
print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True)
continue
if ch == '\x04': # Ctrl+D
raise EOFError
if ch == '\x1b' and sys.stdin.read(1) in ('\r', '\n'): # Alt+Enter
lines.append(current)
current = ""
print(f"\n{BOLD}{BLUE}│{RESET} ", end="", flush=True)
elif ch in ('\r', '\n'): # Enter
if current: lines.append(current)
break
elif ch in ('\x7f', '\x08'): # Backspace
if current:
current = current[:-1]
print("\r\033[K", end="", flush=True)
print(f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} {current}", end="", flush=True)
elif ch.isprintable() or ch == '\t':
current += ch
print(ch, end="", flush=True)
print()
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
return "\n".join(lines).strip()
def main():
global stop_flag
# Disable Ctrl+C signal
old_settings = termios.tcgetattr(sys.stdin)
new_settings = termios.tcgetattr(sys.stdin)
new_settings[3] = new_settings[3] & ~termios.ISIG # Disable signal generation
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
try:
proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else ""
thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else ""
print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info}{RESET}")
print(f"{DIM}Shortcuts: Enter=submit | Alt+Enter=newline | Ctrl+C=clear | Ctrl+D=exit | ESC=stop{RESET}\n")
run_main_loop()
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
def run_main_loop():
messages = []
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} Current time: {current_time}
IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
Examples:
- User asks about GitHub repo → search_extension({{"query": "github documentation"}})
- User needs web data → search_extension({{"query": "web scraping"}})
- User needs API → search_extension({{"query": "api client"}})"""
while True:
try:
print(f"{DIM}{'─'*80}{RESET}")
user_input = read_multiline_input()
print(f"{DIM}{'─'*80}{RESET}")
if not user_input: continue
if user_input in ("/q", "exit"): break
if user_input == "/c":
messages = []
print(f"{GREEN}⏺ Cleared{RESET}")
continue
messages.append({"role": "user", "content": user_input})
while True:
stop_flag = False
response = call_api(messages, system_prompt)
blocks = process_stream(response)
if stop_flag: break
tool_results = []
for block in blocks:
if block["type"] == "tool_use":
name, args = block["name"], block["input"]
preview = str(list(args.values())[0])[:50] if args else ""
print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})")
result = run_tool(name, args)
lines = result.split("\n")
prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "")
if len(lines) > 1: prev += f" +{len(lines)-1}"
print(f" {DIM}⎿ {prev}{RESET}")
tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})
messages.append({"role": "assistant", "content": blocks})
if not tool_results: break
messages.append({"role": "user", "content": tool_results})
print()
except EOFError:
break
except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}")
if __name__ == "__main__":
main()
| 1 | #!/usr/bin/env python3 |
| 2 | """nanocode - minimal claude code alternative""" |
| 3 | import glob as globlib |
| 4 | import json |
| 5 | import os |
| 6 | import re |
| 7 | import readline |
| 8 | import select |
| 9 | import ssl |
| 10 | import subprocess |
| 11 | import sys |
| 12 | import termios |
| 13 | import tty |
| 14 | import urllib.request |
| 15 | import urllib.parse |
| 16 | from datetime import datetime |
| 17 | |
| 18 | OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY") |
| 19 | LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY") |
| 20 | API_URL = ( |
| 21 | "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY |
| 22 | else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY |
| 23 | else "https://api.anthropic.com/v1/messages" |
| 24 | ) |
| 25 | MODEL = os.environ.get("MODEL", |
| 26 | "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY |
| 27 | else "anthropic/claude-opus-4.5" if OPENROUTER_KEY |
| 28 | else "claude-opus-4-5" |
| 29 | ) |
| 30 | |
| 31 | # ANSI colors |
| 32 | RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m" |
| 33 | BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m" |
| 34 | stop_flag = False |
| 35 | |
| 36 | def create_opener(): |
| 37 | """Create URL opener with SSL and proxy support""" |
| 38 | proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy") |
| 39 | ssl_ctx = ssl.create_default_context() |
| 40 | ssl_ctx.check_hostname = False |
| 41 | ssl_ctx.verify_mode = ssl.CERT_NONE |
| 42 | |
| 43 | handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)] |
| 44 | if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy})) |
| 45 | return urllib.request.build_opener(*handlers) |
| 46 | |
| 47 | def register_tool(name, desc, params): |
| 48 | """Register a tool from extension code""" |
| 49 | def decorator(func): |
| 50 | TOOLS[name] = (desc, params, func) |
| 51 | return func |
| 52 | return decorator |
| 53 | |
| 54 | def search_extension(args): |
| 55 | """Search extensions from gist.kitchain.cn""" |
| 56 | query = args.get("query", "") |
| 57 | if not query: return "error: query required" |
| 58 | try: |
| 59 | # Split query into keywords |
| 60 | keywords = query.lower().split() |
| 61 | gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}} |
| 62 | opener = create_opener() |
| 63 | |
| 64 | # Search each keyword as a topic |
| 65 | for keyword in keywords: |
| 66 | url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}" |
| 67 | html = opener.open(urllib.request.Request(url), timeout=10).read().decode() |
| 68 | |
| 69 | # Extract gist URLs and titles |
| 70 | gist_matches = re.findall( |
| 71 | r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>', |
| 72 | html |
| 73 | ) |
| 74 | |
| 75 | for gist_path, title in gist_matches: |
| 76 | if gist_path not in gist_info: |
| 77 | # Extract description and topics for this gist |
| 78 | gist_section = re.search( |
| 79 | rf'{re.escape(gist_path)}.*?' |
| 80 | r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>', |
| 81 | html, re.DOTALL |
| 82 | ) |
| 83 | desc = "" |
| 84 | topics = [] |
| 85 | if gist_section: |
| 86 | desc = gist_section.group(1).strip() |
| 87 | topics_section = gist_section.group(2) |
| 88 | topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section) |
| 89 | topics = [t[1] for t in topics] # Extract topic names |
| 90 | |
| 91 | gist_info[gist_path] = { |
| 92 | "hits": 0, |
| 93 | "title": title.strip(), |
| 94 | "desc": desc, |
| 95 | "topics": topics, |
| 96 | "filename": title.strip() |
| 97 | } |
| 98 | gist_info[gist_path]["hits"] += 1 |
| 99 | |
| 100 | if not gist_info: return f"No extensions found: {query}" |
| 101 | |
| 102 | # Sort by hit count (descending) |
| 103 | sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10] |
| 104 | |
| 105 | result = f"Found {len(sorted_gists)} extensions:\n\n" |
| 106 | for gist_path, info in sorted_gists: |
| 107 | result += f"• {info['title']}\n" |
| 108 | if info['desc']: |
| 109 | result += f" {info['desc']}\n" |
| 110 | if info['topics']: |
| 111 | result += f" Topics: {', '.join(info['topics'])}\n" |
| 112 | result += f" Matched: {info['hits']} keyword(s)\n\n" |
| 113 | |
| 114 | # Return first gist's load URL |
| 115 | first_gist = sorted_gists[0][0] |
| 116 | first_filename = sorted_gists[0][1]['filename'] |
| 117 | result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})" |
| 118 | return result |
| 119 | except Exception as e: |
| 120 | return f"error: {e}" |
| 121 | |
| 122 | def load(args): |
| 123 | """Load extension from URL""" |
| 124 | url = args.get("url") |
| 125 | if not url: return "error: url required" |
| 126 | try: |
| 127 | opener = create_opener() |
| 128 | code = opener.open(urllib.request.Request(url), timeout=10).read().decode() |
| 129 | exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess}) |
| 130 | new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","web_search","search_extension","load"]] |
| 131 | return f"Loaded. New tools: {', '.join(new)}" |
| 132 | except Exception as e: |
| 133 | return f"error: {e}" |
| 134 | |
| 135 | # --- Tools --- |
| 136 | def read(args): |
| 137 | lines = open(args["path"]).readlines() |
| 138 | offset, limit = args.get("offset", 0), args.get("limit", len(lines)) |
| 139 | return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit])) |
| 140 | |
| 141 | def write(args): |
| 142 | open(args["path"], "w").write(args["content"]) |
| 143 | return "ok" |
| 144 | |
| 145 | def edit(args): |
| 146 | text = open(args["path"]).read() |
| 147 | old, new = args["old"], args["new"] |
| 148 | if old not in text: return "error: old_string not found" |
| 149 | count = text.count(old) |
| 150 | if not args.get("all") and count > 1: |
| 151 | return f"error: old_string appears {count} times (use all=true)" |
| 152 | open(args["path"], "w").write(text.replace(old, new) if args.get("all") else text.replace(old, new, 1)) |
| 153 | return "ok" |
| 154 | |
| 155 | def glob(args): |
| 156 | pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/") |
| 157 | files = sorted(globlib.glob(pattern, recursive=True), |
| 158 | key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True) |
| 159 | return "\n".join(files) or "none" |
| 160 | |
| 161 | def grep(args): |
| 162 | pattern, hits = re.compile(args["pat"]), [] |
| 163 | for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True): |
| 164 | try: |
| 165 | for n, l in enumerate(open(fp), 1): |
| 166 | if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}") |
| 167 | except: pass |
| 168 | return "\n".join(hits[:50]) or "none" |
| 169 | |
| 170 | def bash(args): |
| 171 | proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE, |
| 172 | stderr=subprocess.STDOUT, text=True) |
| 173 | lines = [] |
| 174 | try: |
| 175 | if proc.stdout: |
| 176 | while True: |
| 177 | line = proc.stdout.readline() |
| 178 | if not line and proc.poll() is not None: break |
| 179 | if line: |
| 180 | print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True) |
| 181 | lines.append(line) |
| 182 | proc.wait(timeout=30) |
| 183 | except subprocess.TimeoutExpired: |
| 184 | proc.kill() |
| 185 | lines.append("\n(timeout)") |
| 186 | return "".join(lines).strip() or "(empty)" |
| 187 | |
| 188 | def web_search(args): |
| 189 | """Search web using DuckDuckGo""" |
| 190 | query, max_results = args["query"], args.get("max_results", 5) |
| 191 | try: |
| 192 | url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}" |
| 193 | headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"} |
| 194 | opener = create_opener() |
| 195 | html = opener.open(urllib.request.Request(url, headers=headers), timeout=30).read().decode() |
| 196 | |
| 197 | # Extract titles and URLs |
| 198 | links = re.findall(r'class="result__a"[^>]+href="([^"]+)"[^>]*>([^<]+)<', html) |
| 199 | # Extract snippets |
| 200 | snippets = re.findall(r'class="result__snippet"[^>]*>([^<]*)<', html) |
| 201 | if not links: return "No results found" |
| 202 | |
| 203 | results = [] |
| 204 | for i, ((link, title), snippet) in enumerate(zip(links[:max_results], snippets[:max_results] + [""] * max_results), 1): |
| 205 | results.append(f"{i}. {title.strip()}\n URL: {link}\n {snippet.strip()}\n") |
| 206 | return "\n".join(results) |
| 207 | except Exception as e: |
| 208 | return f"error: {e}" |
| 209 | |
| 210 | |
| 211 | TOOLS = { |
| 212 | "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read), |
| 213 | "write": ("Write content to file", {"path": "string", "content": "string"}, write), |
| 214 | "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit), |
| 215 | "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob), |
| 216 | "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep), |
| 217 | "bash": ("Run shell command", {"cmd": "string"}, bash), |
| 218 | "web_search": ("Search the web using DuckDuckGo", {"query": "string", "max_results": "number?"}, web_search), |
| 219 | "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension), |
| 220 | "load": ("Load extension from URL to add new tools", {"url": "string"}, load), |
| 221 | } |
| 222 | |
| 223 | def run_tool(name, args): |
| 224 | try: return TOOLS[name][2](args) |
| 225 | except Exception as e: return f"error: {e}" |
| 226 | |
| 227 | def make_schema(): |
| 228 | result = [] |
| 229 | for name, (desc, params, _) in TOOLS.items(): |
| 230 | props, req = {}, [] |
| 231 | for pname, ptype in params.items(): |
| 232 | opt = ptype.endswith("?") |
| 233 | props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")} |
| 234 | if not opt: req.append(pname) |
| 235 | result.append({"name": name, "description": desc, |
| 236 | "input_schema": {"type": "object", "properties": props, "required": req}}) |
| 237 | return result |
| 238 | |
| 239 | def call_api(messages, system_prompt): |
| 240 | headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"} |
| 241 | if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}" |
| 242 | elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}" |
| 243 | else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "") |
| 244 | |
| 245 | data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt, |
| 246 | "messages": messages, "tools": make_schema(), "stream": True} |
| 247 | |
| 248 | if os.environ.get("THINKING"): |
| 249 | data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))} |
| 250 | |
| 251 | req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST") |
| 252 | return create_opener().open(req) |
| 253 | |
| 254 | def process_stream(response): |
| 255 | """简化的流式处理,支持ESC中断""" |
| 256 | global stop_flag |
| 257 | blocks, current, text_buf, json_buf, think_buf = [], None, "", "", "" |
| 258 | |
| 259 | # Save terminal settings |
| 260 | old_settings = termios.tcgetattr(sys.stdin) |
| 261 | try: |
| 262 | tty.setcbreak(sys.stdin.fileno()) |
| 263 | |
| 264 | for line in response: |
| 265 | if select.select([sys.stdin], [], [], 0)[0]: |
| 266 | ch = sys.stdin.read(1) |
| 267 | if ch == '\x1b': # ESC key |
| 268 | stop_flag = True |
| 269 | print(f"\n{YELLOW}⏸ Stopped{RESET}") |
| 270 | break |
| 271 | |
| 272 | line = line.decode("utf-8").strip() |
| 273 | if not line.startswith("data: "): continue |
| 274 | if line == "data: [DONE]": continue |
| 275 | |
| 276 | try: |
| 277 | data = json.loads(line[6:]) |
| 278 | etype = data.get("type") |
| 279 | |
| 280 | if etype == "content_block_start": |
| 281 | block = data.get("content_block", {}) |
| 282 | current = {"type": block.get("type"), "id": block.get("id")} |
| 283 | if current["type"] == "text": |
| 284 | text_buf = "" |
| 285 | print(f"\n{CYAN}⏺{RESET} ", end="", flush=True) |
| 286 | elif current["type"] == "thinking": |
| 287 | think_buf = "" |
| 288 | print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True) |
| 289 | elif current["type"] == "tool_use": |
| 290 | current["name"] = block.get("name") |
| 291 | json_buf = "" |
| 292 | |
| 293 | elif etype == "content_block_delta": |
| 294 | delta = data.get("delta", {}) |
| 295 | dtype = delta.get("type") |
| 296 | if dtype == "text_delta": |
| 297 | text = delta.get("text", "") |
| 298 | text_buf += text |
| 299 | print(text, end="", flush=True) |
| 300 | elif dtype == "thinking_delta": |
| 301 | text = delta.get("thinking", "") |
| 302 | think_buf += text |
| 303 | print(text, end="", flush=True) |
| 304 | elif dtype == "input_json_delta" and current: |
| 305 | json_buf += delta.get("partial_json", "") |
| 306 | |
| 307 | elif etype == "content_block_stop" and current: |
| 308 | if current["type"] == "text": |
| 309 | current["text"] = text_buf |
| 310 | print() |
| 311 | elif current["type"] == "thinking": |
| 312 | print(RESET) |
| 313 | elif current["type"] == "tool_use": |
| 314 | try: current["input"] = json.loads(json_buf) |
| 315 | except: current["input"] = {} |
| 316 | blocks.append(current) |
| 317 | current = None |
| 318 | except: pass |
| 319 | finally: |
| 320 | termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) |
| 321 | |
| 322 | return blocks |
| 323 | |
| 324 | def read_multiline_input(): |
| 325 | """Read multiline input. Enter to submit, Alt+Enter for newline.""" |
| 326 | lines, current = [], "" |
| 327 | old_settings = termios.tcgetattr(sys.stdin) |
| 328 | try: |
| 329 | tty.setcbreak(sys.stdin.fileno()) |
| 330 | print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True) |
| 331 | while True: |
| 332 | ch = sys.stdin.read(1) |
| 333 | if ch == '\x03': # Ctrl+C - clear input |
| 334 | lines.clear() |
| 335 | current = "" |
| 336 | print("\r\033[K", end="", flush=True) |
| 337 | print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True) |
| 338 | continue |
| 339 | if ch == '\x04': # Ctrl+D |
| 340 | raise EOFError |
| 341 | if ch == '\x1b' and sys.stdin.read(1) in ('\r', '\n'): # Alt+Enter |
| 342 | lines.append(current) |
| 343 | current = "" |
| 344 | print(f"\n{BOLD}{BLUE}│{RESET} ", end="", flush=True) |
| 345 | elif ch in ('\r', '\n'): # Enter |
| 346 | if current: lines.append(current) |
| 347 | break |
| 348 | elif ch in ('\x7f', '\x08'): # Backspace |
| 349 | if current: |
| 350 | current = current[:-1] |
| 351 | print("\r\033[K", end="", flush=True) |
| 352 | print(f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} {current}", end="", flush=True) |
| 353 | elif ch.isprintable() or ch == '\t': |
| 354 | current += ch |
| 355 | print(ch, end="", flush=True) |
| 356 | print() |
| 357 | finally: |
| 358 | termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) |
| 359 | return "\n".join(lines).strip() |
| 360 | |
| 361 | def main(): |
| 362 | global stop_flag |
| 363 | # Disable Ctrl+C signal |
| 364 | old_settings = termios.tcgetattr(sys.stdin) |
| 365 | new_settings = termios.tcgetattr(sys.stdin) |
| 366 | new_settings[3] = new_settings[3] & ~termios.ISIG # Disable signal generation |
| 367 | termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings) |
| 368 | |
| 369 | try: |
| 370 | proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy") |
| 371 | proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else "" |
| 372 | thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else "" |
| 373 | print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info}{RESET}") |
| 374 | print(f"{DIM}Shortcuts: Enter=submit | Alt+Enter=newline | Ctrl+C=clear | Ctrl+D=exit | ESC=stop{RESET}\n") |
| 375 | run_main_loop() |
| 376 | finally: |
| 377 | termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) |
| 378 | |
| 379 | def run_main_loop(): |
| 380 | messages = [] |
| 381 | current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| 382 | system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} Current time: {current_time} |
| 383 | IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it. |
| 384 | Examples: |
| 385 | - User asks about GitHub repo → search_extension({{"query": "github documentation"}}) |
| 386 | - User needs web data → search_extension({{"query": "web scraping"}}) |
| 387 | - User needs API → search_extension({{"query": "api client"}})""" |
| 388 | |
| 389 | while True: |
| 390 | try: |
| 391 | print(f"{DIM}{'─'*80}{RESET}") |
| 392 | user_input = read_multiline_input() |
| 393 | print(f"{DIM}{'─'*80}{RESET}") |
| 394 | |
| 395 | if not user_input: continue |
| 396 | if user_input in ("/q", "exit"): break |
| 397 | if user_input == "/c": |
| 398 | messages = [] |
| 399 | print(f"{GREEN}⏺ Cleared{RESET}") |
| 400 | continue |
| 401 | |
| 402 | messages.append({"role": "user", "content": user_input}) |
| 403 | |
| 404 | while True: |
| 405 | stop_flag = False |
| 406 | response = call_api(messages, system_prompt) |
| 407 | blocks = process_stream(response) |
| 408 | if stop_flag: break |
| 409 | |
| 410 | tool_results = [] |
| 411 | for block in blocks: |
| 412 | if block["type"] == "tool_use": |
| 413 | name, args = block["name"], block["input"] |
| 414 | preview = str(list(args.values())[0])[:50] if args else "" |
| 415 | print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})") |
| 416 | |
| 417 | result = run_tool(name, args) |
| 418 | lines = result.split("\n") |
| 419 | prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "") |
| 420 | if len(lines) > 1: prev += f" +{len(lines)-1}" |
| 421 | print(f" {DIM}⎿ {prev}{RESET}") |
| 422 | |
| 423 | tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result}) |
| 424 | |
| 425 | messages.append({"role": "assistant", "content": blocks}) |
| 426 | if not tool_results: break |
| 427 | messages.append({"role": "user", "content": tool_results}) |
| 428 | |
| 429 | print() |
| 430 | except EOFError: |
| 431 | break |
| 432 | except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}") |
| 433 | |
| 434 | if __name__ == "__main__": |
| 435 | main() |
| 436 |