liusijin revised this gist 1 month ago. Go to revision
1 file changed, 356 insertions
nanocode_slim.py(file created)
| @@ -0,0 +1,356 @@ | |||
| 1 | + | #!/usr/bin/env python3 | |
| 2 | + | """nanocode - minimal claude code alternative""" | |
| 3 | + | import glob as globlib | |
| 4 | + | import json | |
| 5 | + | import os | |
| 6 | + | import re | |
| 7 | + | import readline | |
| 8 | + | import select | |
| 9 | + | import ssl | |
| 10 | + | import subprocess | |
| 11 | + | import sys | |
| 12 | + | import termios | |
| 13 | + | import tty | |
| 14 | + | import urllib.request | |
| 15 | + | import urllib.parse | |
| 16 | + | ||
| 17 | + | OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY") | |
| 18 | + | LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY") | |
| 19 | + | API_URL = ( | |
| 20 | + | "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY | |
| 21 | + | else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY | |
| 22 | + | else "https://api.anthropic.com/v1/messages" | |
| 23 | + | ) | |
| 24 | + | MODEL = os.environ.get("MODEL", | |
| 25 | + | "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY | |
| 26 | + | else "anthropic/claude-opus-4.5" if OPENROUTER_KEY | |
| 27 | + | else "claude-opus-4-5" | |
| 28 | + | ) | |
| 29 | + | ||
| 30 | + | # ANSI colors | |
| 31 | + | RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m" | |
| 32 | + | BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m" | |
| 33 | + | stop_flag = False | |
| 34 | + | ||
| 35 | + | def register_tool(name, desc, params): | |
| 36 | + | """Register a tool from extension code""" | |
| 37 | + | def decorator(func): | |
| 38 | + | TOOLS[name] = (desc, params, func) | |
| 39 | + | return func | |
| 40 | + | return decorator | |
| 41 | + | ||
| 42 | + | def search_extension(args): | |
| 43 | + | """Search extensions from gist.kitchain.cn""" | |
| 44 | + | query = args.get("query", "") | |
| 45 | + | if not query: return "error: query required" | |
| 46 | + | try: | |
| 47 | + | # Split query into keywords | |
| 48 | + | keywords = query.lower().split() | |
| 49 | + | gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}} | |
| 50 | + | ||
| 51 | + | # Search each keyword as a topic | |
| 52 | + | for keyword in keywords: | |
| 53 | + | url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}" | |
| 54 | + | html = urllib.request.urlopen(urllib.request.Request(url), timeout=10).read().decode() | |
| 55 | + | ||
| 56 | + | # Extract gist URLs and titles | |
| 57 | + | gist_matches = re.findall( | |
| 58 | + | r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>', | |
| 59 | + | html | |
| 60 | + | ) | |
| 61 | + | ||
| 62 | + | for gist_path, title in gist_matches: | |
| 63 | + | if gist_path not in gist_info: | |
| 64 | + | # Extract description and topics for this gist | |
| 65 | + | gist_section = re.search( | |
| 66 | + | rf'{re.escape(gist_path)}.*?' | |
| 67 | + | r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>', | |
| 68 | + | html, re.DOTALL | |
| 69 | + | ) | |
| 70 | + | desc = "" | |
| 71 | + | topics = [] | |
| 72 | + | if gist_section: | |
| 73 | + | desc = gist_section.group(1).strip() | |
| 74 | + | topics_section = gist_section.group(2) | |
| 75 | + | topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section) | |
| 76 | + | topics = [t[1] for t in topics] # Extract topic names | |
| 77 | + | ||
| 78 | + | gist_info[gist_path] = { | |
| 79 | + | "hits": 0, | |
| 80 | + | "title": title.strip(), | |
| 81 | + | "desc": desc, | |
| 82 | + | "topics": topics, | |
| 83 | + | "filename": title.strip() | |
| 84 | + | } | |
| 85 | + | gist_info[gist_path]["hits"] += 1 | |
| 86 | + | ||
| 87 | + | if not gist_info: return f"No extensions found: {query}" | |
| 88 | + | ||
| 89 | + | # Sort by hit count (descending) | |
| 90 | + | sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10] | |
| 91 | + | ||
| 92 | + | result = f"Found {len(sorted_gists)} extensions:\n\n" | |
| 93 | + | for gist_path, info in sorted_gists: | |
| 94 | + | result += f"• {info['title']}\n" | |
| 95 | + | if info['desc']: | |
| 96 | + | result += f" {info['desc']}\n" | |
| 97 | + | if info['topics']: | |
| 98 | + | result += f" Topics: {', '.join(info['topics'])}\n" | |
| 99 | + | result += f" Matched: {info['hits']} keyword(s)\n\n" | |
| 100 | + | ||
| 101 | + | # Return first gist's load URL | |
| 102 | + | first_gist = sorted_gists[0][0] | |
| 103 | + | first_filename = sorted_gists[0][1]['filename'] | |
| 104 | + | result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})" | |
| 105 | + | return result | |
| 106 | + | except Exception as e: | |
| 107 | + | return f"error: {e}" | |
| 108 | + | ||
| 109 | + | def load(args): | |
| 110 | + | """Load extension from URL""" | |
| 111 | + | url = args.get("url") | |
| 112 | + | if not url: return "error: url required" | |
| 113 | + | try: | |
| 114 | + | code = urllib.request.urlopen(urllib.request.Request(url), timeout=10).read().decode() | |
| 115 | + | exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess}) | |
| 116 | + | new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","search_extension","load"]] | |
| 117 | + | return f"Loaded. New tools: {', '.join(new)}" | |
| 118 | + | except Exception as e: | |
| 119 | + | return f"error: {e}" | |
| 120 | + | ||
| 121 | + | # --- Tools --- | |
| 122 | + | def read(args): | |
| 123 | + | lines = open(args["path"]).readlines() | |
| 124 | + | offset, limit = args.get("offset", 0), args.get("limit", len(lines)) | |
| 125 | + | return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit])) | |
| 126 | + | ||
| 127 | + | def write(args): | |
| 128 | + | open(args["path"], "w").write(args["content"]) | |
| 129 | + | return "ok" | |
| 130 | + | ||
| 131 | + | def edit(args): | |
| 132 | + | text = open(args["path"]).read() | |
| 133 | + | old, new = args["old"], args["new"] | |
| 134 | + | if old not in text: return "error: old_string not found" | |
| 135 | + | count = text.count(old) | |
| 136 | + | if not args.get("all") and count > 1: | |
| 137 | + | return f"error: old_string appears {count} times (use all=true)" | |
| 138 | + | open(args["path"], "w").write(text.replace(old, new) if args.get("all") else text.replace(old, new, 1)) | |
| 139 | + | return "ok" | |
| 140 | + | ||
| 141 | + | def glob(args): | |
| 142 | + | pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/") | |
| 143 | + | files = sorted(globlib.glob(pattern, recursive=True), | |
| 144 | + | key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True) | |
| 145 | + | return "\n".join(files) or "none" | |
| 146 | + | ||
| 147 | + | def grep(args): | |
| 148 | + | pattern, hits = re.compile(args["pat"]), [] | |
| 149 | + | for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True): | |
| 150 | + | try: | |
| 151 | + | for n, l in enumerate(open(fp), 1): | |
| 152 | + | if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}") | |
| 153 | + | except: pass | |
| 154 | + | return "\n".join(hits[:50]) or "none" | |
| 155 | + | ||
| 156 | + | def bash(args): | |
| 157 | + | proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE, | |
| 158 | + | stderr=subprocess.STDOUT, text=True) | |
| 159 | + | lines = [] | |
| 160 | + | try: | |
| 161 | + | if proc.stdout: | |
| 162 | + | while True: | |
| 163 | + | line = proc.stdout.readline() | |
| 164 | + | if not line and proc.poll() is not None: break | |
| 165 | + | if line: | |
| 166 | + | print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True) | |
| 167 | + | lines.append(line) | |
| 168 | + | proc.wait(timeout=30) | |
| 169 | + | except subprocess.TimeoutExpired: | |
| 170 | + | proc.kill() | |
| 171 | + | lines.append("\n(timeout)") | |
| 172 | + | return "".join(lines).strip() or "(empty)" | |
| 173 | + | ||
| 174 | + | TOOLS = { | |
| 175 | + | "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read), | |
| 176 | + | "write": ("Write content to file", {"path": "string", "content": "string"}, write), | |
| 177 | + | "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit), | |
| 178 | + | "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob), | |
| 179 | + | "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep), | |
| 180 | + | "bash": ("Run shell command", {"cmd": "string"}, bash), | |
| 181 | + | "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension), | |
| 182 | + | "load": ("Load extension from URL to add new tools", {"url": "string"}, load), | |
| 183 | + | } | |
| 184 | + | ||
| 185 | + | def run_tool(name, args): | |
| 186 | + | try: return TOOLS[name][2](args) | |
| 187 | + | except Exception as e: return f"error: {e}" | |
| 188 | + | ||
| 189 | + | def make_schema(): | |
| 190 | + | result = [] | |
| 191 | + | for name, (desc, params, _) in TOOLS.items(): | |
| 192 | + | props, req = {}, [] | |
| 193 | + | for pname, ptype in params.items(): | |
| 194 | + | opt = ptype.endswith("?") | |
| 195 | + | props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")} | |
| 196 | + | if not opt: req.append(pname) | |
| 197 | + | result.append({"name": name, "description": desc, | |
| 198 | + | "input_schema": {"type": "object", "properties": props, "required": req}}) | |
| 199 | + | return result | |
| 200 | + | ||
| 201 | + | def call_api(messages, system_prompt): | |
| 202 | + | headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"} | |
| 203 | + | if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}" | |
| 204 | + | elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}" | |
| 205 | + | else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "") | |
| 206 | + | ||
| 207 | + | data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt, | |
| 208 | + | "messages": messages, "tools": make_schema(), "stream": True} | |
| 209 | + | ||
| 210 | + | if os.environ.get("THINKING"): | |
| 211 | + | data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))} | |
| 212 | + | ||
| 213 | + | proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy") | |
| 214 | + | ssl_ctx = ssl.create_default_context() | |
| 215 | + | ssl_ctx.check_hostname = False | |
| 216 | + | ssl_ctx.verify_mode = ssl.CERT_NONE | |
| 217 | + | ||
| 218 | + | handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)] | |
| 219 | + | if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy})) | |
| 220 | + | ||
| 221 | + | req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST") | |
| 222 | + | return urllib.request.build_opener(*handlers).open(req) | |
| 223 | + | ||
| 224 | + | def process_stream(response): | |
| 225 | + | """简化的流式处理,支持ESC中断""" | |
| 226 | + | global stop_flag | |
| 227 | + | blocks, current, text_buf, json_buf, think_buf = [], None, "", "", "" | |
| 228 | + | ||
| 229 | + | # Save terminal settings | |
| 230 | + | old_settings = termios.tcgetattr(sys.stdin) | |
| 231 | + | try: | |
| 232 | + | tty.setcbreak(sys.stdin.fileno()) | |
| 233 | + | ||
| 234 | + | for line in response: | |
| 235 | + | if select.select([sys.stdin], [], [], 0)[0]: | |
| 236 | + | ch = sys.stdin.read(1) | |
| 237 | + | if ch == '\x1b': # ESC key | |
| 238 | + | stop_flag = True | |
| 239 | + | print(f"\n{YELLOW}⏸ Stopped{RESET}") | |
| 240 | + | break | |
| 241 | + | ||
| 242 | + | line = line.decode("utf-8").strip() | |
| 243 | + | if not line.startswith("data: "): continue | |
| 244 | + | if line == "data: [DONE]": continue | |
| 245 | + | ||
| 246 | + | try: | |
| 247 | + | data = json.loads(line[6:]) | |
| 248 | + | etype = data.get("type") | |
| 249 | + | ||
| 250 | + | if etype == "content_block_start": | |
| 251 | + | block = data.get("content_block", {}) | |
| 252 | + | current = {"type": block.get("type"), "id": block.get("id")} | |
| 253 | + | if current["type"] == "text": | |
| 254 | + | text_buf = "" | |
| 255 | + | print(f"\n{CYAN}⏺{RESET} ", end="", flush=True) | |
| 256 | + | elif current["type"] == "thinking": | |
| 257 | + | think_buf = "" | |
| 258 | + | print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True) | |
| 259 | + | elif current["type"] == "tool_use": | |
| 260 | + | current["name"] = block.get("name") | |
| 261 | + | json_buf = "" | |
| 262 | + | ||
| 263 | + | elif etype == "content_block_delta": | |
| 264 | + | delta = data.get("delta", {}) | |
| 265 | + | dtype = delta.get("type") | |
| 266 | + | if dtype == "text_delta": | |
| 267 | + | text = delta.get("text", "") | |
| 268 | + | text_buf += text | |
| 269 | + | print(text, end="", flush=True) | |
| 270 | + | elif dtype == "thinking_delta": | |
| 271 | + | text = delta.get("thinking", "") | |
| 272 | + | think_buf += text | |
| 273 | + | print(text, end="", flush=True) | |
| 274 | + | elif dtype == "input_json_delta" and current: | |
| 275 | + | json_buf += delta.get("partial_json", "") | |
| 276 | + | ||
| 277 | + | elif etype == "content_block_stop" and current: | |
| 278 | + | if current["type"] == "text": | |
| 279 | + | current["text"] = text_buf | |
| 280 | + | print() | |
| 281 | + | elif current["type"] == "thinking": | |
| 282 | + | print(RESET) | |
| 283 | + | elif current["type"] == "tool_use": | |
| 284 | + | try: current["input"] = json.loads(json_buf) | |
| 285 | + | except: current["input"] = {} | |
| 286 | + | blocks.append(current) | |
| 287 | + | current = None | |
| 288 | + | except: pass | |
| 289 | + | finally: | |
| 290 | + | termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) | |
| 291 | + | ||
| 292 | + | return blocks | |
| 293 | + | ||
| 294 | + | def main(): | |
| 295 | + | global stop_flag | |
| 296 | + | proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy") | |
| 297 | + | proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else "" | |
| 298 | + | thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else "" | |
| 299 | + | print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info} | ESC=stop{RESET}\n") | |
| 300 | + | ||
| 301 | + | messages = [] | |
| 302 | + | system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} | |
| 303 | + | ||
| 304 | + | IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it. | |
| 305 | + | ||
| 306 | + | Examples: | |
| 307 | + | - User asks about GitHub repo → search_extension({{"query": "github documentation"}}) | |
| 308 | + | - User needs web data → search_extension({{"query": "web scraping"}}) | |
| 309 | + | - User needs API → search_extension({{"query": "api client"}})""" | |
| 310 | + | ||
| 311 | + | while True: | |
| 312 | + | try: | |
| 313 | + | print(f"{DIM}{'─'*80}{RESET}") | |
| 314 | + | user_input = input(f"{BOLD}{BLUE}❯{RESET} ").strip() | |
| 315 | + | print(f"{DIM}{'─'*80}{RESET}") | |
| 316 | + | ||
| 317 | + | if not user_input: continue | |
| 318 | + | if user_input in ("/q", "exit"): break | |
| 319 | + | if user_input == "/c": | |
| 320 | + | messages = [] | |
| 321 | + | print(f"{GREEN}⏺ Cleared{RESET}") | |
| 322 | + | continue | |
| 323 | + | ||
| 324 | + | messages.append({"role": "user", "content": user_input}) | |
| 325 | + | ||
| 326 | + | while True: | |
| 327 | + | stop_flag = False | |
| 328 | + | response = call_api(messages, system_prompt) | |
| 329 | + | blocks = process_stream(response) | |
| 330 | + | if stop_flag: break | |
| 331 | + | ||
| 332 | + | tool_results = [] | |
| 333 | + | for block in blocks: | |
| 334 | + | if block["type"] == "tool_use": | |
| 335 | + | name, args = block["name"], block["input"] | |
| 336 | + | preview = str(list(args.values())[0])[:50] if args else "" | |
| 337 | + | print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})") | |
| 338 | + | ||
| 339 | + | result = run_tool(name, args) | |
| 340 | + | lines = result.split("\n") | |
| 341 | + | prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "") | |
| 342 | + | if len(lines) > 1: prev += f" +{len(lines)-1}" | |
| 343 | + | print(f" {DIM}⎿ {prev}{RESET}") | |
| 344 | + | ||
| 345 | + | tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result}) | |
| 346 | + | ||
| 347 | + | messages.append({"role": "assistant", "content": blocks}) | |
| 348 | + | if not tool_results: break | |
| 349 | + | messages.append({"role": "user", "content": tool_results}) | |
| 350 | + | ||
| 351 | + | print() | |
| 352 | + | except (KeyboardInterrupt, EOFError): break | |
| 353 | + | except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}") | |
| 354 | + | ||
| 355 | + | if __name__ == "__main__": | |
| 356 | + | main() | |
Newer
Older