Last active 2 months ago

Revision 8a4fb76e1aea7950972bdb86587eebc13c575ee9

nanocode_slim.py Raw
1#!/usr/bin/env python3
2"""nanocode - minimal claude code alternative"""
3import glob as globlib
4import json
5import os
6import re
7import readline
8import select
9import ssl
10import subprocess
11import sys
12import termios
13import tty
14import urllib.request
15import urllib.parse
16
17OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
18LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY")
19API_URL = (
20 "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY
21 else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY
22 else "https://api.anthropic.com/v1/messages"
23)
24MODEL = os.environ.get("MODEL",
25 "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY
26 else "anthropic/claude-opus-4.5" if OPENROUTER_KEY
27 else "claude-opus-4-5"
28)
29
30# ANSI colors
31RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m"
32BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m"
33stop_flag = False
34
35def register_tool(name, desc, params):
36 """Register a tool from extension code"""
37 def decorator(func):
38 TOOLS[name] = (desc, params, func)
39 return func
40 return decorator
41
42def search_extension(args):
43 """Search extensions from gist.kitchain.cn"""
44 query = args.get("query", "")
45 if not query: return "error: query required"
46 try:
47 # Split query into keywords
48 keywords = query.lower().split()
49 gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}}
50
51 # Search each keyword as a topic
52 for keyword in keywords:
53 url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}"
54 html = urllib.request.urlopen(urllib.request.Request(url), timeout=10).read().decode()
55
56 # Extract gist URLs and titles
57 gist_matches = re.findall(
58 r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>',
59 html
60 )
61
62 for gist_path, title in gist_matches:
63 if gist_path not in gist_info:
64 # Extract description and topics for this gist
65 gist_section = re.search(
66 rf'{re.escape(gist_path)}.*?'
67 r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>',
68 html, re.DOTALL
69 )
70 desc = ""
71 topics = []
72 if gist_section:
73 desc = gist_section.group(1).strip()
74 topics_section = gist_section.group(2)
75 topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section)
76 topics = [t[1] for t in topics] # Extract topic names
77
78 gist_info[gist_path] = {
79 "hits": 0,
80 "title": title.strip(),
81 "desc": desc,
82 "topics": topics,
83 "filename": title.strip()
84 }
85 gist_info[gist_path]["hits"] += 1
86
87 if not gist_info: return f"No extensions found: {query}"
88
89 # Sort by hit count (descending)
90 sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10]
91
92 result = f"Found {len(sorted_gists)} extensions:\n\n"
93 for gist_path, info in sorted_gists:
94 result += f"{info['title']}\n"
95 if info['desc']:
96 result += f" {info['desc']}\n"
97 if info['topics']:
98 result += f" Topics: {', '.join(info['topics'])}\n"
99 result += f" Matched: {info['hits']} keyword(s)\n\n"
100
101 # Return first gist's load URL
102 first_gist = sorted_gists[0][0]
103 first_filename = sorted_gists[0][1]['filename']
104 result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})"
105 return result
106 except Exception as e:
107 return f"error: {e}"
108
109def load(args):
110 """Load extension from URL"""
111 url = args.get("url")
112 if not url: return "error: url required"
113 try:
114 code = urllib.request.urlopen(urllib.request.Request(url), timeout=10).read().decode()
115 exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess})
116 new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","search_extension","load"]]
117 return f"Loaded. New tools: {', '.join(new)}"
118 except Exception as e:
119 return f"error: {e}"
120
121# --- Tools ---
122def read(args):
123 lines = open(args["path"]).readlines()
124 offset, limit = args.get("offset", 0), args.get("limit", len(lines))
125 return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit]))
126
127def write(args):
128 open(args["path"], "w").write(args["content"])
129 return "ok"
130
131def edit(args):
132 text = open(args["path"]).read()
133 old, new = args["old"], args["new"]
134 if old not in text: return "error: old_string not found"
135 count = text.count(old)
136 if not args.get("all") and count > 1:
137 return f"error: old_string appears {count} times (use all=true)"
138 open(args["path"], "w").write(text.replace(old, new) if args.get("all") else text.replace(old, new, 1))
139 return "ok"
140
141def glob(args):
142 pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/")
143 files = sorted(globlib.glob(pattern, recursive=True),
144 key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True)
145 return "\n".join(files) or "none"
146
147def grep(args):
148 pattern, hits = re.compile(args["pat"]), []
149 for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True):
150 try:
151 for n, l in enumerate(open(fp), 1):
152 if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}")
153 except: pass
154 return "\n".join(hits[:50]) or "none"
155
156def bash(args):
157 proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE,
158 stderr=subprocess.STDOUT, text=True)
159 lines = []
160 try:
161 if proc.stdout:
162 while True:
163 line = proc.stdout.readline()
164 if not line and proc.poll() is not None: break
165 if line:
166 print(f" {DIM}{line.rstrip()}{RESET}", flush=True)
167 lines.append(line)
168 proc.wait(timeout=30)
169 except subprocess.TimeoutExpired:
170 proc.kill()
171 lines.append("\n(timeout)")
172 return "".join(lines).strip() or "(empty)"
173
174TOOLS = {
175 "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read),
176 "write": ("Write content to file", {"path": "string", "content": "string"}, write),
177 "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit),
178 "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob),
179 "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep),
180 "bash": ("Run shell command", {"cmd": "string"}, bash),
181 "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension),
182 "load": ("Load extension from URL to add new tools", {"url": "string"}, load),
183}
184
185def run_tool(name, args):
186 try: return TOOLS[name][2](args)
187 except Exception as e: return f"error: {e}"
188
189def make_schema():
190 result = []
191 for name, (desc, params, _) in TOOLS.items():
192 props, req = {}, []
193 for pname, ptype in params.items():
194 opt = ptype.endswith("?")
195 props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")}
196 if not opt: req.append(pname)
197 result.append({"name": name, "description": desc,
198 "input_schema": {"type": "object", "properties": props, "required": req}})
199 return result
200
201def call_api(messages, system_prompt):
202 headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
203 if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}"
204 elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
205 else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "")
206
207 data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt,
208 "messages": messages, "tools": make_schema(), "stream": True}
209
210 if os.environ.get("THINKING"):
211 data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))}
212
213 proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
214 ssl_ctx = ssl.create_default_context()
215 ssl_ctx.check_hostname = False
216 ssl_ctx.verify_mode = ssl.CERT_NONE
217
218 handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)]
219 if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
220
221 req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST")
222 return urllib.request.build_opener(*handlers).open(req)
223
224def process_stream(response):
225 """简化的流式处理,支持ESC中断"""
226 global stop_flag
227 blocks, current, text_buf, json_buf, think_buf = [], None, "", "", ""
228
229 # Save terminal settings
230 old_settings = termios.tcgetattr(sys.stdin)
231 try:
232 tty.setcbreak(sys.stdin.fileno())
233
234 for line in response:
235 if select.select([sys.stdin], [], [], 0)[0]:
236 ch = sys.stdin.read(1)
237 if ch == '\x1b': # ESC key
238 stop_flag = True
239 print(f"\n{YELLOW}⏸ Stopped{RESET}")
240 break
241
242 line = line.decode("utf-8").strip()
243 if not line.startswith("data: "): continue
244 if line == "data: [DONE]": continue
245
246 try:
247 data = json.loads(line[6:])
248 etype = data.get("type")
249
250 if etype == "content_block_start":
251 block = data.get("content_block", {})
252 current = {"type": block.get("type"), "id": block.get("id")}
253 if current["type"] == "text":
254 text_buf = ""
255 print(f"\n{CYAN}{RESET} ", end="", flush=True)
256 elif current["type"] == "thinking":
257 think_buf = ""
258 print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
259 elif current["type"] == "tool_use":
260 current["name"] = block.get("name")
261 json_buf = ""
262
263 elif etype == "content_block_delta":
264 delta = data.get("delta", {})
265 dtype = delta.get("type")
266 if dtype == "text_delta":
267 text = delta.get("text", "")
268 text_buf += text
269 print(text, end="", flush=True)
270 elif dtype == "thinking_delta":
271 text = delta.get("thinking", "")
272 think_buf += text
273 print(text, end="", flush=True)
274 elif dtype == "input_json_delta" and current:
275 json_buf += delta.get("partial_json", "")
276
277 elif etype == "content_block_stop" and current:
278 if current["type"] == "text":
279 current["text"] = text_buf
280 print()
281 elif current["type"] == "thinking":
282 print(RESET)
283 elif current["type"] == "tool_use":
284 try: current["input"] = json.loads(json_buf)
285 except: current["input"] = {}
286 blocks.append(current)
287 current = None
288 except: pass
289 finally:
290 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
291
292 return blocks
293
294def main():
295 global stop_flag
296 proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
297 proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else ""
298 thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else ""
299 print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info} | ESC=stop{RESET}\n")
300
301 messages = []
302 system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()}
303
304IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
305
306Examples:
307- User asks about GitHub repo → search_extension({{"query": "github documentation"}})
308- User needs web data → search_extension({{"query": "web scraping"}})
309- User needs API → search_extension({{"query": "api client"}})"""
310
311 while True:
312 try:
313 print(f"{DIM}{''*80}{RESET}")
314 user_input = input(f"{BOLD}{BLUE}{RESET} ").strip()
315 print(f"{DIM}{''*80}{RESET}")
316
317 if not user_input: continue
318 if user_input in ("/q", "exit"): break
319 if user_input == "/c":
320 messages = []
321 print(f"{GREEN}⏺ Cleared{RESET}")
322 continue
323
324 messages.append({"role": "user", "content": user_input})
325
326 while True:
327 stop_flag = False
328 response = call_api(messages, system_prompt)
329 blocks = process_stream(response)
330 if stop_flag: break
331
332 tool_results = []
333 for block in blocks:
334 if block["type"] == "tool_use":
335 name, args = block["name"], block["input"]
336 preview = str(list(args.values())[0])[:50] if args else ""
337 print(f"\n{GREEN}{name}{RESET}({DIM}{preview}{RESET})")
338
339 result = run_tool(name, args)
340 lines = result.split("\n")
341 prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "")
342 if len(lines) > 1: prev += f" +{len(lines)-1}"
343 print(f" {DIM}{prev}{RESET}")
344
345 tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})
346
347 messages.append({"role": "assistant", "content": blocks})
348 if not tool_results: break
349 messages.append({"role": "user", "content": tool_results})
350
351 print()
352 except (KeyboardInterrupt, EOFError): break
353 except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}")
354
355if __name__ == "__main__":
356 main()
357