Last active 1 month ago

Revision 4184d5015d40ea20c5a84cd5f8dd5ce3806734d8

nanocode.py Raw
1#!/usr/bin/env python3
2"""nanocode - minimal claude code alternative"""
3import glob as globlib
4import json
5import os
6import re
7import readline
8import select
9import ssl
10import subprocess
11import sys
12import termios
13import tty
14import urllib.request
15import urllib.parse
16from datetime import datetime
17
# API credentials: a local proxy key takes precedence, then OpenRouter,
# then the Anthropic API key (read later in call_api).
OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY")
# Endpoint selection mirrors the key precedence above.
API_URL = (
    "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY
    else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY
    else "https://api.anthropic.com/v1/messages"
)
# Default model per backend; override with the MODEL env var.
MODEL = os.environ.get("MODEL",
    "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY
    else "anthropic/claude-opus-4.5" if OPENROUTER_KEY
    else "claude-opus-4-5"
)

# ANSI colors
RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m"
BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m"
# Set by process_stream when the user presses ESC; read by run_main_loop.
stop_flag = False
35
def create_opener():
    """Build a urllib opener honoring http(s)_proxy env vars.

    TLS certificate verification is turned off.
    # NOTE(review): verify_mode=CERT_NONE looks intentional (MITM-style
    # corporate proxies) — confirm this trade-off is acceptable.
    """
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    handlers = [urllib.request.HTTPSHandler(context=ctx)]
    proxy_url = os.environ.get("http_proxy") or os.environ.get("https_proxy")
    if proxy_url:
        # Proxy handler must come before the HTTPS handler to take effect.
        handlers.insert(0, urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url}))
    return urllib.request.build_opener(*handlers)
46
def register_tool(name, desc, params):
    """Decorator factory used by extension code to add entries to TOOLS."""
    def _wrap(fn):
        # Store the same (description, params, handler) triple the
        # built-in registry uses, then hand the function back unchanged.
        TOOLS[name] = (desc, params, fn)
        return fn
    return _wrap
53
def search_extension(args):
    """Search extensions from gist.kitchain.cn.

    Splits the query into keywords, fetches the topic page for each keyword,
    scrapes gist links/descriptions out of the HTML, and ranks gists by how
    many keyword pages they appeared on. Returns a formatted summary plus a
    ready-to-use load() URL for the top hit, or an "error: ..." string.
    """
    query = args.get("query", "")
    if not query: return "error: query required"
    try:
        # Split query into keywords
        keywords = query.lower().split()
        gist_info = {}  # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}}
        opener = create_opener()

        # Search each keyword as a topic
        for keyword in keywords:
            url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}"
            html = opener.open(urllib.request.Request(url), timeout=10).read().decode()

            # Extract gist URLs and titles
            gist_matches = re.findall(
                r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>',
                html
            )

            for gist_path, title in gist_matches:
                if gist_path not in gist_info:
                    # Extract description and topics for this gist
                    gist_section = re.search(
                        rf'{re.escape(gist_path)}.*?'
                        r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>',
                        html, re.DOTALL
                    )
                    desc = ""
                    topics = []
                    if gist_section:
                        desc = gist_section.group(1).strip()
                        topics_section = gist_section.group(2)
                        topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section)
                        topics = [t[1] for t in topics]  # Extract topic names

                    gist_info[gist_path] = {
                        "hits": 0,
                        "title": title.strip(),
                        "desc": desc,
                        "topics": topics,
                        # NOTE(review): assumes the anchor text doubles as the
                        # raw filename — confirm against the site's markup.
                        "filename": title.strip()
                    }
                # One hit per keyword page the gist appears on.
                gist_info[gist_path]["hits"] += 1

        if not gist_info: return f"No extensions found: {query}"

        # Sort by hit count (descending)
        sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10]

        result = f"Found {len(sorted_gists)} extensions:\n\n"
        for gist_path, info in sorted_gists:
            result += f"{info['title']}\n"
            if info['desc']:
                result += f" {info['desc']}\n"
            if info['topics']:
                result += f" Topics: {', '.join(info['topics'])}\n"
            result += f" Matched: {info['hits']} keyword(s)\n\n"

        # Return first gist's load URL
        first_gist = sorted_gists[0][0]
        first_filename = sorted_gists[0][1]['filename']
        result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})"
        return result
    except Exception as e:
        # Best-effort tool: any network/parse failure is reported as a string.
        return f"error: {e}"
121
def load(args):
    """Load extension from URL.

    Fetches Python source over HTTP and executes it with register_tool and
    a few modules in scope so the extension can register new tools.
    Returns a summary of newly-added tool names or an "error: ..." string.
    """
    url = args.get("url")
    if not url: return "error: url required"
    try:
        opener = create_opener()
        code = opener.open(urllib.request.Request(url), timeout=10).read().decode()
        # SECURITY: exec() runs arbitrary remote code with this process's
        # full privileges — only load URLs you trust.
        exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess})
        # Anything beyond the built-in tool set must have been added by the
        # extension we just executed.
        new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","web_search","search_extension","load"]]
        return f"Loaded. New tools: {', '.join(new)}"
    except Exception as e:
        return f"error: {e}"
134
135# --- Tools ---
def read(args):
    """Return file contents with 1-based line numbers.

    args: path (str, required), offset (int, 0-based start line, default 0),
    limit (int, max lines, default: rest of file).
    """
    # Context manager fixes the original's leaked file handle.
    with open(args["path"]) as f:
        lines = f.readlines()
    offset, limit = args.get("offset", 0), args.get("limit", len(lines))
    # Line numbers are right-aligned in a 4-char field, matching the original format.
    return "".join(f"{offset + i + 1:4}| {l}" for i, l in enumerate(lines[offset:offset + limit]))
140
def write(args):
    """Write args["content"] to args["path"] (truncating); returns "ok"."""
    # Context manager fixes the original's leaked file handle and guarantees flush.
    with open(args["path"], "w") as f:
        f.write(args["content"])
    return "ok"
144
def edit(args):
    """Replace old string with new string in a file.

    args: path, old, new (required); all (bool, optional) — replace every
    occurrence. Refuses a single replace when the old string is ambiguous
    (appears more than once). Returns "ok" or an "error: ..." string.
    """
    path = args["path"]
    # Context managers fix the original's two leaked file handles.
    with open(path) as f:
        text = f.read()
    old, new = args["old"], args["new"]
    if old not in text:
        return "error: old_string not found"
    count = text.count(old)
    replace_all = bool(args.get("all"))
    if not replace_all and count > 1:
        return f"error: old_string appears {count} times (use all=true)"
    with open(path, "w") as f:
        # count=-1 (all) vs count=1 (first occurrence only).
        f.write(text.replace(old, new) if replace_all else text.replace(old, new, 1))
    return "ok"
154
def glob(args):
    """Find files matching a glob pattern, newest first.

    Name intentionally shadows the builtin module (aliased as globlib at
    the top of the file) because "glob" is the registered tool name.
    """
    root = args.get("path", ".")
    pattern = f"{root}/{args['pat']}".replace("//", "/")

    def _mtime(path):
        # Directories sort last (mtime treated as 0).
        return os.path.getmtime(path) if os.path.isfile(path) else 0

    matches = globlib.glob(pattern, recursive=True)
    matches.sort(key=_mtime, reverse=True)
    return "\n".join(matches) or "none"
160
def grep(args):
    """Search files under a path for a regex.

    args: pat (regex, required), path (dir, default "."). Returns up to 50
    hits as "file:line:text" lines, or "none".
    """
    pattern, hits = re.compile(args["pat"]), []
    for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True):
        try:
            # Context manager fixes the original's leaked file handles.
            with open(fp) as f:
                for n, line in enumerate(f, 1):
                    if pattern.search(line):
                        hits.append(f"{fp}:{n}:{line.rstrip()}")
        except (OSError, UnicodeDecodeError):
            # Deliberate best-effort: skip directories, unreadable files,
            # and binary content (narrowed from the original bare except).
            pass
    return "\n".join(hits[:50]) or "none"
169
def bash(args):
    """Run a shell command, echoing output live; returns the captured text.

    stderr is merged into stdout. On timeout the process is killed and
    "(timeout)" appended to whatever was captured.
    """
    # shell=True is intentional: the model supplies full command lines.
    proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, text=True)
    lines = []
    try:
        if proc.stdout:
            while True:
                line = proc.stdout.readline()
                # EOF reached and process exited -> done reading.
                if not line and proc.poll() is not None: break
                if line:
                    # Echo dimmed to the terminal as output streams in.
                    print(f" {DIM}{line.rstrip()}{RESET}", flush=True)
                    lines.append(line)
        # NOTE(review): this 30s timeout only starts after stdout reaches
        # EOF; a command that never closes stdout blocks in readline()
        # above indefinitely — confirm whether that is acceptable.
        proc.wait(timeout=30)
    except subprocess.TimeoutExpired:
        proc.kill()
        lines.append("\n(timeout)")
    return "".join(lines).strip() or "(empty)"
187
def web_search(args):
    """Search web using DuckDuckGo's HTML endpoint (no API key required).

    args: query (required), max_results (default 5). Returns a numbered
    list of title/URL/snippet entries, or an "error: ..." string.
    """
    query, max_results = args["query"], args.get("max_results", 5)
    try:
        url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}"
        # Browser-like UA string to avoid the endpoint's bot filtering.
        headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
        opener = create_opener()
        html = opener.open(urllib.request.Request(url, headers=headers), timeout=30).read().decode()

        # Extract titles and URLs
        links = re.findall(r'class="result__a"[^>]+href="([^"]+)"[^>]*>([^<]+)<', html)
        # Extract snippets
        snippets = re.findall(r'class="result__snippet"[^>]*>([^<]*)<', html)
        if not links: return "No results found"

        results = []
        # Padding snippets with "" keeps zip() aligned when fewer snippets
        # than links were scraped from the page.
        for i, ((link, title), snippet) in enumerate(zip(links[:max_results], snippets[:max_results] + [""] * max_results), 1):
            results.append(f"{i}. {title.strip()}\n URL: {link}\n {snippet.strip()}\n")
        return "\n".join(results)
    except Exception as e:
        return f"error: {e}"
209
210
# Tool registry: name -> (description, {param_name: "type" or "type?"}, handler).
# A trailing "?" marks a parameter optional; make_schema() translates these
# entries into the API's input_schema format. Extensions loaded via load()
# append to this dict through register_tool().
TOOLS = {
    "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read),
    "write": ("Write content to file", {"path": "string", "content": "string"}, write),
    "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit),
    "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob),
    "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep),
    "bash": ("Run shell command", {"cmd": "string"}, bash),
    "web_search": ("Search the web using DuckDuckGo", {"query": "string", "max_results": "number?"}, web_search),
    "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension),
    "load": ("Load extension from URL to add new tools", {"url": "string"}, load),
}
222
def run_tool(name, args):
    """Dispatch to a registered tool handler.

    Any failure — unknown tool name or an exception inside the handler —
    is reported back to the model as an "error: ..." string rather than
    propagating.
    """
    try:
        _, _, handler = TOOLS[name]
        return handler(args)
    except Exception as e:
        return f"error: {e}"
226
def make_schema():
    """Translate the TOOLS registry into API tool-schema dicts.

    Parameter types ending in "?" become optional (left out of "required");
    the registry's "number" is emitted as JSON-schema "integer".
    """
    schemas = []
    for tool_name, (description, params, _) in TOOLS.items():
        properties, required = {}, []
        for param_name, param_type in params.items():
            base_type = param_type.rstrip("?")
            properties[param_name] = {"type": "integer" if base_type == "number" else base_type}
            if not param_type.endswith("?"):
                required.append(param_name)
        schemas.append({
            "name": tool_name,
            "description": description,
            "input_schema": {"type": "object", "properties": properties, "required": required},
        })
    return schemas
238
def call_api(messages, system_prompt):
    """POST a streaming /v1/messages request; returns the open HTTP response.

    Auth header form depends on the backend selected at import time:
    Bearer token for the local proxy / OpenRouter, x-api-key for Anthropic.
    """
    headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
    if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}"
    elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
    else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "")

    data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt,
            "messages": messages, "tools": make_schema(), "stream": True}

    # Extended thinking is opt-in via the THINKING / THINKING_BUDGET env vars.
    if os.environ.get("THINKING"):
        data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))}

    req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST")
    # Caller iterates the response line-by-line (SSE); see process_stream.
    return create_opener().open(req)
253
def process_stream(response):
    """Simplified SSE stream processing with ESC-key interruption.

    (Docstring translated from Chinese: "简化的流式处理,支持ESC中断".)
    Parses streaming message events into a list of content blocks
    (text / thinking / tool_use dicts) while echoing text to the terminal.
    Pressing ESC sets the global stop_flag and aborts the stream early.
    """
    global stop_flag
    blocks, current, text_buf, json_buf, think_buf = [], None, "", "", ""

    # Save terminal settings
    old_settings = termios.tcgetattr(sys.stdin)
    try:
        # cbreak mode: read single keypresses without waiting for Enter.
        tty.setcbreak(sys.stdin.fileno())

        for line in response:
            # Zero-timeout select: poll for a keypress between stream lines.
            if select.select([sys.stdin], [], [], 0)[0]:
                ch = sys.stdin.read(1)
                if ch == '\x1b':  # ESC key
                    stop_flag = True
                    print(f"\n{YELLOW}⏸ Stopped{RESET}")
                    break

            line = line.decode("utf-8").strip()
            if not line.startswith("data: "): continue
            if line == "data: [DONE]": continue

            try:
                data = json.loads(line[6:])
                etype = data.get("type")

                if etype == "content_block_start":
                    # New content block: reset the matching buffer.
                    block = data.get("content_block", {})
                    current = {"type": block.get("type"), "id": block.get("id")}
                    if current["type"] == "text":
                        text_buf = ""
                        print(f"\n{CYAN}{RESET} ", end="", flush=True)
                    elif current["type"] == "thinking":
                        think_buf = ""
                        print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
                    elif current["type"] == "tool_use":
                        current["name"] = block.get("name")
                        json_buf = ""

                elif etype == "content_block_delta":
                    delta = data.get("delta", {})
                    dtype = delta.get("type")
                    if dtype == "text_delta":
                        text = delta.get("text", "")
                        text_buf += text
                        print(text, end="", flush=True)
                    elif dtype == "thinking_delta":
                        text = delta.get("thinking", "")
                        think_buf += text
                        print(text, end="", flush=True)
                    elif dtype == "input_json_delta" and current:
                        # Tool input arrives as partial JSON fragments;
                        # accumulated and parsed at block stop.
                        json_buf += delta.get("partial_json", "")

                elif etype == "content_block_stop" and current:
                    if current["type"] == "text":
                        current["text"] = text_buf
                        print()
                    elif current["type"] == "thinking":
                        print(RESET)
                    elif current["type"] == "tool_use":
                        # Malformed/partial JSON degrades to empty input.
                        try: current["input"] = json.loads(json_buf)
                        except: current["input"] = {}
                    blocks.append(current)
                    current = None
            except: pass  # NOTE(review): silently skips malformed events.
    finally:
        # Always restore the terminal mode, even on interrupt/error.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

    return blocks
323
def read_multiline_input():
    """Read multiline input. Enter to submit, Alt+Enter for newline.

    Implements a minimal line editor in cbreak mode: Ctrl+C clears the
    input, Ctrl+D raises EOFError, Backspace deletes one character.
    Returns the joined, stripped text of all entered lines.
    """
    lines, current = [], ""
    old_settings = termios.tcgetattr(sys.stdin)
    try:
        tty.setcbreak(sys.stdin.fileno())
        print(f"{BOLD}{BLUE}{RESET} ", end="", flush=True)
        while True:
            ch = sys.stdin.read(1)
            if ch == '\x03':  # Ctrl+C - clear input
                lines.clear()
                current = ""
                print("\r\033[K", end="", flush=True)  # erase current line
                print(f"{BOLD}{BLUE}{RESET} ", end="", flush=True)
                continue
            if ch == '\x04':  # Ctrl+D
                raise EOFError
            # NOTE(review): a lone ESC press blocks here waiting for the next
            # byte and then swallows it — only the Alt+Enter case is handled.
            if ch == '\x1b' and sys.stdin.read(1) in ('\r', '\n'):  # Alt+Enter
                lines.append(current)
                current = ""
                print(f"\n{BOLD}{BLUE}{RESET} ", end="", flush=True)
            elif ch in ('\r', '\n'):  # Enter
                if current: lines.append(current)
                break
            elif ch in ('\x7f', '\x08'):  # Backspace
                if current:
                    current = current[:-1]
                    # Redraw the whole line after deleting one character.
                    print("\r\033[K", end="", flush=True)
                    # NOTE(review): both branches of this conditional are ''
                    # — prompt glyphs were likely lost in transit; confirm
                    # the intended characters.
                    print(f"{BOLD}{BLUE}{'' if lines else ''}{RESET} {current}", end="", flush=True)
            elif ch.isprintable() or ch == '\t':
                current += ch
                print(ch, end="", flush=True)
        print()
    finally:
        # Restore the terminal mode regardless of how input ended.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
    return "\n".join(lines).strip()
360
def main():
    """Entry point: print the banner, disable Ctrl+C signals, run the REPL."""
    global stop_flag
    # Disable Ctrl+C signal
    old_settings = termios.tcgetattr(sys.stdin)
    new_settings = termios.tcgetattr(sys.stdin)
    # Index 3 is lflags; clearing ISIG lets Ctrl+C arrive as a plain byte
    # so read_multiline_input can use it as "clear input".
    new_settings[3] = new_settings[3] & ~termios.ISIG  # Disable signal generation
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)

    try:
        proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
        proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else ""
        thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else ""
        print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info}{RESET}")
        print(f"{DIM}Shortcuts: Enter=submit | Alt+Enter=newline | Ctrl+C=clear | Ctrl+D=exit | ESC=stop{RESET}\n")
        run_main_loop()
    finally:
        # Restore original terminal attributes (re-enables Ctrl+C) on exit.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
378
def run_main_loop():
    """REPL: read user input, stream model output, execute requested tools.

    Commands: /q or exit quits, /c clears the conversation. The inner loop
    keeps calling the API while the model keeps requesting tool use.
    """
    # BUGFIX: without this declaration, the `stop_flag = False` below made
    # stop_flag a *local*, so the ESC interrupt set globally by
    # process_stream was never seen by the `if stop_flag: break` check.
    global stop_flag
    messages = []
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} Current time: {current_time}
IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
Examples:
- User asks about GitHub repo → search_extension({{"query": "github documentation"}})
- User needs web data → search_extension({{"query": "web scraping"}})
- User needs API → search_extension({{"query": "api client"}})"""

    while True:
        try:
            # NOTE(review): ''*80 is an empty string — a divider glyph was
            # probably lost in transit; confirm the intended character.
            print(f"{DIM}{''*80}{RESET}")
            user_input = read_multiline_input()
            print(f"{DIM}{''*80}{RESET}")

            if not user_input: continue
            if user_input in ("/q", "exit"): break
            if user_input == "/c":  # clear conversation history
                messages = []
                print(f"{GREEN}⏺ Cleared{RESET}")
                continue

            messages.append({"role": "user", "content": user_input})

            # Agent loop: repeat until the model stops requesting tools.
            while True:
                stop_flag = False
                response = call_api(messages, system_prompt)
                blocks = process_stream(response)
                if stop_flag: break  # user pressed ESC during streaming

                tool_results = []
                for block in blocks:
                    if block["type"] == "tool_use":
                        name, args = block["name"], block["input"]
                        # Short preview of the first argument value.
                        preview = str(list(args.values())[0])[:50] if args else ""
                        print(f"\n{GREEN}{name}{RESET}({DIM}{preview}{RESET})")

                        result = run_tool(name, args)
                        lines = result.split("\n")
                        # One-line result preview plus hidden-line count.
                        prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "")
                        if len(lines) > 1: prev += f" +{len(lines)-1}"
                        print(f" {DIM}{prev}{RESET}")

                        tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})

                messages.append({"role": "assistant", "content": blocks})
                if not tool_results: break  # no tools requested -> turn done
                messages.append({"role": "user", "content": tool_results})

            print()
        except EOFError:
            break
        except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}")
433
# Run the REPL only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
436