Last active 1 month ago

liusijin revised this gist 1 month ago. Go to revision

1 file changed, 435 insertions

nanocode.py(file created)

@@ -0,0 +1,435 @@
1 + #!/usr/bin/env python3
2 + """nanocode - minimal claude code alternative"""
3 + import glob as globlib
4 + import json
5 + import os
6 + import re
7 + import readline
8 + import select
9 + import ssl
10 + import subprocess
11 + import sys
12 + import termios
13 + import tty
14 + import urllib.request
15 + import urllib.parse
16 + from datetime import datetime
17 +
18 + OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
19 + LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY")
20 + API_URL = (
21 + "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY
22 + else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY
23 + else "https://api.anthropic.com/v1/messages"
24 + )
25 + MODEL = os.environ.get("MODEL",
26 + "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY
27 + else "anthropic/claude-opus-4.5" if OPENROUTER_KEY
28 + else "claude-opus-4-5"
29 + )
30 +
31 + # ANSI colors
32 + RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m"
33 + BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m"
34 + stop_flag = False
35 +
36 + def create_opener():
37 + """Create URL opener with SSL and proxy support"""
38 + proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
39 + ssl_ctx = ssl.create_default_context()
40 + ssl_ctx.check_hostname = False
41 + ssl_ctx.verify_mode = ssl.CERT_NONE
42 +
43 + handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)]
44 + if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
45 + return urllib.request.build_opener(*handlers)
46 +
47 + def register_tool(name, desc, params):
48 + """Register a tool from extension code"""
49 + def decorator(func):
50 + TOOLS[name] = (desc, params, func)
51 + return func
52 + return decorator
53 +
54 + def search_extension(args):
55 + """Search extensions from gist.kitchain.cn"""
56 + query = args.get("query", "")
57 + if not query: return "error: query required"
58 + try:
59 + # Split query into keywords
60 + keywords = query.lower().split()
61 + gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}}
62 + opener = create_opener()
63 +
64 + # Search each keyword as a topic
65 + for keyword in keywords:
66 + url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}"
67 + html = opener.open(urllib.request.Request(url), timeout=10).read().decode()
68 +
69 + # Extract gist URLs and titles
70 + gist_matches = re.findall(
71 + r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>',
72 + html
73 + )
74 +
75 + for gist_path, title in gist_matches:
76 + if gist_path not in gist_info:
77 + # Extract description and topics for this gist
78 + gist_section = re.search(
79 + rf'{re.escape(gist_path)}.*?'
80 + r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>',
81 + html, re.DOTALL
82 + )
83 + desc = ""
84 + topics = []
85 + if gist_section:
86 + desc = gist_section.group(1).strip()
87 + topics_section = gist_section.group(2)
88 + topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section)
89 + topics = [t[1] for t in topics] # Extract topic names
90 +
91 + gist_info[gist_path] = {
92 + "hits": 0,
93 + "title": title.strip(),
94 + "desc": desc,
95 + "topics": topics,
96 + "filename": title.strip()
97 + }
98 + gist_info[gist_path]["hits"] += 1
99 +
100 + if not gist_info: return f"No extensions found: {query}"
101 +
102 + # Sort by hit count (descending)
103 + sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10]
104 +
105 + result = f"Found {len(sorted_gists)} extensions:\n\n"
106 + for gist_path, info in sorted_gists:
107 + result += f"• {info['title']}\n"
108 + if info['desc']:
109 + result += f" {info['desc']}\n"
110 + if info['topics']:
111 + result += f" Topics: {', '.join(info['topics'])}\n"
112 + result += f" Matched: {info['hits']} keyword(s)\n\n"
113 +
114 + # Return first gist's load URL
115 + first_gist = sorted_gists[0][0]
116 + first_filename = sorted_gists[0][1]['filename']
117 + result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})"
118 + return result
119 + except Exception as e:
120 + return f"error: {e}"
121 +
122 + def load(args):
123 + """Load extension from URL"""
124 + url = args.get("url")
125 + if not url: return "error: url required"
126 + try:
127 + opener = create_opener()
128 + code = opener.open(urllib.request.Request(url), timeout=10).read().decode()
129 + exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess})
130 + new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","web_search","search_extension","load"]]
131 + return f"Loaded. New tools: {', '.join(new)}"
132 + except Exception as e:
133 + return f"error: {e}"
134 +
135 + # --- Tools ---
136 + def read(args):
137 + lines = open(args["path"]).readlines()
138 + offset, limit = args.get("offset", 0), args.get("limit", len(lines))
139 + return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit]))
140 +
141 + def write(args):
142 + open(args["path"], "w").write(args["content"])
143 + return "ok"
144 +
145 + def edit(args):
146 + text = open(args["path"]).read()
147 + old, new = args["old"], args["new"]
148 + if old not in text: return "error: old_string not found"
149 + count = text.count(old)
150 + if not args.get("all") and count > 1:
151 + return f"error: old_string appears {count} times (use all=true)"
152 + open(args["path"], "w").write(text.replace(old, new) if args.get("all") else text.replace(old, new, 1))
153 + return "ok"
154 +
155 + def glob(args):
156 + pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/")
157 + files = sorted(globlib.glob(pattern, recursive=True),
158 + key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True)
159 + return "\n".join(files) or "none"
160 +
161 + def grep(args):
162 + pattern, hits = re.compile(args["pat"]), []
163 + for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True):
164 + try:
165 + for n, l in enumerate(open(fp), 1):
166 + if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}")
167 + except: pass
168 + return "\n".join(hits[:50]) or "none"
169 +
170 + def bash(args):
171 + proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE,
172 + stderr=subprocess.STDOUT, text=True)
173 + lines = []
174 + try:
175 + if proc.stdout:
176 + while True:
177 + line = proc.stdout.readline()
178 + if not line and proc.poll() is not None: break
179 + if line:
180 + print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True)
181 + lines.append(line)
182 + proc.wait(timeout=30)
183 + except subprocess.TimeoutExpired:
184 + proc.kill()
185 + lines.append("\n(timeout)")
186 + return "".join(lines).strip() or "(empty)"
187 +
188 + def web_search(args):
189 + """Search web using DuckDuckGo"""
190 + query, max_results = args["query"], args.get("max_results", 5)
191 + try:
192 + url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}"
193 + headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
194 + opener = create_opener()
195 + html = opener.open(urllib.request.Request(url, headers=headers), timeout=30).read().decode()
196 +
197 + # Extract titles and URLs
198 + links = re.findall(r'class="result__a"[^>]+href="([^"]+)"[^>]*>([^<]+)<', html)
199 + # Extract snippets
200 + snippets = re.findall(r'class="result__snippet"[^>]*>([^<]*)<', html)
201 + if not links: return "No results found"
202 +
203 + results = []
204 + for i, ((link, title), snippet) in enumerate(zip(links[:max_results], snippets[:max_results] + [""] * max_results), 1):
205 + results.append(f"{i}. {title.strip()}\n URL: {link}\n {snippet.strip()}\n")
206 + return "\n".join(results)
207 + except Exception as e:
208 + return f"error: {e}"
209 +
210 +
211 + TOOLS = {
212 + "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read),
213 + "write": ("Write content to file", {"path": "string", "content": "string"}, write),
214 + "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit),
215 + "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob),
216 + "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep),
217 + "bash": ("Run shell command", {"cmd": "string"}, bash),
218 + "web_search": ("Search the web using DuckDuckGo", {"query": "string", "max_results": "number?"}, web_search),
219 + "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension),
220 + "load": ("Load extension from URL to add new tools", {"url": "string"}, load),
221 + }
222 +
223 + def run_tool(name, args):
224 + try: return TOOLS[name][2](args)
225 + except Exception as e: return f"error: {e}"
226 +
227 + def make_schema():
228 + result = []
229 + for name, (desc, params, _) in TOOLS.items():
230 + props, req = {}, []
231 + for pname, ptype in params.items():
232 + opt = ptype.endswith("?")
233 + props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")}
234 + if not opt: req.append(pname)
235 + result.append({"name": name, "description": desc,
236 + "input_schema": {"type": "object", "properties": props, "required": req}})
237 + return result
238 +
239 + def call_api(messages, system_prompt):
240 + headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
241 + if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}"
242 + elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
243 + else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "")
244 +
245 + data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt,
246 + "messages": messages, "tools": make_schema(), "stream": True}
247 +
248 + if os.environ.get("THINKING"):
249 + data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))}
250 +
251 + req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST")
252 + return create_opener().open(req)
253 +
254 + def process_stream(response):
255 + """简化的流式处理,支持ESC中断"""
256 + global stop_flag
257 + blocks, current, text_buf, json_buf, think_buf = [], None, "", "", ""
258 +
259 + # Save terminal settings
260 + old_settings = termios.tcgetattr(sys.stdin)
261 + try:
262 + tty.setcbreak(sys.stdin.fileno())
263 +
264 + for line in response:
265 + if select.select([sys.stdin], [], [], 0)[0]:
266 + ch = sys.stdin.read(1)
267 + if ch == '\x1b': # ESC key
268 + stop_flag = True
269 + print(f"\n{YELLOW}⏸ Stopped{RESET}")
270 + break
271 +
272 + line = line.decode("utf-8").strip()
273 + if not line.startswith("data: "): continue
274 + if line == "data: [DONE]": continue
275 +
276 + try:
277 + data = json.loads(line[6:])
278 + etype = data.get("type")
279 +
280 + if etype == "content_block_start":
281 + block = data.get("content_block", {})
282 + current = {"type": block.get("type"), "id": block.get("id")}
283 + if current["type"] == "text":
284 + text_buf = ""
285 + print(f"\n{CYAN}⏺{RESET} ", end="", flush=True)
286 + elif current["type"] == "thinking":
287 + think_buf = ""
288 + print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
289 + elif current["type"] == "tool_use":
290 + current["name"] = block.get("name")
291 + json_buf = ""
292 +
293 + elif etype == "content_block_delta":
294 + delta = data.get("delta", {})
295 + dtype = delta.get("type")
296 + if dtype == "text_delta":
297 + text = delta.get("text", "")
298 + text_buf += text
299 + print(text, end="", flush=True)
300 + elif dtype == "thinking_delta":
301 + text = delta.get("thinking", "")
302 + think_buf += text
303 + print(text, end="", flush=True)
304 + elif dtype == "input_json_delta" and current:
305 + json_buf += delta.get("partial_json", "")
306 +
307 + elif etype == "content_block_stop" and current:
308 + if current["type"] == "text":
309 + current["text"] = text_buf
310 + print()
311 + elif current["type"] == "thinking":
312 + print(RESET)
313 + elif current["type"] == "tool_use":
314 + try: current["input"] = json.loads(json_buf)
315 + except: current["input"] = {}
316 + blocks.append(current)
317 + current = None
318 + except: pass
319 + finally:
320 + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
321 +
322 + return blocks
323 +
324 + def read_multiline_input():
325 + """Read multiline input. Enter to submit, Alt+Enter for newline."""
326 + lines, current = [], ""
327 + old_settings = termios.tcgetattr(sys.stdin)
328 + try:
329 + tty.setcbreak(sys.stdin.fileno())
330 + print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True)
331 + while True:
332 + ch = sys.stdin.read(1)
333 + if ch == '\x03': # Ctrl+C - clear input
334 + lines.clear()
335 + current = ""
336 + print("\r\033[K", end="", flush=True)
337 + print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True)
338 + continue
339 + if ch == '\x04': # Ctrl+D
340 + raise EOFError
341 + if ch == '\x1b' and sys.stdin.read(1) in ('\r', '\n'): # Alt+Enter
342 + lines.append(current)
343 + current = ""
344 + print(f"\n{BOLD}{BLUE}│{RESET} ", end="", flush=True)
345 + elif ch in ('\r', '\n'): # Enter
346 + if current: lines.append(current)
347 + break
348 + elif ch in ('\x7f', '\x08'): # Backspace
349 + if current:
350 + current = current[:-1]
351 + print("\r\033[K", end="", flush=True)
352 + print(f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} {current}", end="", flush=True)
353 + elif ch.isprintable() or ch == '\t':
354 + current += ch
355 + print(ch, end="", flush=True)
356 + print()
357 + finally:
358 + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
359 + return "\n".join(lines).strip()
360 +
361 + def main():
362 + global stop_flag
363 + # Disable Ctrl+C signal
364 + old_settings = termios.tcgetattr(sys.stdin)
365 + new_settings = termios.tcgetattr(sys.stdin)
366 + new_settings[3] = new_settings[3] & ~termios.ISIG # Disable signal generation
367 + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
368 +
369 + try:
370 + proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
371 + proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else ""
372 + thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else ""
373 + print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info}{RESET}")
374 + print(f"{DIM}Shortcuts: Enter=submit | Alt+Enter=newline | Ctrl+C=clear | Ctrl+D=exit | ESC=stop{RESET}\n")
375 + run_main_loop()
376 + finally:
377 + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
378 +
379 + def run_main_loop():
380 + messages = []
381 + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
382 + system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} Current time: {current_time}
383 + IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
384 + Examples:
385 + - User asks about GitHub repo → search_extension({{"query": "github documentation"}})
386 + - User needs web data → search_extension({{"query": "web scraping"}})
387 + - User needs API → search_extension({{"query": "api client"}})"""
388 +
389 + while True:
390 + try:
391 + print(f"{DIM}{'─'*80}{RESET}")
392 + user_input = read_multiline_input()
393 + print(f"{DIM}{'─'*80}{RESET}")
394 +
395 + if not user_input: continue
396 + if user_input in ("/q", "exit"): break
397 + if user_input == "/c":
398 + messages = []
399 + print(f"{GREEN}⏺ Cleared{RESET}")
400 + continue
401 +
402 + messages.append({"role": "user", "content": user_input})
403 +
404 + while True:
405 + stop_flag = False
406 + response = call_api(messages, system_prompt)
407 + blocks = process_stream(response)
408 + if stop_flag: break
409 +
410 + tool_results = []
411 + for block in blocks:
412 + if block["type"] == "tool_use":
413 + name, args = block["name"], block["input"]
414 + preview = str(list(args.values())[0])[:50] if args else ""
415 + print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})")
416 +
417 + result = run_tool(name, args)
418 + lines = result.split("\n")
419 + prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "")
420 + if len(lines) > 1: prev += f" +{len(lines)-1}"
421 + print(f" {DIM}⎿ {prev}{RESET}")
422 +
423 + tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})
424 +
425 + messages.append({"role": "assistant", "content": blocks})
426 + if not tool_results: break
427 + messages.append({"role": "user", "content": tool_results})
428 +
429 + print()
430 + except EOFError:
431 + break
432 + except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}")
433 +
434 + if __name__ == "__main__":
435 + main()
Newer Older