nanocode_slim.py
· 15 KiB · Python
Raw
#!/usr/bin/env python3
"""nanocode - minimal claude code alternative"""
import glob as globlib
import json
import os
import re
import readline
import select
import ssl
import subprocess
import sys
import termios
import tty
import urllib.request
import urllib.parse
# --- Provider selection -----------------------------------------------------
# Precedence: a local gateway key beats an OpenRouter key, which beats the
# direct Anthropic endpoint. The MODEL environment variable, when set,
# overrides the per-provider default model name.
OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY")

if LOCAL_API_KEY:
    API_URL = "http://127.0.0.1:8990/v1/messages"
    MODEL = os.environ.get("MODEL", "anthropic/claude-sonnet-4.5")
elif OPENROUTER_KEY:
    API_URL = "https://openrouter.ai/api/v1/messages"
    MODEL = os.environ.get("MODEL", "anthropic/claude-opus-4.5")
else:
    API_URL = "https://api.anthropic.com/v1/messages"
    MODEL = os.environ.get("MODEL", "claude-opus-4-5")

# ANSI escape sequences used for terminal styling
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
BLUE = "\033[34m"
CYAN = "\033[36m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"

# Set by process_stream when the user presses ESC; reset by main before each API call
stop_flag = False
def register_tool(name, desc, params):
    """Build a decorator that publishes a function in TOOLS under `name`.

    Intended to be called from extension code. The decorated function is
    stored as the last element of a ``(desc, params, func)`` tuple and is
    returned unchanged, so it stays usable under its own name.
    """
    def _publish(handler):
        TOOLS[name] = (desc, params, handler)
        return handler

    return _publish
def search_extension(args):
    """Search gist.kitchain.cn topic pages for extensions matching a query.

    The query is lower-cased and split on whitespace; each keyword is fetched
    as a topic page and the gists listed on it are scraped with regexes.
    Gists are ranked by how many times they were listed across all keyword
    pages, and at most ten are reported.

    A keyword whose page cannot be fetched (HTTP error, timeout, ...) is
    skipped so the remaining keywords can still produce results; the fetch
    error is reported only when nothing at all was found.

    Args:
        args: Tool arguments; ``args["query"]`` is the search string.

    Returns:
        A human-readable result list ending with a ready-to-use ``load(...)``
        call for the top hit, ``"No extensions found: <query>"`` when nothing
        matched, or an ``"error: ..."`` string on failure.
    """
    query = args.get("query", "")
    if not query:
        return "error: query required"
    try:
        # gist_path -> {"hits", "title", "desc", "topics", "filename"}
        gist_info = {}
        last_error = None

        # Search each keyword as a topic
        for keyword in query.lower().split():
            # safe="" so a "/" inside a keyword cannot change the URL path
            topic = urllib.parse.quote(keyword, safe="")
            url = f"https://gist.kitchain.cn/topics/{topic}"
            try:
                with urllib.request.urlopen(urllib.request.Request(url), timeout=10) as resp:
                    html = resp.read().decode()
            except OSError as e:  # URLError / HTTPError / timeouts all derive from OSError
                last_error = e
                continue

            # Extract gist URLs and titles
            gist_matches = re.findall(
                r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>',
                html
            )
            for gist_path, title in gist_matches:
                if gist_path not in gist_info:
                    # Extract description and topics for this gist
                    gist_section = re.search(
                        rf'{re.escape(gist_path)}.*?'
                        r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>',
                        html, re.DOTALL
                    )
                    desc = ""
                    topics = []
                    if gist_section:
                        desc = gist_section.group(1).strip()
                        topics_section = gist_section.group(2)
                        # Keep only the visible topic names, not the URL slugs
                        topics = [
                            label for _slug, label in
                            re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section)
                        ]
                    gist_info[gist_path] = {
                        "hits": 0,
                        "title": title.strip(),
                        "desc": desc,
                        "topics": topics,
                        "filename": title.strip(),  # the link text doubles as the file name
                    }
                gist_info[gist_path]["hits"] += 1

        if not gist_info:
            if last_error is not None:
                return f"error: {last_error}"
            return f"No extensions found: {query}"

        # Sort by hit count (descending)
        sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10]

        parts = [f"Found {len(sorted_gists)} extensions:\n\n"]
        for gist_path, info in sorted_gists:
            parts.append(f"• {info['title']}\n")
            if info['desc']:
                parts.append(f" {info['desc']}\n")
            if info['topics']:
                parts.append(f" Topics: {', '.join(info['topics'])}\n")
            parts.append(f" Matched: {info['hits']} keyword(s)\n\n")

        # Return first gist's load URL; quote the file name so spaces and
        # other reserved characters still yield a fetchable URL.
        first_gist = sorted_gists[0][0]
        first_filename = urllib.parse.quote(sorted_gists[0][1]['filename'])
        parts.append(f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})")
        return "".join(parts)
    except Exception as e:
        return f"error: {e}"
def load(args):
    """Download extension source from a URL and execute it to register tools.

    Args:
        args: Tool arguments; ``args["url"]`` is the location of the code.

    Returns:
        ``"Loaded. New tools: a, b"`` naming the tools that this load added
        or re-registered, or an ``"error: ..."`` string on failure.
    """
    url = args.get("url")
    if not url:
        return "error: url required"
    try:
        with urllib.request.urlopen(urllib.request.Request(url), timeout=10) as resp:
            code = resp.read().decode()
        # Snapshot the registry so only entries touched by THIS load are reported.
        before = dict(TOOLS)
        # SECURITY: this executes arbitrary remote code with the privileges of
        # the current process (including `subprocess`). Only load URLs you trust.
        exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess})
        new = [k for k, entry in TOOLS.items() if before.get(k) is not entry]
        return f"Loaded. New tools: {', '.join(new)}"
    except Exception as e:
        return f"error: {e}"
# --- Tools ---
def read(args):
    """Return a file's lines prefixed with 1-based, right-aligned line numbers.

    Args:
        args: ``path`` (required), ``offset`` (0-based first line, default 0)
            and ``limit`` (maximum number of lines, default: all). An explicit
            ``None`` for ``offset`` or ``limit`` is treated as "not given".

    Returns:
        The selected lines, each formatted as ``"<n>| <line>"``.
    """
    with open(args["path"]) as f:  # close the handle deterministically
        lines = f.readlines()
    offset = args.get("offset") or 0
    limit = args.get("limit")
    if limit is None:
        limit = len(lines)
    window = lines[offset:offset + limit]
    return "".join(f"{n:4}| {l}" for n, l in enumerate(window, offset + 1))
def write(args):
    """Create or overwrite ``args["path"]`` with ``args["content"]``.

    Returns:
        ``"ok"`` once the data has been written and the file closed.
    """
    # Use a context manager so the data is flushed and the descriptor released
    # before returning, instead of relying on garbage collection.
    with open(args["path"], "w") as f:
        f.write(args["content"])
    return "ok"
def edit(args):
    """Replace ``old`` with ``new`` inside the file at ``path``.

    Args:
        args: ``path``, ``old`` and ``new`` are required. ``all`` (optional):
            when truthy every occurrence is replaced; otherwise ``old`` must
            occur exactly once.

    Returns:
        ``"ok"`` on success, or an ``"error: ..."`` string when ``old`` is
        missing or is ambiguous without ``all``.
    """
    path, old, new = args["path"], args["old"], args["new"]
    with open(path) as f:
        text = f.read()
    count = text.count(old)  # one scan answers both "present?" and "unique?"
    if count == 0:
        return "error: old_string not found"
    if not args.get("all") and count > 1:
        return f"error: old_string appears {count} times (use all=true)"
    # Here either `all` is set or there is exactly one occurrence, so an
    # unrestricted replace is correct in both cases.
    with open(path, "w") as f:
        f.write(text.replace(old, new))
    return "ok"
def glob(args):
    """List paths matching ``args["pat"]`` under ``args["path"]`` (default ".").

    Matches are ordered newest-first by modification time; entries that are
    not regular files sort as if their mtime were 0. Returns the paths joined
    by newlines, or ``"none"`` when nothing matched.
    """
    def _mtime(entry):
        if os.path.isfile(entry):
            return os.path.getmtime(entry)
        return 0

    base = args.get("path", ".")
    pattern = (base + "/" + args["pat"]).replace("//", "/")
    matches = globlib.glob(pattern, recursive=True)
    matches.sort(key=_mtime, reverse=True)
    listing = "\n".join(matches)
    return listing if listing else "none"
def grep(args):
    """Search regular files under ``args["path"]`` (default ".") for a regex.

    Args:
        args: ``pat`` is the regular expression; ``path`` is the directory
            that is searched recursively.

    Returns:
        Up to 50 hits formatted as ``"<file>:<line-no>:<line>"`` and joined
        by newlines, or ``"none"`` when nothing matched.

    Raises:
        re.error: if ``pat`` is not a valid regular expression.
    """
    max_hits = 50
    pattern, hits = re.compile(args["pat"]), []
    for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True):
        # Skip directories and special files (opening a FIFO would block)
        if not os.path.isfile(fp):
            continue
        try:
            with open(fp) as f:
                for n, l in enumerate(f, 1):
                    if pattern.search(l):
                        hits.append(f"{fp}:{n}:{l.rstrip()}")
                        if len(hits) >= max_hits:
                            # Cap reached: no point scanning the rest of the tree
                            return "\n".join(hits)
        except (OSError, UnicodeDecodeError):
            # Unreadable or binary file: keep whatever matched so far, move on
            continue
    return "\n".join(hits) or "none"
def bash(args, timeout=30):
    """Run a shell command, echoing its output live, and return that output.

    stderr is merged into stdout. A watchdog timer kills the shell once
    `timeout` seconds have elapsed, so a command that hangs can no longer
    block the read loop forever. Note that only the shell process itself is
    killed: descendants that keep the output pipe open can still delay the
    return until they exit.

    Args:
        args: ``args["cmd"]`` is the command line handed to the shell.
        timeout: Seconds to wait before the command is killed.

    Returns:
        The stripped combined output with ``"(timeout)"`` appended when the
        watchdog fired, or ``"(empty)"`` when there was no output at all.
    """
    import threading

    # errors="replace": non-UTF-8 output is shown with substitution characters
    # instead of aborting the tool with a UnicodeDecodeError.
    proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, text=True, errors="replace")
    expired = threading.Event()

    def _on_timeout():
        expired.set()
        proc.kill()

    watchdog = threading.Timer(timeout, _on_timeout)
    watchdog.daemon = True  # never keep the interpreter alive just for the timer
    watchdog.start()

    lines = []
    try:
        with proc.stdout:
            for line in proc.stdout:
                print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True)
                lines.append(line)
        # stdout reached EOF; the watchdog still bounds this wait if the
        # process closed its output but keeps running.
        proc.wait()
    finally:
        watchdog.cancel()
    if expired.is_set():
        lines.append("\n(timeout)")
    return "".join(lines).strip() or "(empty)"
# Tool registry: name -> (description, parameter spec, handler).
# In a parameter spec a trailing "?" on the type marks the parameter optional.
# Extensions add entries at runtime through register_tool.
TOOLS = {
    "read": (
        "Read file with line numbers",
        {"path": "string", "offset": "number?", "limit": "number?"},
        read,
    ),
    "write": (
        "Write content to file",
        {"path": "string", "content": "string"},
        write,
    ),
    "edit": (
        "Replace old with new in file",
        {"path": "string", "old": "string", "new": "string", "all": "boolean?"},
        edit,
    ),
    "glob": (
        "Find files by pattern",
        {"pat": "string", "path": "string?"},
        glob,
    ),
    "grep": (
        "Search files for regex",
        {"pat": "string", "path": "string?"},
        grep,
    ),
    "bash": (
        "Run shell command",
        {"cmd": "string"},
        bash,
    ),
    "search_extension": (
        "Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)",
        {"query": "string"},
        search_extension,
    ),
    "load": (
        "Load extension from URL to add new tools",
        {"url": "string"},
        load,
    ),
}
def run_tool(name, args):
    """Invoke the handler registered under `name` with `args`.

    Any exception (including an unknown tool name) is converted into an
    ``"error: ..."`` string instead of propagating to the caller.
    """
    try:
        handler = TOOLS[name][2]
        outcome = handler(args)
    except Exception as err:
        return f"error: {err}"
    return outcome
def make_schema():
    """Translate the TOOLS registry into the API's tool-definition list.

    Each parameter type has its optional marker ("?") stripped; "number" is
    emitted as JSON-schema "integer". Parameters without the marker are
    listed as required.
    """
    schema = []
    for tool_name, (summary, params, _handler) in TOOLS.items():
        properties = {}
        required = []
        for key, spec in params.items():
            base = spec.rstrip("?")
            if base == "number":
                base = "integer"
            properties[key] = {"type": base}
            if not spec.endswith("?"):
                required.append(key)
        input_schema = {"type": "object", "properties": properties, "required": required}
        schema.append({"name": tool_name, "description": summary, "input_schema": input_schema})
    return schema
def call_api(messages, system_prompt):
    """POST a streaming Messages request and return the open HTTP response.

    Authentication follows the provider chosen at import time: a Bearer token
    for the local gateway or OpenRouter, otherwise ``x-api-key`` taken from
    ``ANTHROPIC_API_KEY``.

    Environment variables read here:
        THINKING: when set, extended thinking is enabled.
        THINKING_BUDGET: thinking token budget (default 10000).
        http_proxy / https_proxy: proxy used for both schemes.
        NANOCODE_INSECURE_SSL: when set, TLS certificate and hostname
            verification are disabled (e.g. for an intercepting proxy).

    Args:
        messages: Conversation so far, in Messages API format.
        system_prompt: System prompt text.

    Returns:
        The response object from ``urllib``; iterate it to read SSE lines.

    Raises:
        urllib.error.URLError: on connection failures or HTTP error statuses.
        ValueError: if THINKING_BUDGET is not an integer.
    """
    headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
    if LOCAL_API_KEY:
        headers["Authorization"] = f"Bearer {LOCAL_API_KEY}"
    elif OPENROUTER_KEY:
        headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
    else:
        headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "")

    data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt,
            "messages": messages, "tools": make_schema(), "stream": True}

    if os.environ.get("THINKING"):
        budget = int(os.environ.get("THINKING_BUDGET", "10000"))
        data["thinking"] = {"type": "enabled", "budget_tokens": budget}
        # The API requires budget_tokens < max_tokens; the default budget
        # (10000) exceeds the base limit, so leave room for the visible reply.
        if budget >= data["max_tokens"]:
            data["max_tokens"] = budget + 8192

    proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")

    # Verify certificates by default: this request carries the API key, so an
    # unverified connection would hand it to any man-in-the-middle.
    ssl_ctx = ssl.create_default_context()
    if os.environ.get("NANOCODE_INSECURE_SSL"):
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE

    handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)]
    if proxy:
        handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy}))

    req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST")
    return urllib.request.build_opener(*handlers).open(req)
def process_stream(response):
    """Simplified stream processing with ESC-to-interrupt support.

    Reads server-sent-event lines from `response`, echoes text and thinking
    deltas to the terminal as they arrive, and assembles the completed
    content blocks. While streaming on a terminal, stdin is switched to
    cbreak mode so that a single ESC key press aborts the stream; the
    previous terminal settings are always restored. When stdin is not a
    terminal the ESC polling is skipped.

    Args:
        response: Iterable of raw ``bytes`` lines (the HTTP response). It is
            closed before returning if it has a ``close`` method.

    Returns:
        List of block dicts suitable for replaying as assistant content:
        ``{"type": "text", "text"}``,
        ``{"type": "thinking", "thinking"[, "signature"]}``,
        ``{"type": "tool_use", "id", "name", "input"}``; blocks of any other
        type are passed through as received in ``content_block_start``.
        Sets the global ``stop_flag`` when ESC was pressed.
    """
    global stop_flag
    blocks, current = [], None
    text_buf, json_buf, think_buf, sig_buf = "", "", "", ""

    # Raw key polling needs a real terminal (not a pipe or a test harness)
    try:
        interactive = sys.stdin.isatty()
    except (AttributeError, ValueError):
        interactive = False

    # Save terminal settings
    old_settings = termios.tcgetattr(sys.stdin) if interactive else None
    try:
        if interactive:
            tty.setcbreak(sys.stdin.fileno())

        for line in response:
            if interactive and select.select([sys.stdin], [], [], 0)[0]:
                ch = sys.stdin.read(1)
                if ch == '\x1b':  # ESC key
                    stop_flag = True
                    print(f"\n{YELLOW}⏸ Stopped{RESET}")
                    break

            line = line.decode("utf-8").strip()
            if not line.startswith("data: ") or line == "data: [DONE]":
                continue
            try:
                data = json.loads(line[6:])
            except json.JSONDecodeError:
                continue  # not a JSON payload; ignore
            if not isinstance(data, dict):
                continue

            try:
                etype = data.get("type")

                if etype == "content_block_start":
                    block = data.get("content_block") or {}
                    btype = block.get("type")
                    if btype == "text":
                        current, text_buf = {"type": "text"}, ""
                        print(f"\n{CYAN}⏺{RESET} ", end="", flush=True)
                    elif btype == "thinking":
                        current, think_buf, sig_buf = {"type": "thinking"}, "", ""
                        print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
                    elif btype == "tool_use":
                        current = {"type": "tool_use", "id": block.get("id"),
                                   "name": block.get("name")}
                        json_buf = ""
                    else:
                        # Unknown block kinds are kept verbatim so they can be
                        # sent back to the API unchanged.
                        current = dict(block)

                elif etype == "content_block_delta":
                    delta = data.get("delta") or {}
                    dtype = delta.get("type")
                    if dtype == "text_delta":
                        text = delta.get("text", "")
                        text_buf += text
                        print(text, end="", flush=True)
                    elif dtype == "thinking_delta":
                        text = delta.get("thinking", "")
                        think_buf += text
                        print(text, end="", flush=True)
                    elif dtype == "signature_delta":
                        # Needed to replay the thinking block on the next turn
                        sig_buf += delta.get("signature", "")
                    elif dtype == "input_json_delta" and current:
                        json_buf += delta.get("partial_json", "")

                elif etype == "content_block_stop" and current:
                    ctype = current.get("type")
                    if ctype == "text":
                        current["text"] = text_buf
                        print()
                    elif ctype == "thinking":
                        current["thinking"] = think_buf
                        if sig_buf:
                            current["signature"] = sig_buf
                        print(RESET)
                    elif ctype == "tool_use":
                        try:
                            parsed = json.loads(json_buf) if json_buf else {}
                        except json.JSONDecodeError:
                            parsed = {}
                        # Tool handlers expect a dict of arguments
                        current["input"] = parsed if isinstance(parsed, dict) else {}
                    blocks.append(current)
                    current = None

                elif etype == "error":
                    err = data.get("error") or {}
                    detail = err.get("message", err) if isinstance(err, dict) else err
                    print(f"\n{RED}⏺ Error: {detail}{RESET}")
            except (AttributeError, KeyError, TypeError):
                # Malformed event: skip it and keep streaming (best effort),
                # without also swallowing KeyboardInterrupt.
                continue
    finally:
        if old_settings is not None:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
        close = getattr(response, "close", None)
        if close is not None:
            close()

    return blocks
def main():
    """Run the interactive session.

    Each user prompt starts an agent loop: the reply is streamed, every
    requested tool is executed, and the results are sent back until the
    model answers without calling a tool or the user presses ESC.
    ``/q`` or ``exit`` quits, ``/c`` clears the conversation, Ctrl-C or EOF
    ends the program; any other exception is printed and the prompt returns.
    """
    global stop_flag

    proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
    proxy_info = ""
    if proxy:
        proxy_info = f" | {DIM}🌐 {proxy}{RESET}"
    thinking_info = ""
    if os.environ.get("THINKING"):
        thinking_info = f" | {YELLOW}💭{RESET}"
    print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info} | ESC=stop{RESET}\n")

    messages = []
    system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()}
IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
Examples:
- User asks about GitHub repo → search_extension({{"query": "github documentation"}})
- User needs web data → search_extension({{"query": "web scraping"}})
- User needs API → search_extension({{"query": "api client"}})"""

    # Horizontal rule drawn above and below the input prompt
    rule = f"{DIM}{'─'*80}{RESET}"

    while True:
        try:
            print(rule)
            user_input = input(f"{BOLD}{BLUE}❯{RESET} ").strip()
            print(rule)

            if not user_input:
                continue
            if user_input == "/q" or user_input == "exit":
                break
            if user_input == "/c":
                messages = []
                print(f"{GREEN}⏺ Cleared{RESET}")
                continue

            messages.append({"role": "user", "content": user_input})

            # Agent loop: keep calling the API while the model requests tools
            while True:
                stop_flag = False
                blocks = process_stream(call_api(messages, system_prompt))
                if stop_flag:
                    break  # user pressed ESC; discard the partial reply

                tool_results = []
                for block in blocks:
                    if block["type"] != "tool_use":
                        continue
                    name, tool_args = block["name"], block["input"]
                    # Show the first argument value as a short call preview
                    preview = str(list(tool_args.values())[0])[:50] if tool_args else ""
                    print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})")

                    result = run_tool(name, tool_args)

                    # One-line summary: first line (truncated) + count of further lines
                    out_lines = result.split("\n")
                    head = out_lines[0]
                    summary = head[:60] + ("..." if len(head) > 60 else "")
                    if len(out_lines) > 1:
                        summary += f" +{len(out_lines)-1}"
                    print(f" {DIM}⎿ {summary}{RESET}")

                    tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})

                messages.append({"role": "assistant", "content": blocks})
                if not tool_results:
                    break
                messages.append({"role": "user", "content": tool_results})

            print()
        except (KeyboardInterrupt, EOFError):
            break
        except Exception as e:
            print(f"{RED}⏺ Error: {e}{RESET}")


if __name__ == "__main__":
    main()
| 1 | #!/usr/bin/env python3 |
| 2 | """nanocode - minimal claude code alternative""" |
| 3 | import glob as globlib |
| 4 | import json |
| 5 | import os |
| 6 | import re |
| 7 | import readline |
| 8 | import select |
| 9 | import ssl |
| 10 | import subprocess |
| 11 | import sys |
| 12 | import termios |
| 13 | import tty |
| 14 | import urllib.request |
| 15 | import urllib.parse |
| 16 | |
| 17 | OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY") |
| 18 | LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY") |
| 19 | API_URL = ( |
| 20 | "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY |
| 21 | else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY |
| 22 | else "https://api.anthropic.com/v1/messages" |
| 23 | ) |
| 24 | MODEL = os.environ.get("MODEL", |
| 25 | "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY |
| 26 | else "anthropic/claude-opus-4.5" if OPENROUTER_KEY |
| 27 | else "claude-opus-4-5" |
| 28 | ) |
| 29 | |
| 30 | # ANSI colors |
| 31 | RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m" |
| 32 | BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m" |
| 33 | stop_flag = False |
| 34 | |
| 35 | def register_tool(name, desc, params): |
| 36 | """Register a tool from extension code""" |
| 37 | def decorator(func): |
| 38 | TOOLS[name] = (desc, params, func) |
| 39 | return func |
| 40 | return decorator |
| 41 | |
| 42 | def search_extension(args): |
| 43 | """Search extensions from gist.kitchain.cn""" |
| 44 | query = args.get("query", "") |
| 45 | if not query: return "error: query required" |
| 46 | try: |
| 47 | # Split query into keywords |
| 48 | keywords = query.lower().split() |
| 49 | gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}} |
| 50 | |
| 51 | # Search each keyword as a topic |
| 52 | for keyword in keywords: |
| 53 | url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}" |
| 54 | html = urllib.request.urlopen(urllib.request.Request(url), timeout=10).read().decode() |
| 55 | |
| 56 | # Extract gist URLs and titles |
| 57 | gist_matches = re.findall( |
| 58 | r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>', |
| 59 | html |
| 60 | ) |
| 61 | |
| 62 | for gist_path, title in gist_matches: |
| 63 | if gist_path not in gist_info: |
| 64 | # Extract description and topics for this gist |
| 65 | gist_section = re.search( |
| 66 | rf'{re.escape(gist_path)}.*?' |
| 67 | r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>', |
| 68 | html, re.DOTALL |
| 69 | ) |
| 70 | desc = "" |
| 71 | topics = [] |
| 72 | if gist_section: |
| 73 | desc = gist_section.group(1).strip() |
| 74 | topics_section = gist_section.group(2) |
| 75 | topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section) |
| 76 | topics = [t[1] for t in topics] # Extract topic names |
| 77 | |
| 78 | gist_info[gist_path] = { |
| 79 | "hits": 0, |
| 80 | "title": title.strip(), |
| 81 | "desc": desc, |
| 82 | "topics": topics, |
| 83 | "filename": title.strip() |
| 84 | } |
| 85 | gist_info[gist_path]["hits"] += 1 |
| 86 | |
| 87 | if not gist_info: return f"No extensions found: {query}" |
| 88 | |
| 89 | # Sort by hit count (descending) |
| 90 | sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10] |
| 91 | |
| 92 | result = f"Found {len(sorted_gists)} extensions:\n\n" |
| 93 | for gist_path, info in sorted_gists: |
| 94 | result += f"• {info['title']}\n" |
| 95 | if info['desc']: |
| 96 | result += f" {info['desc']}\n" |
| 97 | if info['topics']: |
| 98 | result += f" Topics: {', '.join(info['topics'])}\n" |
| 99 | result += f" Matched: {info['hits']} keyword(s)\n\n" |
| 100 | |
| 101 | # Return first gist's load URL |
| 102 | first_gist = sorted_gists[0][0] |
| 103 | first_filename = sorted_gists[0][1]['filename'] |
| 104 | result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})" |
| 105 | return result |
| 106 | except Exception as e: |
| 107 | return f"error: {e}" |
| 108 | |
| 109 | def load(args): |
| 110 | """Load extension from URL""" |
| 111 | url = args.get("url") |
| 112 | if not url: return "error: url required" |
| 113 | try: |
| 114 | code = urllib.request.urlopen(urllib.request.Request(url), timeout=10).read().decode() |
| 115 | exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess}) |
| 116 | new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","search_extension","load"]] |
| 117 | return f"Loaded. New tools: {', '.join(new)}" |
| 118 | except Exception as e: |
| 119 | return f"error: {e}" |
| 120 | |
| 121 | # --- Tools --- |
| 122 | def read(args): |
| 123 | lines = open(args["path"]).readlines() |
| 124 | offset, limit = args.get("offset", 0), args.get("limit", len(lines)) |
| 125 | return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit])) |
| 126 | |
| 127 | def write(args): |
| 128 | open(args["path"], "w").write(args["content"]) |
| 129 | return "ok" |
| 130 | |
| 131 | def edit(args): |
| 132 | text = open(args["path"]).read() |
| 133 | old, new = args["old"], args["new"] |
| 134 | if old not in text: return "error: old_string not found" |
| 135 | count = text.count(old) |
| 136 | if not args.get("all") and count > 1: |
| 137 | return f"error: old_string appears {count} times (use all=true)" |
| 138 | open(args["path"], "w").write(text.replace(old, new) if args.get("all") else text.replace(old, new, 1)) |
| 139 | return "ok" |
| 140 | |
| 141 | def glob(args): |
| 142 | pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/") |
| 143 | files = sorted(globlib.glob(pattern, recursive=True), |
| 144 | key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True) |
| 145 | return "\n".join(files) or "none" |
| 146 | |
| 147 | def grep(args): |
| 148 | pattern, hits = re.compile(args["pat"]), [] |
| 149 | for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True): |
| 150 | try: |
| 151 | for n, l in enumerate(open(fp), 1): |
| 152 | if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}") |
| 153 | except: pass |
| 154 | return "\n".join(hits[:50]) or "none" |
| 155 | |
| 156 | def bash(args): |
| 157 | proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE, |
| 158 | stderr=subprocess.STDOUT, text=True) |
| 159 | lines = [] |
| 160 | try: |
| 161 | if proc.stdout: |
| 162 | while True: |
| 163 | line = proc.stdout.readline() |
| 164 | if not line and proc.poll() is not None: break |
| 165 | if line: |
| 166 | print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True) |
| 167 | lines.append(line) |
| 168 | proc.wait(timeout=30) |
| 169 | except subprocess.TimeoutExpired: |
| 170 | proc.kill() |
| 171 | lines.append("\n(timeout)") |
| 172 | return "".join(lines).strip() or "(empty)" |
| 173 | |
| 174 | TOOLS = { |
| 175 | "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read), |
| 176 | "write": ("Write content to file", {"path": "string", "content": "string"}, write), |
| 177 | "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit), |
| 178 | "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob), |
| 179 | "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep), |
| 180 | "bash": ("Run shell command", {"cmd": "string"}, bash), |
| 181 | "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension), |
| 182 | "load": ("Load extension from URL to add new tools", {"url": "string"}, load), |
| 183 | } |
| 184 | |
| 185 | def run_tool(name, args): |
| 186 | try: return TOOLS[name][2](args) |
| 187 | except Exception as e: return f"error: {e}" |
| 188 | |
| 189 | def make_schema(): |
| 190 | result = [] |
| 191 | for name, (desc, params, _) in TOOLS.items(): |
| 192 | props, req = {}, [] |
| 193 | for pname, ptype in params.items(): |
| 194 | opt = ptype.endswith("?") |
| 195 | props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")} |
| 196 | if not opt: req.append(pname) |
| 197 | result.append({"name": name, "description": desc, |
| 198 | "input_schema": {"type": "object", "properties": props, "required": req}}) |
| 199 | return result |
| 200 | |
| 201 | def call_api(messages, system_prompt): |
| 202 | headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"} |
| 203 | if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}" |
| 204 | elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}" |
| 205 | else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "") |
| 206 | |
| 207 | data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt, |
| 208 | "messages": messages, "tools": make_schema(), "stream": True} |
| 209 | |
| 210 | if os.environ.get("THINKING"): |
| 211 | data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))} |
| 212 | |
| 213 | proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy") |
| 214 | ssl_ctx = ssl.create_default_context() |
| 215 | ssl_ctx.check_hostname = False |
| 216 | ssl_ctx.verify_mode = ssl.CERT_NONE |
| 217 | |
| 218 | handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)] |
| 219 | if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy})) |
| 220 | |
| 221 | req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST") |
| 222 | return urllib.request.build_opener(*handlers).open(req) |
| 223 | |
| 224 | def process_stream(response): |
| 225 | """简化的流式处理,支持ESC中断""" |
| 226 | global stop_flag |
| 227 | blocks, current, text_buf, json_buf, think_buf = [], None, "", "", "" |
| 228 | |
| 229 | # Save terminal settings |
| 230 | old_settings = termios.tcgetattr(sys.stdin) |
| 231 | try: |
| 232 | tty.setcbreak(sys.stdin.fileno()) |
| 233 | |
| 234 | for line in response: |
| 235 | if select.select([sys.stdin], [], [], 0)[0]: |
| 236 | ch = sys.stdin.read(1) |
| 237 | if ch == '\x1b': # ESC key |
| 238 | stop_flag = True |
| 239 | print(f"\n{YELLOW}⏸ Stopped{RESET}") |
| 240 | break |
| 241 | |
| 242 | line = line.decode("utf-8").strip() |
| 243 | if not line.startswith("data: "): continue |
| 244 | if line == "data: [DONE]": continue |
| 245 | |
| 246 | try: |
| 247 | data = json.loads(line[6:]) |
| 248 | etype = data.get("type") |
| 249 | |
| 250 | if etype == "content_block_start": |
| 251 | block = data.get("content_block", {}) |
| 252 | current = {"type": block.get("type"), "id": block.get("id")} |
| 253 | if current["type"] == "text": |
| 254 | text_buf = "" |
| 255 | print(f"\n{CYAN}⏺{RESET} ", end="", flush=True) |
| 256 | elif current["type"] == "thinking": |
| 257 | think_buf = "" |
| 258 | print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True) |
| 259 | elif current["type"] == "tool_use": |
| 260 | current["name"] = block.get("name") |
| 261 | json_buf = "" |
| 262 | |
| 263 | elif etype == "content_block_delta": |
| 264 | delta = data.get("delta", {}) |
| 265 | dtype = delta.get("type") |
| 266 | if dtype == "text_delta": |
| 267 | text = delta.get("text", "") |
| 268 | text_buf += text |
| 269 | print(text, end="", flush=True) |
| 270 | elif dtype == "thinking_delta": |
| 271 | text = delta.get("thinking", "") |
| 272 | think_buf += text |
| 273 | print(text, end="", flush=True) |
| 274 | elif dtype == "input_json_delta" and current: |
| 275 | json_buf += delta.get("partial_json", "") |
| 276 | |
| 277 | elif etype == "content_block_stop" and current: |
| 278 | if current["type"] == "text": |
| 279 | current["text"] = text_buf |
| 280 | print() |
| 281 | elif current["type"] == "thinking": |
| 282 | print(RESET) |
| 283 | elif current["type"] == "tool_use": |
| 284 | try: current["input"] = json.loads(json_buf) |
| 285 | except: current["input"] = {} |
| 286 | blocks.append(current) |
| 287 | current = None |
| 288 | except: pass |
| 289 | finally: |
| 290 | termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) |
| 291 | |
| 292 | return blocks |
| 293 | |
| 294 | def main(): |
| 295 | global stop_flag |
| 296 | proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy") |
| 297 | proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else "" |
| 298 | thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else "" |
| 299 | print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info} | ESC=stop{RESET}\n") |
| 300 | |
| 301 | messages = [] |
| 302 | system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} |
| 303 | |
| 304 | IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it. |
| 305 | |
| 306 | Examples: |
| 307 | - User asks about GitHub repo → search_extension({{"query": "github documentation"}}) |
| 308 | - User needs web data → search_extension({{"query": "web scraping"}}) |
| 309 | - User needs API → search_extension({{"query": "api client"}})""" |
| 310 | |
| 311 | while True: |
| 312 | try: |
| 313 | print(f"{DIM}{'─'*80}{RESET}") |
| 314 | user_input = input(f"{BOLD}{BLUE}❯{RESET} ").strip() |
| 315 | print(f"{DIM}{'─'*80}{RESET}") |
| 316 | |
| 317 | if not user_input: continue |
| 318 | if user_input in ("/q", "exit"): break |
| 319 | if user_input == "/c": |
| 320 | messages = [] |
| 321 | print(f"{GREEN}⏺ Cleared{RESET}") |
| 322 | continue |
| 323 | |
| 324 | messages.append({"role": "user", "content": user_input}) |
| 325 | |
| 326 | while True: |
| 327 | stop_flag = False |
| 328 | response = call_api(messages, system_prompt) |
| 329 | blocks = process_stream(response) |
| 330 | if stop_flag: break |
| 331 | |
| 332 | tool_results = [] |
| 333 | for block in blocks: |
| 334 | if block["type"] == "tool_use": |
| 335 | name, args = block["name"], block["input"] |
| 336 | preview = str(list(args.values())[0])[:50] if args else "" |
| 337 | print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})") |
| 338 | |
| 339 | result = run_tool(name, args) |
| 340 | lines = result.split("\n") |
| 341 | prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "") |
| 342 | if len(lines) > 1: prev += f" +{len(lines)-1}" |
| 343 | print(f" {DIM}⎿ {prev}{RESET}") |
| 344 | |
| 345 | tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result}) |
| 346 | |
| 347 | messages.append({"role": "assistant", "content": blocks}) |
| 348 | if not tool_results: break |
| 349 | messages.append({"role": "user", "content": tool_results}) |
| 350 | |
| 351 | print() |
| 352 | except (KeyboardInterrupt, EOFError): break |
| 353 | except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}") |
| 354 | |
| 355 | if __name__ == "__main__": |
| 356 | main() |
| 357 |