Last active 1 month ago

liusijin revised this gist 1 month ago. Go to revision

1 file changed, 356 insertions

nanocode_slim.py(file created)

@@ -0,0 +1,356 @@
1 + #!/usr/bin/env python3
2 + """nanocode - minimal claude code alternative"""
3 + import glob as globlib
4 + import json
5 + import os
6 + import re
7 + import readline
8 + import select
9 + import ssl
10 + import subprocess
11 + import sys
12 + import termios
13 + import tty
14 + import urllib.request
15 + import urllib.parse
16 +
17 + OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
18 + LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY")
19 + API_URL = (
20 + "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY
21 + else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY
22 + else "https://api.anthropic.com/v1/messages"
23 + )
24 + MODEL = os.environ.get("MODEL",
25 + "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY
26 + else "anthropic/claude-opus-4.5" if OPENROUTER_KEY
27 + else "claude-opus-4-5"
28 + )
29 +
30 + # ANSI colors
31 + RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m"
32 + BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m"
33 + stop_flag = False
34 +
35 + def register_tool(name, desc, params):
36 + """Register a tool from extension code"""
37 + def decorator(func):
38 + TOOLS[name] = (desc, params, func)
39 + return func
40 + return decorator
41 +
42 + def search_extension(args):
43 + """Search extensions from gist.kitchain.cn"""
44 + query = args.get("query", "")
45 + if not query: return "error: query required"
46 + try:
47 + # Split query into keywords
48 + keywords = query.lower().split()
49 + gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}}
50 +
51 + # Search each keyword as a topic
52 + for keyword in keywords:
53 + url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}"
54 + html = urllib.request.urlopen(urllib.request.Request(url), timeout=10).read().decode()
55 +
56 + # Extract gist URLs and titles
57 + gist_matches = re.findall(
58 + r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>',
59 + html
60 + )
61 +
62 + for gist_path, title in gist_matches:
63 + if gist_path not in gist_info:
64 + # Extract description and topics for this gist
65 + gist_section = re.search(
66 + rf'{re.escape(gist_path)}.*?'
67 + r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>',
68 + html, re.DOTALL
69 + )
70 + desc = ""
71 + topics = []
72 + if gist_section:
73 + desc = gist_section.group(1).strip()
74 + topics_section = gist_section.group(2)
75 + topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section)
76 + topics = [t[1] for t in topics] # Extract topic names
77 +
78 + gist_info[gist_path] = {
79 + "hits": 0,
80 + "title": title.strip(),
81 + "desc": desc,
82 + "topics": topics,
83 + "filename": title.strip()
84 + }
85 + gist_info[gist_path]["hits"] += 1
86 +
87 + if not gist_info: return f"No extensions found: {query}"
88 +
89 + # Sort by hit count (descending)
90 + sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10]
91 +
92 + result = f"Found {len(sorted_gists)} extensions:\n\n"
93 + for gist_path, info in sorted_gists:
94 + result += f"• {info['title']}\n"
95 + if info['desc']:
96 + result += f" {info['desc']}\n"
97 + if info['topics']:
98 + result += f" Topics: {', '.join(info['topics'])}\n"
99 + result += f" Matched: {info['hits']} keyword(s)\n\n"
100 +
101 + # Return first gist's load URL
102 + first_gist = sorted_gists[0][0]
103 + first_filename = sorted_gists[0][1]['filename']
104 + result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})"
105 + return result
106 + except Exception as e:
107 + return f"error: {e}"
108 +
109 + def load(args):
110 + """Load extension from URL"""
111 + url = args.get("url")
112 + if not url: return "error: url required"
113 + try:
114 + code = urllib.request.urlopen(urllib.request.Request(url), timeout=10).read().decode()
115 + exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess})
116 + new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","search_extension","load"]]
117 + return f"Loaded. New tools: {', '.join(new)}"
118 + except Exception as e:
119 + return f"error: {e}"
120 +
121 + # --- Tools ---
122 + def read(args):
123 + lines = open(args["path"]).readlines()
124 + offset, limit = args.get("offset", 0), args.get("limit", len(lines))
125 + return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit]))
126 +
127 + def write(args):
128 + open(args["path"], "w").write(args["content"])
129 + return "ok"
130 +
131 + def edit(args):
132 + text = open(args["path"]).read()
133 + old, new = args["old"], args["new"]
134 + if old not in text: return "error: old_string not found"
135 + count = text.count(old)
136 + if not args.get("all") and count > 1:
137 + return f"error: old_string appears {count} times (use all=true)"
138 + open(args["path"], "w").write(text.replace(old, new) if args.get("all") else text.replace(old, new, 1))
139 + return "ok"
140 +
141 + def glob(args):
142 + pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/")
143 + files = sorted(globlib.glob(pattern, recursive=True),
144 + key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True)
145 + return "\n".join(files) or "none"
146 +
147 + def grep(args):
148 + pattern, hits = re.compile(args["pat"]), []
149 + for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True):
150 + try:
151 + for n, l in enumerate(open(fp), 1):
152 + if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}")
153 + except: pass
154 + return "\n".join(hits[:50]) or "none"
155 +
156 + def bash(args):
157 + proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE,
158 + stderr=subprocess.STDOUT, text=True)
159 + lines = []
160 + try:
161 + if proc.stdout:
162 + while True:
163 + line = proc.stdout.readline()
164 + if not line and proc.poll() is not None: break
165 + if line:
166 + print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True)
167 + lines.append(line)
168 + proc.wait(timeout=30)
169 + except subprocess.TimeoutExpired:
170 + proc.kill()
171 + lines.append("\n(timeout)")
172 + return "".join(lines).strip() or "(empty)"
173 +
174 + TOOLS = {
175 + "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read),
176 + "write": ("Write content to file", {"path": "string", "content": "string"}, write),
177 + "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit),
178 + "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob),
179 + "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep),
180 + "bash": ("Run shell command", {"cmd": "string"}, bash),
181 + "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension),
182 + "load": ("Load extension from URL to add new tools", {"url": "string"}, load),
183 + }
184 +
185 + def run_tool(name, args):
186 + try: return TOOLS[name][2](args)
187 + except Exception as e: return f"error: {e}"
188 +
189 + def make_schema():
190 + result = []
191 + for name, (desc, params, _) in TOOLS.items():
192 + props, req = {}, []
193 + for pname, ptype in params.items():
194 + opt = ptype.endswith("?")
195 + props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")}
196 + if not opt: req.append(pname)
197 + result.append({"name": name, "description": desc,
198 + "input_schema": {"type": "object", "properties": props, "required": req}})
199 + return result
200 +
201 + def call_api(messages, system_prompt):
202 + headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
203 + if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}"
204 + elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
205 + else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "")
206 +
207 + data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt,
208 + "messages": messages, "tools": make_schema(), "stream": True}
209 +
210 + if os.environ.get("THINKING"):
211 + data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))}
212 +
213 + proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
214 + ssl_ctx = ssl.create_default_context()
215 + ssl_ctx.check_hostname = False
216 + ssl_ctx.verify_mode = ssl.CERT_NONE
217 +
218 + handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)]
219 + if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
220 +
221 + req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST")
222 + return urllib.request.build_opener(*handlers).open(req)
223 +
224 + def process_stream(response):
225 + """简化的流式处理,支持ESC中断"""
226 + global stop_flag
227 + blocks, current, text_buf, json_buf, think_buf = [], None, "", "", ""
228 +
229 + # Save terminal settings
230 + old_settings = termios.tcgetattr(sys.stdin)
231 + try:
232 + tty.setcbreak(sys.stdin.fileno())
233 +
234 + for line in response:
235 + if select.select([sys.stdin], [], [], 0)[0]:
236 + ch = sys.stdin.read(1)
237 + if ch == '\x1b': # ESC key
238 + stop_flag = True
239 + print(f"\n{YELLOW}⏸ Stopped{RESET}")
240 + break
241 +
242 + line = line.decode("utf-8").strip()
243 + if not line.startswith("data: "): continue
244 + if line == "data: [DONE]": continue
245 +
246 + try:
247 + data = json.loads(line[6:])
248 + etype = data.get("type")
249 +
250 + if etype == "content_block_start":
251 + block = data.get("content_block", {})
252 + current = {"type": block.get("type"), "id": block.get("id")}
253 + if current["type"] == "text":
254 + text_buf = ""
255 + print(f"\n{CYAN}⏺{RESET} ", end="", flush=True)
256 + elif current["type"] == "thinking":
257 + think_buf = ""
258 + print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
259 + elif current["type"] == "tool_use":
260 + current["name"] = block.get("name")
261 + json_buf = ""
262 +
263 + elif etype == "content_block_delta":
264 + delta = data.get("delta", {})
265 + dtype = delta.get("type")
266 + if dtype == "text_delta":
267 + text = delta.get("text", "")
268 + text_buf += text
269 + print(text, end="", flush=True)
270 + elif dtype == "thinking_delta":
271 + text = delta.get("thinking", "")
272 + think_buf += text
273 + print(text, end="", flush=True)
274 + elif dtype == "input_json_delta" and current:
275 + json_buf += delta.get("partial_json", "")
276 +
277 + elif etype == "content_block_stop" and current:
278 + if current["type"] == "text":
279 + current["text"] = text_buf
280 + print()
281 + elif current["type"] == "thinking":
282 + print(RESET)
283 + elif current["type"] == "tool_use":
284 + try: current["input"] = json.loads(json_buf)
285 + except: current["input"] = {}
286 + blocks.append(current)
287 + current = None
288 + except: pass
289 + finally:
290 + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
291 +
292 + return blocks
293 +
294 + def main():
295 + global stop_flag
296 + proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
297 + proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else ""
298 + thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else ""
299 + print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info} | ESC=stop{RESET}\n")
300 +
301 + messages = []
302 + system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()}
303 +
304 + IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
305 +
306 + Examples:
307 + - User asks about GitHub repo → search_extension({{"query": "github documentation"}})
308 + - User needs web data → search_extension({{"query": "web scraping"}})
309 + - User needs API → search_extension({{"query": "api client"}})"""
310 +
311 + while True:
312 + try:
313 + print(f"{DIM}{'─'*80}{RESET}")
314 + user_input = input(f"{BOLD}{BLUE}❯{RESET} ").strip()
315 + print(f"{DIM}{'─'*80}{RESET}")
316 +
317 + if not user_input: continue
318 + if user_input in ("/q", "exit"): break
319 + if user_input == "/c":
320 + messages = []
321 + print(f"{GREEN}⏺ Cleared{RESET}")
322 + continue
323 +
324 + messages.append({"role": "user", "content": user_input})
325 +
326 + while True:
327 + stop_flag = False
328 + response = call_api(messages, system_prompt)
329 + blocks = process_stream(response)
330 + if stop_flag: break
331 +
332 + tool_results = []
333 + for block in blocks:
334 + if block["type"] == "tool_use":
335 + name, args = block["name"], block["input"]
336 + preview = str(list(args.values())[0])[:50] if args else ""
337 + print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})")
338 +
339 + result = run_tool(name, args)
340 + lines = result.split("\n")
341 + prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "")
342 + if len(lines) > 1: prev += f" +{len(lines)-1}"
343 + print(f" {DIM}⎿ {prev}{RESET}")
344 +
345 + tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})
346 +
347 + messages.append({"role": "assistant", "content": blocks})
348 + if not tool_results: break
349 + messages.append({"role": "user", "content": tool_results})
350 +
351 + print()
352 + except (KeyboardInterrupt, EOFError): break
353 + except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}")
354 +
355 + if __name__ == "__main__":
356 + main()
Newer Older