Last active 1 month ago

Revision a89522fd1a5005735bcd73d95e14689458594983

nano.py Raw
1#!/usr/bin/env python3
2"""nanocode - minimal claude code alternative"""
3import glob as globlib
4import hashlib
5import json
6import os
7import random
8import re
9import readline
10import select
11import ssl
12import subprocess
13import sys
14import termios
15import time
16import tty
17import unicodedata
18import urllib.request
19import urllib.parse
20from datetime import datetime
21
22OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
23LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY")
24API_URL = (
25 "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY
26 else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY
27 else "https://api.anthropic.com/v1/messages"
28)
29MODEL = os.environ.get("MODEL",
30 "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY
31 else "anthropic/claude-opus-4.5" if OPENROUTER_KEY
32 else "claude-opus-4-5"
33)
34
35# ANSI colors
36RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m"
37BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m"
38stop_flag = False
39
40def detect_clipboard_image():
41 """Detect and return image from clipboard using system commands
42
43 Returns:
44 dict or None: Image content block with metadata, or None if no image
45 """
46 try:
47 import base64
48 import tempfile
49
50 # Save clipboard image to temp file
51 temp_file = None
52 image_data = None
53
54 if sys.platform == 'darwin':
55 # macOS: use pngpaste or osascript
56 temp_file = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
57 temp_file.close()
58
59 # Try pngpaste first (faster)
60 result = subprocess.run(
61 ['pngpaste', temp_file.name],
62 capture_output=True, timeout=2
63 )
64
65 if result.returncode != 0:
66 # Fallback to osascript
67 script = f'''
68 set theFile to POSIX file "{temp_file.name}"
69 try
70 set theImage to the clipboard as «class PNGf»
71 set fileRef to open for access theFile with write permission
72 write theImage to fileRef
73 close access fileRef
74 return "success"
75 on error
76 return "error"
77 end try
78 '''
79 result = subprocess.run(
80 ['osascript', '-e', script],
81 capture_output=True, timeout=2, text=True
82 )
83 if result.stdout.strip() != 'success':
84 os.unlink(temp_file.name)
85 return None
86
87 # Read the file
88 with open(temp_file.name, 'rb') as f:
89 image_data = f.read()
90 os.unlink(temp_file.name)
91
92 elif sys.platform.startswith('linux'):
93 # Linux: use xclip
94 result = subprocess.run(
95 ['xclip', '-selection', 'clipboard', '-t', 'image/png', '-o'],
96 capture_output=True, timeout=2
97 )
98 if result.returncode == 0 and result.stdout:
99 image_data = result.stdout
100 else:
101 return None
102
103 elif sys.platform == 'win32':
104 # Windows: use PowerShell
105 temp_file = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
106 temp_file.close()
107
108 ps_script = f'''
109 Add-Type -AssemblyName System.Windows.Forms
110 $img = [System.Windows.Forms.Clipboard]::GetImage()
111 if ($img) {{
112 $img.Save("{temp_file.name}", [System.Drawing.Imaging.ImageFormat]::Png)
113 exit 0
114 }} else {{
115 exit 1
116 }}
117 '''
118 result = subprocess.run(
119 ['powershell', '-Command', ps_script],
120 capture_output=True, timeout=2
121 )
122
123 if result.returncode == 0:
124 with open(temp_file.name, 'rb') as f:
125 image_data = f.read()
126 os.unlink(temp_file.name)
127 else:
128 os.unlink(temp_file.name)
129 return None
130 else:
131 return None
132
133 if not image_data:
134 return None
135
136 # Get image size (read PNG header)
137 # PNG signature: 8 bytes, then IHDR chunk with width/height
138 if len(image_data) > 24 and image_data[:8] == b'\x89PNG\r\n\x1a\n':
139 width = int.from_bytes(image_data[16:20], 'big')
140 height = int.from_bytes(image_data[20:24], 'big')
141 size = (width, height)
142 else:
143 size = (0, 0)
144
145 # Check if resize needed (Claude API limit: 1568x1568)
146 max_size = 1568
147 needs_resize = size[0] > max_size or size[1] > max_size
148
149 if needs_resize:
150 # Need PIL for resize, skip for now
151 # User can manually resize before pasting
152 pass
153
154 # Encode to base64
155 base64_data = base64.b64encode(image_data).decode('utf-8')
156
157 return {
158 "type": "image",
159 "source": {
160 "type": "base64",
161 "media_type": "image/png",
162 "data": base64_data
163 },
164 "_metadata": {
165 "size": size,
166 "resized": None
167 }
168 }
169 except Exception:
170 return None
171
172def create_opener():
173 """Create URL opener with SSL and proxy support"""
174 proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
175 ssl_ctx = ssl.create_default_context()
176 ssl_ctx.check_hostname = False
177 ssl_ctx.verify_mode = ssl.CERT_NONE
178
179 handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)]
180 if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
181 return urllib.request.build_opener(*handlers)
182
183def register_tool(name, desc, params):
184 """Register a tool from extension code"""
185 def decorator(func):
186 TOOLS[name] = (desc, params, func)
187 return func
188 return decorator
189
190def search_extension(args):
191 """Search extensions from gist.kitchain.cn"""
192 query = args.get("query", "")
193 if not query: return "error: query required"
194 try:
195 # Split query into keywords
196 keywords = query.lower().split()
197 gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}}
198 opener = create_opener()
199
200 # Search each keyword as a topic
201 for keyword in keywords:
202 url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}"
203 html = opener.open(urllib.request.Request(url), timeout=10).read().decode()
204
205 # Extract gist URLs and titles
206 gist_matches = re.findall(
207 r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>',
208 html
209 )
210
211 for gist_path, title in gist_matches:
212 if gist_path not in gist_info:
213 # Extract description and topics for this gist
214 gist_section = re.search(
215 rf'{re.escape(gist_path)}.*?'
216 r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>',
217 html, re.DOTALL
218 )
219 desc = ""
220 topics = []
221 if gist_section:
222 desc = gist_section.group(1).strip()
223 topics_section = gist_section.group(2)
224 topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section)
225 topics = [t[1] for t in topics] # Extract topic names
226
227 gist_info[gist_path] = {
228 "hits": 0,
229 "title": title.strip(),
230 "desc": desc,
231 "topics": topics,
232 "filename": title.strip()
233 }
234 gist_info[gist_path]["hits"] += 1
235
236 if not gist_info: return f"No extensions found: {query}"
237
238 # Sort by hit count (descending)
239 sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10]
240
241 result = f"Found {len(sorted_gists)} extensions:\n\n"
242 for gist_path, info in sorted_gists:
243 result += f"{info['title']}\n"
244 if info['desc']:
245 result += f" {info['desc']}\n"
246 if info['topics']:
247 result += f" Topics: {', '.join(info['topics'])}\n"
248 result += f" Matched: {info['hits']} keyword(s)\n\n"
249
250 # Return first gist's load URL
251 first_gist = sorted_gists[0][0]
252 first_filename = sorted_gists[0][1]['filename']
253 result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})"
254 return result
255 except Exception as e:
256 return f"error: {e}"
257
258def load(args):
259 """Load extension from URL"""
260 url = args.get("url")
261 if not url: return "error: url required"
262 try:
263 opener = create_opener()
264 code = opener.open(urllib.request.Request(url), timeout=10).read().decode()
265 exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess})
266 new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","web_search","search_extension","load"]]
267 return f"Loaded. New tools: {', '.join(new)}"
268 except Exception as e:
269 return f"error: {e}"
270
271# --- Tools ---
272def read(args):
273 lines = open(args["path"]).readlines()
274 offset, limit = args.get("offset", 0), args.get("limit", len(lines))
275 return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit]))
276
277def write(args):
278 filepath = args["path"]
279 content = args["content"]
280 print(f"{DIM}[LOG] write: {filepath} ({len(content)} bytes){RESET}", flush=True)
281 open(filepath, "w").write(content)
282 print(f"{DIM}[LOG] write completed: {filepath}{RESET}", flush=True)
283 return "ok"
284
285def edit(args):
286 filepath = args["path"]
287 print(f"{DIM}[LOG] edit: {filepath}{RESET}", flush=True)
288 text = open(filepath).read()
289 print(f"{DIM}[LOG] edit read: {len(text)} bytes{RESET}", flush=True)
290 old, new = args["old"], args["new"]
291 if old not in text: return "error: old_string not found"
292 count = text.count(old)
293 if not args.get("all") and count > 1:
294 return f"error: old_string appears {count} times (use all=true)"
295 result = text.replace(old, new) if args.get("all") else text.replace(old, new, 1)
296 print(f"{DIM}[LOG] edit writing: {len(result)} bytes{RESET}", flush=True)
297 open(filepath, "w").write(result)
298 print(f"{DIM}[LOG] edit completed: {filepath}{RESET}", flush=True)
299 return "ok"
300
301def glob(args):
302 pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/")
303 files = sorted(globlib.glob(pattern, recursive=True),
304 key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True)
305 return "\n".join(files) or "none"
306
307def grep(args):
308 pattern, hits = re.compile(args["pat"]), []
309 for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True):
310 try:
311 for n, l in enumerate(open(fp), 1):
312 if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}")
313 except: pass
314 return "\n".join(hits[:50]) or "none"
315
316def bash(args):
317 global stop_flag
318 proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE,
319 stderr=subprocess.STDOUT, text=True)
320 lines = []
321 old_settings = termios.tcgetattr(sys.stdin)
322 try:
323 tty.setcbreak(sys.stdin.fileno())
324 if proc.stdout:
325 import fcntl
326 fd = proc.stdout.fileno()
327 fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
328
329 while True:
330 # Check ESC key
331 if select.select([sys.stdin], [], [], 0)[0]:
332 if sys.stdin.read(1) == '\x1b':
333 stop_flag = True
334 proc.kill()
335 lines.append("\n(stopped)")
336 print(f"\n{YELLOW}⏸ Stopped{RESET}")
337 break
338
339 # Read output
340 if select.select([proc.stdout], [], [], 0.1)[0]:
341 line = proc.stdout.readline()
342 if line:
343 print(f" {DIM}{line.rstrip()}{RESET}", flush=True)
344 lines.append(line)
345
346 # Check if done
347 if proc.poll() is not None:
348 remaining = proc.stdout.read()
349 if remaining:
350 for line in remaining.split('\n'):
351 if line:
352 print(f" {DIM}{line.rstrip()}{RESET}", flush=True)
353 lines.append(line + '\n')
354 break
355
356 if not stop_flag:
357 proc.wait(timeout=30)
358 except subprocess.TimeoutExpired:
359 proc.kill()
360 lines.append("\n(timeout)")
361 finally:
362 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
363
364 return "".join(lines).strip() or "(empty)"
365
366def web_search(args):
367 """Search web using DuckDuckGo"""
368 query, max_results = args["query"], args.get("max_results", 5)
369 try:
370 url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}"
371 headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
372 opener = create_opener()
373 html = opener.open(urllib.request.Request(url, headers=headers), timeout=30).read().decode()
374
375 # Extract titles and URLs
376 links = re.findall(r'class="result__a"[^>]+href="([^"]+)"[^>]*>([^<]+)<', html)
377 # Extract snippets
378 snippets = re.findall(r'class="result__snippet"[^>]*>([^<]*)<', html)
379 if not links: return "No results found"
380
381 results = []
382 for i, ((link, title), snippet) in enumerate(zip(links[:max_results], snippets[:max_results] + [""] * max_results), 1):
383 results.append(f"{i}. {title.strip()}\n URL: {link}\n {snippet.strip()}\n")
384 return "\n".join(results)
385 except Exception as e:
386 return f"error: {e}"
387
388
389TOOLS = {
390 "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read),
391 "write": ("Write content to file", {"path": "string", "content": "string"}, write),
392 "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit),
393 "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob),
394 "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep),
395 "bash": ("Run shell command", {"cmd": "string"}, bash),
396 "web_search": ("Search the web using DuckDuckGo", {"query": "string", "max_results": "number?"}, web_search),
397 "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension),
398 "load": ("Load extension from URL to add new tools", {"url": "string"}, load),
399}
400
401def run_tool(name, args):
402 try: return TOOLS[name][2](args)
403 except Exception as e: return f"error: {e}"
404
405def make_schema():
406 result = []
407 for name, (desc, params, _) in TOOLS.items():
408 props, req = {}, []
409 for pname, ptype in params.items():
410 opt = ptype.endswith("?")
411 props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")}
412 if not opt: req.append(pname)
413 result.append({"name": name, "description": desc,
414 "input_schema": {"type": "object", "properties": props, "required": req}})
415 return result
416
417def call_api(messages, system_prompt, stream=True, enable_thinking=True, use_tools=True):
418 headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
419 if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}"
420 elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
421 else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "")
422
423 data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt,
424 "messages": messages, "stream": stream}
425
426 if use_tools:
427 data["tools"] = make_schema()
428
429 if enable_thinking and os.environ.get("THINKING"):
430 data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))}
431
432 req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST")
433 return create_opener().open(req)
434
435def summarize_changes(user_input, files_modified, checkpoint_manager, checkpoint_id):
436 """Use LLM to summarize the changes made in this turn
437
438 Args:
439 user_input: User's request
440 files_modified: Set of modified file paths
441 checkpoint_manager: CheckpointManager instance
442 checkpoint_id: Checkpoint hash to get diff from
443
444 Returns:
445 str: One-line summary of changes
446 """
447 if not files_modified or not checkpoint_id:
448 return user_input[:50]
449
450 try:
451 # Get diff from git
452 diff_output = checkpoint_manager._git_command(
453 "--git-dir", checkpoint_manager.bare_repo,
454 "show", "--format=", checkpoint_id
455 )
456
457 # Check if diff is empty or error - no actual changes
458 if not diff_output or diff_output.startswith("error") or len(diff_output.strip()) == 0:
459 # No diff available, just use user input
460 return user_input[:50]
461
462 # Limit diff size to avoid token overflow (max ~3000 chars)
463 if len(diff_output) > 3000:
464 diff_output = diff_output[:3000] + "\n... (truncated)"
465
466 summary_prompt = f"""Based on the actual code changes (diff), generate a brief Chinese summary (max 30 Chinese characters).
467
468IMPORTANT: Must be based on the actual code changes, not the user's description.
469
470Code changes (diff):
471{diff_output}
472
473User description (for reference only): {user_input}
474
475Requirements:
4761. Describe what code/functionality was actually modified
4772. Reply in Chinese only, no explanation
4783. No quotes
4794. Max 30 Chinese characters
480
481Good examples:
482- 在 auth.py 添加 JWT 验证
483- 修复 parser.py 空指针异常
484- 重构 database.py 连接池
485- 更新 README 添加安装说明
486"""
487
488 messages = [{"role": "user", "content": summary_prompt}]
489 response = call_api(messages, "You are a code change analyzer, skilled at extracting key information from diffs. Reply in Chinese.",
490 stream=False, enable_thinking=False, use_tools=False)
491
492 # Parse non-streaming response
493 data = json.loads(response.read().decode())
494 blocks = data.get("content", [])
495
496 for block in blocks:
497 if block.get("type") == "text":
498 summary = block.get("text", "").strip()
499
500 # Remove thinking tags if present
501 if "<thinking>" in summary:
502 # Extract content after </thinking>
503 parts = summary.split("</thinking>")
504 if len(parts) > 1:
505 summary = parts[-1].strip()
506
507 # Clean up and limit length
508 summary = summary.replace('"', '').replace("'", "")
509 if summary and len(summary) <= 80:
510 return summary
511
512 # Fallback to user input
513 return user_input[:50]
514 except Exception as e:
515 # On error, fallback to user input
516 return user_input[:50]
517
518def process_stream(response):
519 """简化的流式处理,支持ESC中断"""
520 global stop_flag
521 blocks, current, text_buf, json_buf, think_buf = [], None, "", "", ""
522
523 # Save terminal settings
524 old_settings = termios.tcgetattr(sys.stdin)
525 try:
526 tty.setcbreak(sys.stdin.fileno())
527
528 for line in response:
529 if select.select([sys.stdin], [], [], 0)[0]:
530 ch = sys.stdin.read(1)
531 if ch == '\x1b': # ESC key
532 stop_flag = True
533 print(f"\n{YELLOW}⏸ Stopped{RESET}")
534 break
535
536 line = line.decode("utf-8").strip()
537 if not line.startswith("data: "): continue
538 if line == "data: [DONE]": continue
539
540 try:
541 data = json.loads(line[6:])
542 etype = data.get("type")
543
544 if etype == "content_block_start":
545 block = data.get("content_block", {})
546 current = {"type": block.get("type"), "id": block.get("id")}
547 if current["type"] == "text":
548 text_buf = ""
549 print(f"\n{CYAN}{RESET} ", end="", flush=True)
550 elif current["type"] == "thinking":
551 think_buf = ""
552 print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
553 elif current["type"] == "tool_use":
554 current["name"] = block.get("name")
555 json_buf = ""
556
557 elif etype == "content_block_delta":
558 delta = data.get("delta", {})
559 dtype = delta.get("type")
560 if dtype == "text_delta":
561 text = delta.get("text", "")
562 text_buf += text
563 print(text, end="", flush=True)
564 elif dtype == "thinking_delta":
565 text = delta.get("thinking", "")
566 think_buf += text
567 print(text, end="", flush=True)
568 elif dtype == "input_json_delta" and current:
569 json_buf += delta.get("partial_json", "")
570
571 elif etype == "content_block_stop" and current:
572 if current["type"] == "text":
573 current["text"] = text_buf
574 print()
575 elif current["type"] == "thinking":
576 print(RESET)
577 elif current["type"] == "tool_use":
578 try: current["input"] = json.loads(json_buf)
579 except: current["input"] = {}
580 blocks.append(current)
581 current = None
582 except: pass
583 finally:
584 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
585
586 return blocks
587
588def get_display_width(text):
589 """Get display width of text (CJK chars = 2, others = 1)"""
590 width = 0
591 for char in text:
592 if unicodedata.east_asian_width(char) in ('F', 'W'):
593 width += 2
594 else:
595 width += 1
596 return width
597
598def format_tool_preview(name, args):
599 """Format tool call preview based on tool type"""
600 if name == "read":
601 path = args.get("path", "")
602 offset = args.get("offset")
603 limit = args.get("limit")
604 if offset or limit:
605 return f"{path}, offset={offset or 0}, limit={limit or 'all'}"
606 return path
607
608 elif name == "write":
609 path = args.get("path", "")
610 content = args.get("content", "")
611 return f"{path}, {len(content)} bytes"
612
613 elif name == "edit":
614 path = args.get("path", "")
615 old = args.get("old", "")
616 new = args.get("new", "")
617 all_flag = args.get("all", False)
618 old_preview = old[:20] + "..." if len(old) > 20 else old
619 new_preview = new[:20] + "..." if len(new) > 20 else new
620 flag_str = ", all=true" if all_flag else ""
621 return f"{path}: '{old_preview}''{new_preview}'{flag_str}"
622
623 elif name == "bash":
624 return args.get("cmd", "")
625
626 elif name == "glob":
627 pattern = args.get("pat", "")
628 path = args.get("path", ".")
629 return f"{pattern} in {path}"
630
631 elif name == "grep":
632 pattern = args.get("pat", "")
633 path = args.get("path", ".")
634 return f"/{pattern}/ in {path}"
635
636 elif name == "web_search":
637 query = args.get("query", "")
638 max_results = args.get("max_results", 5)
639 return f"{query}, max={max_results}"
640
641 elif name == "search_extension":
642 return args.get("query", "")
643
644 elif name == "load":
645 url = args.get("url", "")
646 # Show filename from URL
647 filename = url.split("/")[-1] if url else ""
648 return filename or url
649
650 else:
651 # Fallback: show first value
652 return str(list(args.values())[0])[:50] if args else ""
653
654def format_tool_result(name, result):
655 """Format tool result preview based on tool type"""
656 if not result:
657 return "(empty)"
658
659 lines = result.split("\n")
660
661 # For simple results (ok, error, etc), show as-is
662 if len(lines) == 1 and len(result) < 80:
663 return result
664
665 # For multi-line results
666 first_line = lines[0][:60]
667 if len(lines[0]) > 60:
668 first_line += "..."
669
670 if len(lines) > 1:
671 return f"{first_line} +{len(lines)-1} lines"
672
673 return first_line
674
675def is_file_in_project(filepath, project_path):
676 """Check if file is within project directory"""
677 try:
678 abs_file = os.path.abspath(filepath)
679 abs_project = os.path.abspath(project_path)
680 # Check if file is under project directory
681 return abs_file.startswith(abs_project + os.sep) or abs_file == abs_project
682 except:
683 return False
684
685def read_multiline_input(prefill=""):
686 """Read multiline input. Enter to submit, Alt+Enter for newline.
687 Supports pasting images from clipboard.
688
689 Args:
690 prefill: Text to prefill in the input box
691
692 Returns:
693 tuple: (text: str, images: list[dict])
694 """
695 lines = []
696 current = prefill
697 cursor_pos = len(prefill) # Cursor at end of prefill text
698 images = [] # Store pasted images
699
700 # Enable bracketed paste mode
701 print("\033[?2004h", end="", flush=True)
702
703 old_settings = termios.tcgetattr(sys.stdin)
704 try:
705 tty.setcbreak(sys.stdin.fileno())
706 print(f"{BOLD}{BLUE}{RESET} {current}", end="", flush=True)
707
708 while True:
709 ch = sys.stdin.read(1)
710
711 if ch == '\x03': # Ctrl+C - clear input
712 lines.clear()
713 current = ""
714 cursor_pos = 0
715 print("\r\033[K", end="", flush=True)
716 print(f"{BOLD}{BLUE}{RESET} ", end="", flush=True)
717 continue
718
719 if ch == '\x05': # Ctrl+E
720 raise EOFError
721
722 if ch == '\x16': # Ctrl+V - paste image
723 img_block = detect_clipboard_image()
724
725 if img_block:
726 # Image detected!
727 images.append(img_block)
728 size = img_block.get('_metadata', {}).get('size', (0, 0))
729 resized = img_block.get('_metadata', {}).get('resized')
730
731 # Create image marker
732 if resized:
733 img_marker = f"[📷 Image {len(images)}: {size[0]}x{size[1]}{resized[0]}x{resized[1]}]"
734 else:
735 img_marker = f"[📷 Image {len(images)}: {size[0]}x{size[1]}]"
736
737 # Insert marker at cursor position
738 current = current[:cursor_pos] + img_marker + current[cursor_pos:]
739 cursor_pos += len(img_marker)
740
741 # Redraw current line
742 prefix = f"{BOLD}{BLUE}{'' if lines else ''}{RESET} "
743 print(f"\r\033[K{prefix}{current}", end="", flush=True)
744 else:
745 # No image in clipboard
746 print(f"\r{YELLOW}⚠ No image in clipboard{RESET}")
747 import time
748 time.sleep(1)
749 # Redraw current line
750 prefix = f"{BOLD}{BLUE}{'' if lines else ''}{RESET} "
751 print(f"\r\033[K{prefix}{current}", end="", flush=True)
752
753 continue
754
755 if ch == '\x1b': # Escape sequence
756 next_ch = sys.stdin.read(1)
757 if next_ch in ('\r', '\n'): # Alt+Enter
758 lines.append(current)
759 current = ""
760 cursor_pos = 0
761 print(f"\n{BOLD}{BLUE}{RESET} ", end="", flush=True)
762 elif next_ch == '[': # Escape sequence
763 seq = sys.stdin.read(1)
764 if seq == 'C': # Right arrow
765 if cursor_pos < len(current):
766 # Get display width of character at cursor
767 char_width = get_display_width(current[cursor_pos])
768 cursor_pos += 1
769 # Move cursor by actual display width
770 if char_width == 2:
771 print("\033[2C", end="", flush=True)
772 else:
773 print("\033[C", end="", flush=True)
774 elif seq == 'D': # Left arrow
775 if cursor_pos > 0:
776 cursor_pos -= 1
777 # Get display width of character before cursor
778 char_width = get_display_width(current[cursor_pos])
779 # Move cursor by actual display width
780 if char_width == 2:
781 print("\033[2D", end="", flush=True)
782 else:
783 print("\033[D", end="", flush=True)
784 elif seq == '2': # Bracketed paste start: ESC[200~
785 rest = sys.stdin.read(3) # Read "00~"
786 if rest == '00~':
787 # Read pasted content until ESC[201~
788 paste_buf = ""
789 while True:
790 c = sys.stdin.read(1)
791 if c == '\x1b':
792 # Check for [201~
793 peek = sys.stdin.read(5)
794 if peek == '[201~':
795 break
796 else:
797 paste_buf += c + peek
798 else:
799 paste_buf += c
800
801 # Process pasted content
802 paste_lines = paste_buf.split('\n')
803
804 if len(paste_lines) == 1:
805 # Single line paste
806 current = current[:cursor_pos] + paste_lines[0] + current[cursor_pos:]
807 cursor_pos += len(paste_lines[0])
808 prefix = f"{BOLD}{BLUE}{'' if lines else ''}{RESET} "
809 print(f"\r\033[K{prefix}{current}", end="", flush=True)
810 else:
811 # Multi-line paste
812 # First line appends to current
813 first_line = current[:cursor_pos] + paste_lines[0]
814 print(paste_lines[0], end="", flush=True)
815 if first_line:
816 lines.append(first_line)
817
818 # Middle lines
819 for line in paste_lines[1:-1]:
820 print(f"\n{BOLD}{BLUE}{RESET} {line}", end="", flush=True)
821 lines.append(line)
822
823 # Last line becomes new current
824 current = paste_lines[-1]
825 cursor_pos = len(current)
826 print(f"\n{BOLD}{BLUE}{RESET} {current}", end="", flush=True)
827 continue
828
829 if ch in ('\r', '\n'): # Enter - submit
830 if current:
831 lines.append(current)
832 print()
833 break
834
835 if ch in ('\x7f', '\x08'): # Backspace
836 if cursor_pos > 0:
837 # Delete character before cursor
838 current = current[:cursor_pos-1] + current[cursor_pos:]
839 cursor_pos -= 1
840 # Redraw current line
841 prefix = f"{BOLD}{BLUE}{'' if lines else ''}{RESET} "
842 print(f"\r\033[K{prefix}{current}", end="", flush=True)
843 # Move cursor back to position
844 if cursor_pos < len(current):
845 # Calculate display width from cursor to end
846 remaining_width = get_display_width(current[cursor_pos:])
847 print(f"\033[{remaining_width}D", end="", flush=True)
848 elif lines:
849 # Merge with previous line
850 prev_line = lines.pop()
851 cursor_pos = len(prev_line) # Cursor at end of previous line
852 current = prev_line + current
853 # Move up and redraw
854 print("\033[A\033[K", end="", flush=True)
855 prefix = f"{BOLD}{BLUE}{'' if lines else ''}{RESET} "
856 print(f"\r{prefix}{current}", end="", flush=True)
857 if cursor_pos < len(current):
858 # Calculate display width from cursor to end
859 remaining_width = get_display_width(current[cursor_pos:])
860 print(f"\033[{remaining_width}D", end="", flush=True)
861 continue
862
863 if ch.isprintable() or ch == '\t':
864 # Insert character at cursor position
865 current = current[:cursor_pos] + ch + current[cursor_pos:]
866 cursor_pos += 1
867 # Redraw from cursor position
868 print(f"{ch}{current[cursor_pos:]}", end="", flush=True)
869 # Move cursor back if needed
870 if cursor_pos < len(current):
871 # Calculate display width from cursor to end
872 remaining_width = get_display_width(current[cursor_pos:])
873 print(f"\033[{remaining_width}D", end="", flush=True)
874
875 finally:
876 # Disable bracketed paste mode
877 print("\033[?2004l", end="", flush=True)
878 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
879
880 text = "\n".join(lines).strip()
881 return text, images
882
883def main():
884 global stop_flag
885 # Parse command line arguments
886 continue_session = "-c" in sys.argv or "--continue" in sys.argv
887 list_sessions = "-l" in sys.argv or "--list" in sys.argv
888
889 # Disable Ctrl+C signal
890 old_settings = termios.tcgetattr(sys.stdin)
891 new_settings = termios.tcgetattr(sys.stdin)
892 new_settings[3] = new_settings[3] & ~termios.ISIG # Disable signal generation
893 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
894
895 try:
896 proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
897 proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else ""
898 thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else ""
899
900 if list_sessions:
901 session_mode = f" | {YELLOW}Select{RESET}"
902 elif continue_session:
903 session_mode = f" | {GREEN}Continue{RESET}"
904 else:
905 session_mode = f" | {CYAN}New{RESET}"
906
907 print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info}{session_mode}{RESET}")
908 print(f"{DIM}Shortcuts: Enter=submit | Alt+Enter=newline | Ctrl+V=paste image | Ctrl+C=clear | Ctrl+E=exit | ESC=stop{RESET}")
909 print(f"{DIM}Commands: /c [all|baseline|<id>] | /ca | /t | /clear{RESET}")
910 print(f"{DIM}Usage: nanocode (new) | nanocode -c (continue) | nanocode -l (select){RESET}\n")
911
912 selected_session_id = None
913 if list_sessions:
914 selected_session_id = select_session_interactive()
915 if not selected_session_id:
916 print(f"{DIM}Exiting...{RESET}")
917 return
918
919 run_main_loop(continue_session, selected_session_id)
920 finally:
921 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
922
923def select_session_interactive():
924 """Display sessions and let user select one
925
926 Returns:
927 session_id: Selected session ID, or None if cancelled
928 """
929 session_manager = SessionManager(os.getcwd())
930 sessions = session_manager.list_sessions()[:10] # Limit to 10 most recent
931
932 if not sessions:
933 print(f"{YELLOW}⚠ No previous sessions found{RESET}")
934 print(f"{DIM}Starting new session...{RESET}\n")
935 return None
936
937 print(f"{BOLD}📂 Recent Sessions:{RESET}\n")
938
939 for i, sess_info in enumerate(sessions, 1):
940 created = datetime.fromtimestamp(sess_info['metadata']['created_at']).strftime('%Y-%m-%d %H:%M')
941 last_active = datetime.fromtimestamp(sess_info['metadata']['last_active']).strftime('%Y-%m-%d %H:%M')
942 desc = sess_info['metadata'].get('description', '(no description)')
943
944 # Git info
945 git_commit = sess_info['metadata'].get('git_commit')
946 git_branch = sess_info['metadata'].get('git_branch')
947 git_dirty = sess_info['metadata'].get('git_dirty', False)
948
949 print(f"{CYAN}{i}.{RESET} {BOLD}{sess_info['session_id']}{RESET}")
950 print(f" {desc}")
951
952 git_info = ""
953 if git_commit and git_branch:
954 dirty_mark = f"{YELLOW}*{RESET}" if git_dirty else ""
955 git_info = f" | Git: {git_branch}@{git_commit}{dirty_mark}"
956
957 print(f" Created: {created} | Last: {last_active} | {sess_info['message_count']} messages{git_info}\n")
958
959 print(f"{DIM}Enter session number (1-{len(sessions)}), or press Enter for new session:{RESET}")
960
961 try:
962 choice = input(f"{BOLD}{BLUE}{RESET} ").strip()
963
964 if not choice:
965 # Empty input = new session
966 return None
967
968 try:
969 idx = int(choice) - 1
970 if 0 <= idx < len(sessions):
971 return sessions[idx]['session_id']
972 else:
973 print(f"{RED}✗ Invalid number{RESET}")
974 return None
975 except ValueError:
976 print(f"{RED}✗ Invalid input{RESET}")
977 return None
978 except (EOFError, KeyboardInterrupt):
979 return None
980
981
982def run_main_loop(continue_session=False, selected_session_id=None):
983 # Initialize session manager
984 session_manager = SessionManager(os.getcwd())
985
986 # Load or create session based on parameters
987 if selected_session_id:
988 # Load specific session selected by user
989 session = session_manager.load_session(selected_session_id)
990 if session:
991 git_info = ""
992 git_commit = session.metadata.get('git_commit')
993 git_branch = session.metadata.get('git_branch')
994 if git_commit and git_branch:
995 git_dirty = session.metadata.get('git_dirty', False)
996 dirty_mark = f"{YELLOW}*{RESET}" if git_dirty else ""
997 git_info = f" | Git: {git_branch}@{git_commit}{dirty_mark}"
998
999 print(f"{GREEN}✓ Loaded session: {session.session_id}{RESET}")
1000 print(f"{DIM} └─ {len(session.messages)} messages{git_info}{RESET}")
1001
1002 # Check for conflicts
1003 conflicts = session.detect_conflicts()
1004 if conflicts:
1005 print(f"\n{YELLOW}⚠ File conflicts detected:{RESET}")
1006 for filepath in conflicts[:5]:
1007 print(f" - {filepath}")
1008 if len(conflicts) > 5:
1009 print(f" ... and {len(conflicts)-5} more")
1010 print(f"\n{DIM}These files have been modified outside this session.{RESET}")
1011 confirm = input(f"{BOLD}Continue anyway? (y/N/u=update): {RESET}").strip().lower()
1012
1013 if confirm == 'u':
1014 session.update_file_states()
1015 session_manager.save_session()
1016 print(f"{GREEN}✓ Updated file states{RESET}\n")
1017 elif confirm != 'y':
1018 print(f"{DIM}Creating new session instead...{RESET}\n")
1019 session_manager.create_session()
1020 else:
1021 print()
1022 else:
1023 print()
1024 else:
1025 print(f"{RED}✗ Failed to load session{RESET}")
1026 print(f"{GREEN}✓ Creating new session instead{RESET}\n")
1027 session_manager.create_session()
1028 elif continue_session:
1029 # Continue last session
1030 last_session = session_manager.load_last_session()
1031 if last_session:
1032 git_info = ""
1033 git_commit = last_session.metadata.get('git_commit')
1034 git_branch = last_session.metadata.get('git_branch')
1035 if git_commit and git_branch:
1036 git_dirty = last_session.metadata.get('git_dirty', False)
1037 dirty_mark = f"{YELLOW}*{RESET}" if git_dirty else ""
1038 git_info = f" | Git: {git_branch}@{git_commit}{dirty_mark}"
1039
1040 print(f"{GREEN}✓ Continued session: {last_session.session_id}{RESET}")
1041 print(f"{DIM} └─ {len(last_session.messages)} messages{git_info}{RESET}")
1042
1043 # Check for conflicts
1044 conflicts = last_session.detect_conflicts()
1045 if conflicts:
1046 print(f"\n{YELLOW}⚠ File conflicts detected:{RESET}")
1047 for filepath in conflicts[:5]:
1048 print(f" - {filepath}")
1049 if len(conflicts) > 5:
1050 print(f" ... and {len(conflicts)-5} more")
1051 print(f"\n{DIM}These files have been modified outside this session.{RESET}")
1052 confirm = input(f"{BOLD}Continue anyway? (y/N/u=update): {RESET}").strip().lower()
1053
1054 if confirm == 'u':
1055 last_session.update_file_states()
1056 session_manager.save_session()
1057 print(f"{GREEN}✓ Updated file states{RESET}\n")
1058 elif confirm != 'y':
1059 print(f"{DIM}Creating new session instead...{RESET}\n")
1060 session_manager.create_session()
1061 else:
1062 print()
1063 else:
1064 print()
1065 else:
1066 # No previous session, create new one
1067 session_manager.create_session()
1068 print(f"{YELLOW}⚠ No previous session found{RESET}")
1069 print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}\n")
1070 else:
1071 # Always create new session by default
1072 # Try to detect parent from last session's latest checkpoint
1073 parent_checkpoint = None
1074 parent_session = None
1075
1076 last_session = session_manager.load_last_session()
1077 if last_session:
1078 # Get the latest checkpoint from last session
1079 checkpoints = session_manager.checkpoint_manager.list_checkpoints(show_all=False)
1080 if checkpoints:
1081 parent_checkpoint = checkpoints[0][0] # Latest checkpoint hash
1082 parent_session = last_session.session_id
1083
1084 session_manager.create_session(
1085 parent_checkpoint=parent_checkpoint,
1086 parent_session=parent_session
1087 )
1088
1089 git_info = ""
1090 git_commit = session_manager.current_session.metadata.get('git_commit')
1091 git_branch = session_manager.current_session.metadata.get('git_branch')
1092 if git_commit and git_branch:
1093 git_dirty = session_manager.current_session.metadata.get('git_dirty', False)
1094 dirty_mark = f"{YELLOW}*{RESET}" if git_dirty else ""
1095 git_info = f" | Git: {git_branch}@{git_commit}{dirty_mark}"
1096
1097 if parent_checkpoint:
1098 print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}")
1099 print(f"{DIM} └─ Branched from {parent_session[:8]}... @ {parent_checkpoint}{git_info}{RESET}\n")
1100 else:
1101 print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}")
1102 if git_info:
1103 print(f"{DIM} └─{git_info}{RESET}\n")
1104 else:
1105 print()
1106
1107 files_modified = set()
1108 auto_checkpoint = True
1109
1110 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
1111 system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} Current time: {current_time}
1112IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
1113Examples:
1114- User asks about GitHub repo → search_extension({{"query": "github documentation"}})
1115- User needs web data → search_extension({{"query": "web scraping"}})
1116- User needs API → search_extension({{"query": "api client"}})"""
1117
1118 prefill_next_input = "" # Store prefill text for next input
1119
1120 while True:
1121 try:
1122 print(f"{DIM}{''*80}{RESET}")
1123 user_input, images = read_multiline_input(prefill_next_input)
1124 prefill_next_input = "" # Clear after use
1125 print(f"{DIM}{''*80}{RESET}")
1126
1127 if not user_input and not images: continue
1128 if user_input in ("/q", "exit"):
1129 session_manager.save_session()
1130 break
1131
1132 # Handle /clear command first (before /c to avoid conflict)
1133 if user_input == "/clear":
1134 # Save current session
1135 session_manager.save_session()
1136
1137 # Get latest checkpoint from current session (if any)
1138 checkpoints = session_manager.checkpoint_manager.list_checkpoints(show_all=False)
1139 parent_checkpoint = checkpoints[0][0] if checkpoints else None
1140 parent_session = session_manager.current_session.session_id
1141
1142 # Create new session branched from current
1143 session_manager.create_session(
1144 parent_checkpoint=parent_checkpoint,
1145 parent_session=parent_session
1146 )
1147
1148 # Reset state
1149 files_modified.clear()
1150
1151 print(f"{GREEN}✓ Started new session: {session_manager.current_session.session_id}{RESET}")
1152 if parent_checkpoint:
1153 print(f"{DIM} └─ Branched from {parent_session[:8]}... @ {parent_checkpoint}{RESET}")
1154 continue
1155
1156 # Handle turns command
1157 if user_input.startswith("/t") or user_input.startswith("/turns"):
1158 parts = user_input.split()
1159
1160 # Parse turn number if provided
1161 turn_number = None
1162 if len(parts) >= 2 and parts[1].isdigit():
1163 turn_number = int(parts[1])
1164
1165 success, prefill_input = handle_turns_command(session_manager, turn_number)
1166 if success:
1167 # Session already modified in handle_turns_command, just save
1168 session_manager.save_session()
1169 # Set prefill for next input if provided
1170 if prefill_input:
1171 prefill_next_input = prefill_input
1172 continue
1173
1174 # Handle checkpoint commands
1175 if user_input.startswith("/checkpoint") or user_input.startswith("/c") or user_input == "/ca":
1176 parts = user_input.split()
1177
1178 # /ca is shortcut for /c all
1179 if parts[0] == "/ca":
1180 parts = ["/c", "all"]
1181
1182 # /c without args defaults to list
1183 if len(parts) == 1 and parts[0] in ["/c", "/checkpoint"]:
1184 parts.append("list")
1185
1186 restored_messages = handle_checkpoint_command(parts, session_manager, files_modified)
1187 if restored_messages is not None:
1188 # Restore conversation by replacing session messages
1189 session_manager.current_session.messages = restored_messages
1190 session_manager.save_session()
1191 continue
1192
1193 # Build message content
1194 if images:
1195 # Has images: use content array format
1196 content = []
1197 if user_input:
1198 content.append({"type": "text", "text": user_input})
1199
1200 for img_block in images:
1201 # Remove metadata (API doesn't need it)
1202 img_block_clean = {
1203 "type": "image",
1204 "source": img_block["source"]
1205 }
1206 content.append(img_block_clean)
1207
1208 message = {"role": "user", "content": content}
1209
1210 # Show image info
1211 print(f"{GREEN}📷 Attached {len(images)} image(s){RESET}")
1212 else:
1213 # Text only: keep original format
1214 message = {"role": "user", "content": user_input}
1215
1216 # Add user message to current session
1217 session_manager.current_session.messages.append(message)
1218
1219 # Reset stop flag for new turn
1220 stop_flag = False
1221
1222 # Track files modified in this turn
1223 files_modified_this_turn = set()
1224
1225 while True:
1226 response = call_api(session_manager.current_session.messages, system_prompt)
1227 blocks = process_stream(response)
1228 if stop_flag: break
1229
1230 tool_results = []
1231 for block in blocks:
1232 if block["type"] == "tool_use":
1233 name, args = block["name"], block["input"]
1234
1235 # Save baseline BEFORE executing write/edit
1236 if name in ['write', 'edit']:
1237 filepath = args.get('path')
1238 if filepath and is_file_in_project(filepath, session_manager.project_path):
1239 session_manager.save_baseline_if_needed(filepath)
1240
1241 # Format preview based on tool type
1242 preview = format_tool_preview(name, args)
1243 print(f"\n{GREEN}{name}{RESET}({DIM}{preview}{RESET})")
1244
1245 result = run_tool(name, args)
1246
1247 # Format result based on tool type
1248 result_preview = format_tool_result(name, result)
1249 print(f" {DIM}{result_preview}{RESET}")
1250
1251 # Track file modifications (only project files)
1252 if name in ['write', 'edit']:
1253 filepath = args.get('path')
1254 if filepath and is_file_in_project(filepath, session_manager.project_path):
1255 files_modified.add(filepath)
1256 files_modified_this_turn.add(filepath)
1257 session_manager.current_session.track_file_state(filepath)
1258
1259 tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})
1260
1261 # Check stop_flag after each tool execution
1262 if stop_flag:
1263 print(f"{YELLOW}⚠ Tool execution stopped{RESET}")
1264 break
1265
1266 session_manager.current_session.messages.append({"role": "assistant", "content": blocks})
1267 if not tool_results or stop_flag: break
1268 session_manager.current_session.messages.append({"role": "user", "content": tool_results})
1269
1270 # Auto checkpoint after AI work (if project files were modified)
1271 checkpoint_id = None
1272 if auto_checkpoint and files_modified_this_turn:
1273 # files_modified_this_turn already filtered to project files only
1274 # Use parent_commit for first checkpoint of new session
1275 parent_commit = session_manager.parent_commit_for_next_checkpoint
1276 checkpoint_id = session_manager.checkpoint_manager.create_checkpoint(
1277 f"Auto: {user_input[:50]}",
1278 list(files_modified_this_turn),
1279 conversation_snapshot=session_manager.current_session.messages.copy(),
1280 parent_commit=parent_commit
1281 )
1282 # Clear parent after first checkpoint
1283 if parent_commit:
1284 session_manager.parent_commit_for_next_checkpoint = None
1285
1286 if checkpoint_id:
1287 # Generate summary using LLM with actual diff
1288 print(f"{DIM}Generating checkpoint summary...{RESET}", end="", flush=True)
1289 summary = summarize_changes(
1290 user_input,
1291 files_modified_this_turn,
1292 session_manager.checkpoint_manager,
1293 checkpoint_id
1294 )
1295 print(f"\r{' ' * 40}\r", end="", flush=True) # Clear the line
1296
1297 # Update commit message with better summary (only if different from temp message)
1298 temp_message = f"Auto: {user_input[:50]}"
1299 if summary != user_input[:50] and summary != temp_message:
1300 session_manager.checkpoint_manager._git_command(
1301 "--git-dir", session_manager.checkpoint_manager.bare_repo,
1302 "commit", "--amend", "-m", summary
1303 )
1304
1305 print(f"\n{YELLOW}📍 {checkpoint_id}: {summary}{RESET}")
1306 else:
1307 # Checkpoint creation failed (e.g., no actual diff)
1308 print(f"\n{DIM}(No project file changes to checkpoint){RESET}")
1309
1310 # Record this turn
1311 session_manager.current_session.add_turn(
1312 user_input,
1313 files_modified_this_turn,
1314 checkpoint_id=checkpoint_id
1315 )
1316
1317 # Auto-save session after each interaction
1318 session_manager.save_session()
1319
1320 print()
1321 except EOFError:
1322 session_manager.save_session()
1323 break
1324 except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}")
1325
1326# ============================================================================
1327# Checkpoint & Session Management (Phase 1+2)
1328# ============================================================================
1329
1330class CheckpointManager:
1331 """Manage checkpoints using shadow bare git repository with session isolation"""
1332
1333 def __init__(self, project_path, session_id=None):
1334 self.project_path = project_path
1335 self.session_id = session_id
1336 self.nanocode_dir = os.path.join(project_path, ".nanocode")
1337 self.bare_repo = os.path.join(self.nanocode_dir, "checkpoint.git")
1338 self._init_bare_repo()
1339
1340 def set_session(self, session_id):
1341 """Set current session for checkpoint operations"""
1342 self.session_id = session_id
1343
1344 def _get_branch_name(self):
1345 """Get git branch name for current session"""
1346 if not self.session_id:
1347 return "main"
1348 return f"session_{self.session_id}"
1349
1350 def _init_bare_repo(self):
1351 """Initialize shadow bare repository"""
1352 if not os.path.exists(self.bare_repo):
1353 os.makedirs(self.bare_repo, exist_ok=True)
1354 try:
1355 subprocess.run(
1356 ["git", "init", "--bare", self.bare_repo],
1357 capture_output=True, check=True
1358 )
1359 except (subprocess.CalledProcessError, FileNotFoundError):
1360 # Git not available, will handle gracefully
1361 pass
1362
1363 def _git_command(self, *args, cwd=None):
1364 """Execute git command"""
1365 try:
1366 result = subprocess.run(
1367 ["git"] + list(args),
1368 cwd=cwd or self.project_path,
1369 capture_output=True,
1370 text=True,
1371 check=True
1372 )
1373 return result.stdout.strip()
1374 except (subprocess.CalledProcessError, FileNotFoundError) as e:
1375 return f"error: {e}"
1376
1377 def save_file_to_blob(self, filepath):
1378 """Save file to git blob storage
1379
1380 Returns:
1381 str: blob hash
1382 """
1383 try:
1384 result = subprocess.run(
1385 ["git", "--git-dir", self.bare_repo, "hash-object", "-w", filepath],
1386 capture_output=True, text=True, check=True
1387 )
1388 return result.stdout.strip()
1389 except Exception as e:
1390 return None
1391
1392 def restore_file_from_blob(self, blob_hash, filepath):
1393 """Restore file from git blob storage"""
1394 try:
1395 content = subprocess.run(
1396 ["git", "--git-dir", self.bare_repo, "cat-file", "-p", blob_hash],
1397 capture_output=True, check=True
1398 ).stdout
1399
1400 os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)
1401 with open(filepath, 'wb') as f:
1402 f.write(content)
1403 return True
1404 except Exception as e:
1405 return False
1406
1407 def get_file_git_info(self, filepath):
1408 """Get file's git info from project .git
1409
1410 Returns:
1411 dict: {"commit": "abc123", "has_changes": True/False} or None
1412 """
1413 try:
1414 # Check if file is tracked
1415 result = subprocess.run(
1416 ["git", "ls-files", "--error-unmatch", filepath],
1417 cwd=self.project_path,
1418 capture_output=True, text=True
1419 )
1420
1421 if result.returncode != 0:
1422 return None # Not tracked
1423
1424 # Get last commit for this file
1425 commit = subprocess.run(
1426 ["git", "log", "-1", "--format=%H", "--", filepath],
1427 cwd=self.project_path,
1428 capture_output=True, text=True, check=True
1429 ).stdout.strip()
1430
1431 # Check if file has local changes
1432 diff = subprocess.run(
1433 ["git", "diff", "HEAD", "--", filepath],
1434 cwd=self.project_path,
1435 capture_output=True, text=True, check=True
1436 ).stdout.strip()
1437
1438 return {
1439 "commit": commit,
1440 "has_changes": bool(diff)
1441 }
1442 except Exception as e:
1443 return None
1444
1445 def create_checkpoint(self, message, files_changed, conversation_snapshot=None, parent_commit=None):
1446 """Create a checkpoint on current session's branch
1447
1448 Args:
1449 message: Commit message
1450 files_changed: List of modified files
1451 conversation_snapshot: Conversation state to save
1452 parent_commit: Parent commit hash to branch from (for new sessions)
1453 """
1454 print(f"{DIM}[LOG] create_checkpoint: files_changed={files_changed}{RESET}", flush=True)
1455 if not files_changed or not self.session_id:
1456 return None
1457
1458 branch_name = self._get_branch_name()
1459
1460 # Save conversation snapshot
1461 if conversation_snapshot:
1462 snapshot_file = os.path.join(self.nanocode_dir, "conversation_snapshots.json")
1463 snapshots = {}
1464 if os.path.exists(snapshot_file):
1465 with open(snapshot_file, 'r') as f:
1466 snapshots = json.load(f)
1467
1468 # Create temp worktree for this session
1469 temp_worktree = os.path.join(self.nanocode_dir, f"temp_worktree_{self.session_id}")
1470
1471 try:
1472 # Check if branch exists
1473 branch_exists = self._git_command("--git-dir", self.bare_repo, "rev-parse", "--verify", branch_name)
1474
1475 if not branch_exists or branch_exists.startswith("error"):
1476 # Create new branch
1477 os.makedirs(temp_worktree, exist_ok=True)
1478 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "config", "core.bare", "false")
1479
1480 # If parent_commit specified, branch from it
1481 if parent_commit:
1482 # Create branch from parent commit
1483 self._git_command("--git-dir", self.bare_repo, "branch", branch_name, parent_commit)
1484 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f")
1485 else:
1486 # Create orphan branch (no parent)
1487 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", "--orphan", branch_name)
1488
1489 # Copy files to temp worktree
1490 for filepath in files_changed:
1491 print(f"{DIM}[LOG] checkpoint copying: {filepath}{RESET}", flush=True)
1492 if os.path.exists(filepath):
1493 file_size = os.path.getsize(filepath)
1494 print(f"{DIM}[LOG] source file exists: {filepath} ({file_size} bytes){RESET}", flush=True)
1495 # Convert absolute path to relative path
1496 if os.path.isabs(filepath):
1497 rel_filepath = os.path.relpath(filepath, self.project_path)
1498 else:
1499 rel_filepath = filepath
1500 dest = os.path.join(temp_worktree, rel_filepath)
1501 os.makedirs(os.path.dirname(dest), exist_ok=True)
1502 with open(filepath, 'rb') as src, open(dest, 'wb') as dst:
1503 content = src.read()
1504 dst.write(content)
1505 print(f"{DIM}[LOG] copied to temp_worktree: {dest} ({len(content)} bytes){RESET}", flush=True)
1506 else:
1507 print(f"{DIM}[LOG] source file NOT exists: {filepath}{RESET}", flush=True)
1508
1509 # Commit
1510 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "add", "-A")
1511 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree,
1512 "commit", "-m", message, "--allow-empty")
1513
1514 commit_hash = self._git_command("--git-dir", self.bare_repo, "rev-parse", "HEAD")
1515 checkpoint_id = commit_hash[:8] if commit_hash and not commit_hash.startswith("error") else None
1516
1517 # Save conversation snapshot with checkpoint_id
1518 if checkpoint_id and conversation_snapshot:
1519 snapshots[checkpoint_id] = conversation_snapshot
1520 with open(snapshot_file, 'w') as f:
1521 json.dump(snapshots, f, indent=2)
1522
1523 return checkpoint_id
1524 else:
1525 # Branch exists, checkout and commit
1526 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f")
1527
1528 # Update temp worktree
1529 for filepath in files_changed:
1530 print(f"{DIM}[LOG] checkpoint updating: {filepath}{RESET}", flush=True)
1531 if os.path.exists(filepath):
1532 file_size = os.path.getsize(filepath)
1533 print(f"{DIM}[LOG] source file exists: {filepath} ({file_size} bytes){RESET}", flush=True)
1534 # Convert absolute path to relative path
1535 if os.path.isabs(filepath):
1536 rel_filepath = os.path.relpath(filepath, self.project_path)
1537 else:
1538 rel_filepath = filepath
1539 dest = os.path.join(temp_worktree, rel_filepath)
1540 os.makedirs(os.path.dirname(dest), exist_ok=True)
1541 with open(filepath, 'rb') as src, open(dest, 'wb') as dst:
1542 content = src.read()
1543 dst.write(content)
1544 print(f"{DIM}[LOG] copied to temp_worktree: {dest} ({len(content)} bytes){RESET}", flush=True)
1545 else:
1546 print(f"{DIM}[LOG] source file NOT exists: {filepath}{RESET}", flush=True)
1547
1548 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "add", "-A")
1549 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree,
1550 "commit", "-m", message, "--allow-empty")
1551
1552 commit_hash = self._git_command("--git-dir", self.bare_repo, "rev-parse", "HEAD")
1553 checkpoint_id = commit_hash[:8] if commit_hash and not commit_hash.startswith("error") else None
1554
1555 # Save conversation snapshot with checkpoint_id
1556 if checkpoint_id and conversation_snapshot:
1557 snapshots[checkpoint_id] = conversation_snapshot
1558 with open(snapshot_file, 'w') as f:
1559 json.dump(snapshots, f, indent=2)
1560
1561 return checkpoint_id
1562 except Exception as e:
1563 return None
1564
1565 def list_checkpoints(self, limit=10, show_all=False):
1566 """List recent checkpoints for current session
1567
1568 Args:
1569 limit: Maximum number of checkpoints to show
1570 show_all: If True, show all sessions; if False, only show current session
1571 """
1572 if not self.session_id and not show_all:
1573 return []
1574
1575 try:
1576 if show_all:
1577 # Show all branches
1578 args = ["--git-dir", self.bare_repo, "log", f"--max-count={limit}", "--oneline", "--all"]
1579 else:
1580 # Show only current session's branch
1581 branch_name = self._get_branch_name()
1582 args = ["--git-dir", self.bare_repo, "log", f"--max-count={limit}", "--oneline", branch_name]
1583
1584 log = self._git_command(*args)
1585 if log and not log.startswith("error"):
1586 return [line.split(" ", 1) for line in log.split("\n") if line]
1587 return []
1588 except:
1589 return []
1590
1591 def restore_checkpoint(self, checkpoint_id, session_baseline_files):
1592 """Restore files to checkpoint state and reset current session's branch
1593
1594 This properly handles files that were added after the checkpoint by:
1595 1. Restoring files that exist in checkpoint
1596 2. Restoring files to baseline if they don't exist in checkpoint
1597
1598 Args:
1599 checkpoint_id: Checkpoint hash to restore to
1600 session_baseline_files: Dict of baseline file states from session
1601
1602 Returns:
1603 tuple: (success: bool, conversation_snapshot: dict or None)
1604 """
1605 if not self.session_id:
1606 return False, None
1607
1608 branch_name = self._get_branch_name()
1609 temp_worktree = os.path.join(self.nanocode_dir, f"temp_worktree_{self.session_id}")
1610
1611 try:
1612 # Checkout branch first
1613 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f")
1614
1615 # Reset branch to checkpoint (discards future commits on this branch)
1616 self._git_command("--git-dir", self.bare_repo, "reset", "--hard", checkpoint_id)
1617
1618 # Checkout to temp worktree
1619 self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree,
1620 "checkout", checkpoint_id, "-f")
1621
1622 # Get list of files in checkpoint
1623 files_in_checkpoint_str = self._git_command(
1624 "--git-dir", self.bare_repo,
1625 "ls-tree", "-r", "--name-only", checkpoint_id
1626 )
1627
1628 files_in_checkpoint = set()
1629 if files_in_checkpoint_str and not files_in_checkpoint_str.startswith("error"):
1630 files_in_checkpoint = set(f for f in files_in_checkpoint_str.split('\n') if f.strip())
1631
1632 print(f"\n{DIM}Files in checkpoint: {len(files_in_checkpoint)}{RESET}")
1633 print(f"{DIM}Files modified in session: {len(session_baseline_files)}{RESET}\n")
1634
1635 # Process each file that was modified in this session
1636 for filepath, baseline_source in session_baseline_files.items():
1637 # Convert to relative path for comparison
1638 if os.path.isabs(filepath):
1639 rel_filepath = os.path.relpath(filepath, self.project_path)
1640 else:
1641 rel_filepath = filepath
1642
1643 # Normalize path: remove leading ./
1644 normalized_rel_path = rel_filepath.lstrip('./')
1645
1646 if normalized_rel_path in files_in_checkpoint:
1647 # File exists in checkpoint - restore from checkpoint
1648 src = os.path.join(temp_worktree, normalized_rel_path)
1649 dest = os.path.join(self.project_path, normalized_rel_path)
1650
1651 dest_dir = os.path.dirname(dest)
1652 if dest_dir:
1653 os.makedirs(dest_dir, exist_ok=True)
1654
1655 with open(src, 'rb') as s, open(dest, 'wb') as d:
1656 d.write(s.read())
1657
1658 print(f" {GREEN}{RESET} {filepath} {DIM}(from checkpoint){RESET}")
1659 else:
1660 # File doesn't exist in checkpoint - restore to baseline
1661 abs_filepath = filepath if os.path.isabs(filepath) else os.path.join(self.project_path, filepath)
1662
1663 if baseline_source["type"] == "git":
1664 # Restore from project .git (use normalized path for git)
1665 result = subprocess.run(
1666 ["git", "checkout", baseline_source["commit"], "--", normalized_rel_path],
1667 cwd=self.project_path,
1668 capture_output=True, text=True
1669 )
1670 if result.returncode == 0:
1671 print(f" {CYAN}{RESET} {filepath} {DIM}(to baseline: git {baseline_source['commit'][:8]}){RESET}")
1672
1673 elif baseline_source["type"] == "blob":
1674 # Restore from blob
1675 if self.restore_file_from_blob(baseline_source["hash"], abs_filepath):
1676 print(f" {CYAN}{RESET} {filepath} {DIM}(to baseline: blob {baseline_source['hash'][:8]}){RESET}")
1677
1678 elif baseline_source["type"] == "new":
1679 # Delete new file
1680 if os.path.exists(abs_filepath):
1681 os.remove(abs_filepath)
1682 print(f" {YELLOW}{RESET} {filepath} {DIM}(deleted: was added after checkpoint){RESET}")
1683
1684 # Load conversation snapshot
1685 snapshot_file = os.path.join(self.nanocode_dir, "conversation_snapshots.json")
1686 conversation_snapshot = None
1687 if os.path.exists(snapshot_file):
1688 with open(snapshot_file, 'r') as f:
1689 snapshots = json.load(f)
1690 conversation_snapshot = snapshots.get(checkpoint_id)
1691
1692 return True, conversation_snapshot
1693 except Exception as e:
1694 print(f"{RED}Error during restore: {e}{RESET}")
1695 return False, None
1696
1697
1698class Session:
1699 """Represents a conversation session"""
1700
1701 def __init__(self, session_id=None):
1702 self.session_id = session_id or self._generate_session_id()
1703 self.messages = []
1704 self.file_states = {}
1705 self.baseline_files = {} # Track original file versions for rollback
1706 self.turns = [] # Track conversation turns
1707 self.metadata = {
1708 'created_at': time.time(),
1709 'last_active': time.time(),
1710 'description': '',
1711 'cwd': os.getcwd(),
1712 'parent_checkpoint': None, # Track where this session branched from
1713 'parent_session': None, # Track which session it branched from
1714 'git_commit': None, # Project .git commit hash when session started
1715 'git_branch': None, # Project .git branch when session started
1716 'git_dirty': False, # Whether project had uncommitted changes
1717 }
1718
1719 def _generate_session_id(self):
1720 """Generate unique session ID"""
1721 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
1722 random_suffix = ''.join(random.choices('0123456789abcdef', k=4))
1723 return f"{timestamp}_{random_suffix}"
1724
1725 def _get_project_git_info(self):
1726 """Get current project git state"""
1727 try:
1728 cwd = self.metadata.get('cwd', os.getcwd())
1729
1730 # Get current commit
1731 commit = subprocess.run(
1732 ["git", "rev-parse", "HEAD"],
1733 cwd=cwd,
1734 capture_output=True, text=True, check=True
1735 ).stdout.strip()
1736
1737 # Get current branch
1738 branch = subprocess.run(
1739 ["git", "rev-parse", "--abbrev-ref", "HEAD"],
1740 cwd=cwd,
1741 capture_output=True, text=True, check=True
1742 ).stdout.strip()
1743
1744 # Check if dirty
1745 status = subprocess.run(
1746 ["git", "status", "--porcelain"],
1747 cwd=cwd,
1748 capture_output=True, text=True, check=True
1749 ).stdout.strip()
1750
1751 return {
1752 'git_commit': commit[:8],
1753 'git_branch': branch,
1754 'git_dirty': bool(status)
1755 }
1756 except:
1757 return None
1758
1759 def capture_git_state(self):
1760 """Capture current project git state into metadata"""
1761 git_info = self._get_project_git_info()
1762 if git_info:
1763 self.metadata.update(git_info)
1764
1765 def track_file_state(self, filepath):
1766 """Track file state for conflict detection"""
1767 if os.path.exists(filepath):
1768 with open(filepath, 'rb') as f:
1769 content = f.read()
1770 file_hash = hashlib.md5(content).hexdigest()
1771 self.file_states[filepath] = {
1772 'hash': file_hash,
1773 'mtime': os.path.getmtime(filepath),
1774 'size': len(content)
1775 }
1776
1777 def detect_conflicts(self):
1778 """Detect if tracked files have been modified outside this session
1779
1780 Returns:
1781 list: List of conflicted file paths
1782 """
1783 conflicts = []
1784
1785 for filepath, saved_state in self.file_states.items():
1786 if os.path.exists(filepath):
1787 with open(filepath, 'rb') as f:
1788 content = f.read()
1789 current_hash = hashlib.md5(content).hexdigest()
1790 current_mtime = os.path.getmtime(filepath)
1791
1792 # Check if file has changed
1793 if (current_hash != saved_state['hash'] or
1794 current_mtime != saved_state['mtime']):
1795 conflicts.append(filepath)
1796 else:
1797 # File was deleted
1798 conflicts.append(f"{filepath} (deleted)")
1799
1800 return conflicts
1801
1802 def update_file_states(self):
1803 """Update all tracked file states to current state"""
1804 for filepath in list(self.file_states.keys()):
1805 if os.path.exists(filepath):
1806 self.track_file_state(filepath)
1807 else:
1808 # Remove deleted files from tracking
1809 del self.file_states[filepath]
1810
1811 def add_turn(self, user_input, files_modified, checkpoint_id=None):
1812 """Record a conversation turn
1813
1814 Args:
1815 user_input: User's input for this turn
1816 files_modified: Set of files modified in this turn
1817 checkpoint_id: Associated checkpoint ID if files were modified
1818 """
1819 turn_number = len(self.turns) + 1
1820 turn_data = {
1821 'turn_number': turn_number,
1822 'timestamp': time.time(),
1823 'user_input': user_input[:100], # Truncate for preview
1824 'files_modified': list(files_modified),
1825 'checkpoint_id': checkpoint_id,
1826 'message_count': len(self.messages) # Track message index
1827 }
1828 self.turns.append(turn_data)
1829 return turn_number
1830
1831 def to_dict(self):
1832 """Serialize to dict"""
1833 return {
1834 'session_id': self.session_id,
1835 'messages': self.messages,
1836 'file_states': self.file_states,
1837 'baseline_files': self.baseline_files,
1838 'turns': self.turns,
1839 'metadata': self.metadata
1840 }
1841
1842 @staticmethod
1843 def from_dict(data):
1844 """Deserialize from dict"""
1845 session = Session(session_id=data['session_id'])
1846 session.messages = data.get('messages', [])
1847 session.file_states = data.get('file_states', {})
1848 session.baseline_files = data.get('baseline_files', {})
1849 session.turns = data.get('turns', [])
1850 session.metadata = data.get('metadata', {})
1851 return session
1852
1853
1854class SessionManager:
1855 """Manage multiple sessions"""
1856
1857 def __init__(self, project_path):
1858 self.project_path = project_path
1859 self.sessions_dir = os.path.join(project_path, ".nanocode", "sessions")
1860 self.current_session = None
1861 self.checkpoint_manager = CheckpointManager(project_path)
1862 self.parent_commit_for_next_checkpoint = None # Track parent for first checkpoint
1863 os.makedirs(self.sessions_dir, exist_ok=True)
1864
1865 def save_baseline_if_needed(self, filepath):
1866 """Save file's baseline version before first modification
1867
1868 This is called before write/edit operations to preserve the original state.
1869 """
1870 if not self.current_session:
1871 return
1872
1873 # Already saved
1874 if filepath in self.current_session.baseline_files:
1875 return
1876
1877 print(f"{DIM}[LOG] Saving baseline for: {filepath}{RESET}", flush=True)
1878
1879 # Get file info from project .git
1880 git_info = self.checkpoint_manager.get_file_git_info(filepath)
1881
1882 if git_info:
1883 # File is tracked by project .git
1884 if git_info["has_changes"]:
1885 # Has local changes - save to blob
1886 blob_hash = self.checkpoint_manager.save_file_to_blob(filepath)
1887 if blob_hash:
1888 self.current_session.baseline_files[filepath] = {
1889 "type": "blob",
1890 "hash": blob_hash
1891 }
1892 print(f"{DIM}[LOG] Saved dirty file to blob: {blob_hash[:8]}{RESET}", flush=True)
1893 else:
1894 # Clean - just record commit
1895 self.current_session.baseline_files[filepath] = {
1896 "type": "git",
1897 "commit": git_info["commit"]
1898 }
1899 print(f"{DIM}[LOG] Recorded git commit: {git_info['commit'][:8]}{RESET}", flush=True)
1900 else:
1901 # Untracked file or no .git
1902 if os.path.exists(filepath):
1903 # Save existing untracked file to blob
1904 blob_hash = self.checkpoint_manager.save_file_to_blob(filepath)
1905 if blob_hash:
1906 self.current_session.baseline_files[filepath] = {
1907 "type": "blob",
1908 "hash": blob_hash
1909 }
1910 print(f"{DIM}[LOG] Saved untracked file to blob: {blob_hash[:8]}{RESET}", flush=True)
1911 else:
1912 # New file - mark as new
1913 self.current_session.baseline_files[filepath] = {
1914 "type": "new"
1915 }
1916 print(f"{DIM}[LOG] Marked as new file{RESET}", flush=True)
1917
1918 # Auto-save session to persist baseline_files
1919 self.save_session()
1920
1921 def restore_baseline(self):
1922 """Restore all files to their baseline state
1923
1924 Returns:
1925 bool: Success status
1926 """
1927 if not self.current_session:
1928 return False
1929
1930 if not self.current_session.baseline_files:
1931 # No files modified = already at baseline state = success
1932 print(f"{DIM}No files to restore (already at baseline){RESET}")
1933 return True
1934
1935 print(f"\n{BOLD}Restoring {len(self.current_session.baseline_files)} files to baseline...{RESET}\n")
1936
1937 success_count = 0
1938 for filepath, source in self.current_session.baseline_files.items():
1939 try:
1940 # Normalize to absolute path
1941 abs_filepath = filepath if os.path.isabs(filepath) else os.path.join(self.project_path, filepath)
1942 # Get relative path for git operations
1943 rel_filepath = os.path.relpath(abs_filepath, self.project_path)
1944
1945 if source["type"] == "git":
1946 # Restore from project .git (use relative path)
1947 result = subprocess.run(
1948 ["git", "checkout", source["commit"], "--", rel_filepath],
1949 cwd=self.project_path,
1950 capture_output=True, text=True
1951 )
1952 if result.returncode == 0:
1953 print(f" {GREEN}{RESET} {filepath} {DIM}(from git {source['commit'][:8]}){RESET}")
1954 success_count += 1
1955 else:
1956 print(f" {RED}{RESET} {filepath} {DIM}(git checkout failed){RESET}")
1957
1958 elif source["type"] == "blob":
1959 # Restore from checkpoint.git blob (use absolute path)
1960 if self.checkpoint_manager.restore_file_from_blob(source["hash"], abs_filepath):
1961 print(f" {GREEN}{RESET} {filepath} {DIM}(from blob {source['hash'][:8]}){RESET}")
1962 success_count += 1
1963 else:
1964 print(f" {RED}{RESET} {filepath} {DIM}(blob restore failed){RESET}")
1965
1966 elif source["type"] == "new":
1967 # Delete new file (use absolute path)
1968 if os.path.exists(abs_filepath):
1969 os.remove(abs_filepath)
1970 print(f" {GREEN}{RESET} {filepath} {DIM}(deleted new file){RESET}")
1971 success_count += 1
1972 else:
1973 print(f" {DIM}{RESET} {filepath} {DIM}(already deleted){RESET}")
1974 success_count += 1
1975
1976 except Exception as e:
1977 print(f" {RED}{RESET} {filepath} {DIM}(error: {e}){RESET}")
1978
1979 print(f"\n{GREEN}✓ Restored {success_count}/{len(self.current_session.baseline_files)} files{RESET}")
1980 return success_count > 0
1981
1982 def create_session(self, description="", parent_checkpoint=None, parent_session=None):
1983 """Create new session
1984
1985 Args:
1986 description: Session description
1987 parent_checkpoint: Checkpoint ID this session branches from
1988 parent_session: Session ID this session branches from
1989 """
1990 session = Session()
1991 session.metadata['description'] = description
1992 session.metadata['parent_checkpoint'] = parent_checkpoint
1993 session.metadata['parent_session'] = parent_session
1994 # Capture project git state
1995 session.capture_git_state()
1996 self.current_session = session
1997 # Set checkpoint manager to use this session
1998 self.checkpoint_manager.set_session(session.session_id)
1999 # Store parent commit for first checkpoint
2000 self.parent_commit_for_next_checkpoint = parent_checkpoint
2001 self.save_session()
2002 return session
2003
2004 def save_session(self):
2005 """Save current session to disk"""
2006 if not self.current_session:
2007 return
2008
2009 self.current_session.metadata['last_active'] = time.time()
2010 session_file = os.path.join(
2011 self.sessions_dir,
2012 f"{self.current_session.session_id}.json"
2013 )
2014
2015 with open(session_file, 'w') as f:
2016 json.dump(self.current_session.to_dict(), f, indent=2)
2017
2018 def load_session(self, session_id):
2019 """Load session from disk"""
2020 session_file = os.path.join(self.sessions_dir, f"{session_id}.json")
2021
2022 if not os.path.exists(session_file):
2023 return None
2024
2025 with open(session_file, 'r') as f:
2026 data = json.load(f)
2027
2028 session = Session.from_dict(data)
2029 self.current_session = session
2030 # Set checkpoint manager to use this session
2031 self.checkpoint_manager.set_session(session.session_id)
2032 return session
2033
2034 def list_sessions(self):
2035 """List all sessions"""
2036 sessions = []
2037
2038 if not os.path.exists(self.sessions_dir):
2039 return sessions
2040
2041 for filename in os.listdir(self.sessions_dir):
2042 if filename.endswith('.json'):
2043 filepath = os.path.join(self.sessions_dir, filename)
2044 try:
2045 with open(filepath, 'r') as f:
2046 data = json.load(f)
2047 sessions.append({
2048 'session_id': data['session_id'],
2049 'metadata': data['metadata'],
2050 'message_count': len(data.get('messages', [])),
2051 })
2052 except:
2053 pass
2054
2055 return sorted(sessions, key=lambda x: x['metadata'].get('last_active', 0), reverse=True)
2056
2057 def load_last_session(self):
2058 """Load the most recent session"""
2059 sessions = self.list_sessions()
2060 if sessions:
2061 return self.load_session(sessions[0]['session_id'])
2062 return None
2063
2064
2065
2066
2067def handle_turns_command(session_manager, turn_number=None):
2068 """Handle /t or /turns command to display or restore conversation turns
2069
2070 Args:
2071 session_manager: SessionManager instance
2072 turn_number: If provided, restore to this turn; otherwise list turns
2073
2074 Returns:
2075 tuple: (success: bool, prefill_input: str or None)
2076 """
2077 if not session_manager.current_session:
2078 print(f"{YELLOW}⚠ No active session{RESET}")
2079 return False, None
2080
2081 turns = session_manager.current_session.turns
2082
2083 if not turns:
2084 print(f"{DIM}No conversation turns yet{RESET}")
2085 return False, None
2086
2087 # If turn_number provided, restore to that turn
2088 if turn_number is not None:
2089 return restore_to_turn(session_manager, turn_number)
2090
2091 # Otherwise, list all turns
2092 print(f"\n{BOLD}💬 Conversation Turns:{RESET}\n")
2093
2094 for turn in turns:
2095 turn_num = turn['turn_number']
2096 timestamp = turn['timestamp']
2097 user_input = turn['user_input']
2098 files_modified = turn.get('files_modified', [])
2099 checkpoint_id = turn.get('checkpoint_id')
2100
2101 # Calculate time ago
2102 time_ago = time.time() - timestamp
2103 if time_ago < 60:
2104 time_str = f"{int(time_ago)}s ago"
2105 elif time_ago < 3600:
2106 time_str = f"{int(time_ago/60)}m ago"
2107 elif time_ago < 86400:
2108 time_str = f"{int(time_ago/3600)}h ago"
2109 else:
2110 time_str = f"{int(time_ago/86400)}d ago"
2111
2112 # Format turn line
2113 checkpoint_marker = f" {YELLOW}[{checkpoint_id}]{RESET}" if checkpoint_id else ""
2114 files_marker = f" {GREEN}{RESET}" if files_modified else ""
2115
2116 print(f" {CYAN}turn_{turn_num}{RESET}{checkpoint_marker}{files_marker} {DIM}({time_str}){RESET}")
2117 print(f" {DIM}└─{RESET} {user_input}")
2118
2119 if files_modified:
2120 files_display = ", ".join(files_modified[:3])
2121 if len(files_modified) > 3:
2122 files_display += f" +{len(files_modified)-3} more"
2123 print(f" {DIM}Files: {files_display}{RESET}")
2124
2125 print()
2126
2127 print(f"{DIM}Tip: Use '/t <number>' to undo that turn and restart from previous{RESET}")
2128 return False, None # Return False to indicate no restore action
2129
2130
2131def restore_to_turn(session_manager, turn_number):
2132 """Restore files and conversation to a specific turn
2133
2134 Args:
2135 session_manager: SessionManager instance
2136 turn_number: Turn number to undo (1-indexed, will restore to turn_number-1)
2137
2138 Returns:
2139 tuple: (success: bool, prefill_input: str or None)
2140 """
2141 turns = session_manager.current_session.turns
2142
2143 # User wants to undo turn_N, so restore to turn_(N-1)
2144 # Special case: /t 1 means clear all turns
2145 if turn_number == 1:
2146 print(f"{YELLOW}⚠ This will undo turn_1 and reset to session start{RESET}")
2147 confirm = input(f"\n{BOLD}Continue? (y/N): {RESET}").strip().lower()
2148 if confirm != 'y':
2149 print(f"{DIM}Cancelled{RESET}")
2150 return False, None
2151
2152 # Restore to baseline
2153 success = session_manager.restore_baseline()
2154 if not success:
2155 return False, None
2156
2157 # Clear all turns and messages
2158 session_manager.current_session.turns = []
2159 session_manager.current_session.messages = []
2160 session_manager.current_session.update_file_states()
2161 session_manager.parent_commit_for_next_checkpoint = None
2162
2163 print(f"{GREEN}✓ Reset to session start{RESET}")
2164
2165 # Return original turn_1 input for prefill
2166 original_input = turns[0]['user_input'] if turns else None
2167 return True, original_input
2168
2169 # Normal case: restore to turn_(N-1)
2170 restore_to = turn_number - 1
2171
2172 # Validate
2173 if restore_to < 1 or restore_to > len(turns):
2174 print(f"{RED}Invalid turn number. Valid range: 1-{len(turns)}{RESET}")
2175 return False, None
2176
2177 turn = turns[restore_to - 1]
2178
2179 # Find the most recent checkpoint at or before restore_to
2180 checkpoint_id = None
2181 checkpoint_turn = None
2182
2183 for i in range(restore_to - 1, -1, -1):
2184 if turns[i].get('checkpoint_id'):
2185 checkpoint_id = turns[i]['checkpoint_id']
2186 checkpoint_turn = i + 1
2187 break
2188
2189 # Get original input from the turn being undone
2190 # Note: turn['user_input'] is truncated to 100 chars, so get full input from messages
2191 original_input = None
2192 if turn_number <= len(turns):
2193 turn_to_undo = turns[turn_number - 1]
2194 # Find the user message for this turn
2195 # turn_to_undo['message_count'] is the total messages after this turn
2196 # The user message for this turn is at index (message_count - 2) if there's an assistant response
2197 # or (message_count - 1) if it's the last message
2198 msg_idx = turn_to_undo['message_count'] - 2 # Assume there's an assistant response
2199 if msg_idx >= 0 and msg_idx < len(session_manager.current_session.messages):
2200 msg = session_manager.current_session.messages[msg_idx]
2201 if msg['role'] == 'user' and isinstance(msg['content'], str):
2202 original_input = msg['content']
2203
2204 # Show warning
2205 future_turns = len(turns) - restore_to
2206 print(f"\n{YELLOW}⚠ This will undo turn_{turn_number} (restore to turn_{restore_to}){RESET}")
2207
2208 if checkpoint_id:
2209 if checkpoint_turn == restore_to:
2210 print(f"{YELLOW}⚠ Files: restored to turn_{restore_to} checkpoint ({checkpoint_id}){RESET}")
2211 else:
2212 print(f"{YELLOW}⚠ Files: restored to turn_{checkpoint_turn} checkpoint ({checkpoint_id}){RESET}")
2213 else:
2214 print(f"{YELLOW}⚠ Files: restored to baseline (no checkpoints before turn_{restore_to}){RESET}")
2215
2216 print(f"{YELLOW}⚠ Conversation: restored to turn_{restore_to} ({turn['message_count']} messages){RESET}")
2217
2218 if future_turns > 0:
2219 print(f"{YELLOW}⚠ Future turns ({restore_to + 1}-{len(turns)}) will be discarded{RESET}")
2220
2221 if original_input:
2222 print(f"\n{CYAN}Original turn_{turn_number} input will be prefilled:{RESET}")
2223 print(f" {DIM}{original_input[:100]}{'...' if len(original_input) > 100 else ''}{RESET}")
2224
2225 confirm = input(f"\n{BOLD}Continue? (y/N): {RESET}").strip().lower()
2226
2227 if confirm != 'y':
2228 print(f"{DIM}Cancelled{RESET}")
2229 return False, None
2230
2231 # Restore files
2232 if checkpoint_id:
2233 success, _ = session_manager.checkpoint_manager.restore_checkpoint(
2234 checkpoint_id,
2235 session_manager.current_session.baseline_files
2236 )
2237 if not success:
2238 print(f"{RED}✗ Failed to restore files{RESET}")
2239 return False, None
2240 print(f"{GREEN}✓ Restored files to checkpoint {checkpoint_id}{RESET}")
2241 else:
2242 # No checkpoint before this turn, restore to baseline
2243 success = session_manager.restore_baseline()
2244 if not success:
2245 print(f"{RED}✗ Failed to restore to baseline{RESET}")
2246 return False, None
2247 print(f"{GREEN}✓ Restored files to baseline{RESET}")
2248
2249 # Restore conversation by truncating messages (modify session directly)
2250 session_manager.current_session.messages = session_manager.current_session.messages[:turn['message_count']]
2251
2252 # Truncate turns list to restore_to
2253 session_manager.current_session.turns = turns[:restore_to]
2254
2255 # Update file states to match restored files
2256 session_manager.current_session.update_file_states()
2257
2258 # Reset parent commit for next checkpoint
2259 session_manager.parent_commit_for_next_checkpoint = checkpoint_id if checkpoint_id else None
2260
2261 print(f"{GREEN}✓ Restored to turn_{restore_to} ({len(session_manager.current_session.messages)} messages){RESET}")
2262
2263 return True, original_input # Return success and prefill input
2264
2265
2266def handle_checkpoint_command(parts, session_manager, files_modified):
2267 """Handle /checkpoint or /c commands
2268
2269 Returns:
2270 messages: New messages list if conversation was restored, None otherwise
2271 """
2272 # Default to list if no subcommand
2273 if len(parts) < 2:
2274 parts.append("list")
2275
2276 cmd = parts[1]
2277
2278 # If cmd looks like a commit hash (7-8 hex chars), treat as restore
2279 if len(cmd) >= 7 and len(cmd) <= 8 and all(c in '0123456789abcdef' for c in cmd.lower()):
2280 cmd = "restore"
2281 checkpoint_id = parts[1]
2282 else:
2283 checkpoint_id = None
2284
2285 if cmd == "baseline" or cmd == "base":
2286 # Restore all files to baseline (session start state)
2287 if not session_manager.current_session.baseline_files:
2288 print(f"{YELLOW}⚠ No baseline: no files have been modified in this session{RESET}")
2289 return None
2290
2291 print(f"{YELLOW}⚠ This will restore all modified files to their original state{RESET}")
2292 print(f"{YELLOW}⚠ Files to restore: {len(session_manager.current_session.baseline_files)}{RESET}")
2293
2294 # Show files
2295 print(f"\n{DIM}Files:{RESET}")
2296 for filepath in list(session_manager.current_session.baseline_files.keys())[:10]:
2297 print(f" {DIM}{filepath}{RESET}")
2298 if len(session_manager.current_session.baseline_files) > 10:
2299 print(f" {DIM}... and {len(session_manager.current_session.baseline_files) - 10} more{RESET}")
2300
2301 confirm = input(f"\n{BOLD}Continue? (y/N): {RESET}").strip().lower()
2302
2303 if confirm != 'y':
2304 print(f"{DIM}Cancelled{RESET}")
2305 return None
2306
2307 success = session_manager.restore_baseline()
2308 if success:
2309 # Clear conversation
2310 print(f"{GREEN}✓ Conversation cleared{RESET}")
2311 return []
2312 else:
2313 return None
2314
2315 elif cmd == "list" or cmd == "all" or cmd == "--all":
2316 show_all = (cmd == "all" or "--all" in parts)
2317
2318 if show_all:
2319 # Show git graph of all branches
2320 print(f"\n{BOLD}📍 Checkpoint Graph:{RESET}\n")
2321
2322 # Use git log --graph --all to show the tree
2323 # Format: %h = short hash, %d = ref names, %s = subject, %ar = relative date
2324 graph_output = session_manager.checkpoint_manager._git_command(
2325 "--git-dir", session_manager.checkpoint_manager.bare_repo,
2326 "log", "--graph", "--all", "--oneline",
2327 "--format=%h %s (%ar)", "-20"
2328 )
2329
2330 if graph_output and not graph_output.startswith("error"):
2331 # Also get branch info for each commit
2332 branches_output = session_manager.checkpoint_manager._git_command(
2333 "--git-dir", session_manager.checkpoint_manager.bare_repo,
2334 "branch", "-a", "--contains"
2335 )
2336
2337 # Parse and display
2338 for line in graph_output.split('\n'):
2339 if not line.strip():
2340 continue
2341
2342 # Extract commit hash
2343 match = re.search(r'\b([0-9a-f]{7,8})\b', line)
2344 if match:
2345 commit_hash = match.group(1)
2346
2347 # Get branches containing this commit
2348 branch_info = session_manager.checkpoint_manager._git_command(
2349 "--git-dir", session_manager.checkpoint_manager.bare_repo,
2350 "branch", "-a", "--contains", commit_hash
2351 )
2352
2353 # Extract session names from branches
2354 session_names = []
2355 if branch_info and not branch_info.startswith("error"):
2356 for branch_line in branch_info.split('\n'):
2357 branch_line = branch_line.strip().lstrip('* ')
2358 if branch_line.startswith('session_'):
2359 # Shorten session name: session_20260130_103323_f7 -> s:20260130_103323_f7
2360 session_short = 's:' + branch_line[8:] # Remove 'session_' prefix
2361 session_names.append(session_short)
2362
2363 # Highlight commit hash
2364 line = line.replace(commit_hash, f"{CYAN}{commit_hash}{RESET}")
2365
2366 # Add session info if found
2367 if session_names:
2368 # Insert session names after commit hash
2369 session_str = f"{GREEN}[{', '.join(session_names[:2])}]{RESET}"
2370 line = line.replace(commit_hash + f"{RESET}", commit_hash + f"{RESET} {session_str}")
2371
2372
2373 print(f" {line}")
2374 print()
2375 else:
2376 print(f"{DIM}No checkpoints yet{RESET}\n")
2377
2378 print(f"{DIM}Restore: /c <hash>{RESET}")
2379 return None
2380 else:
2381 # Show current session's checkpoints
2382 checkpoints = session_manager.checkpoint_manager.list_checkpoints(show_all=False)
2383 if not checkpoints:
2384 print(f"{DIM}No checkpoints yet{RESET}")
2385 return None
2386
2387 print(f"\n{BOLD}📍 Checkpoints:{RESET}\n")
2388
2389 # Get checkpoint details from git log with timestamp
2390 for commit_hash, message in checkpoints[:10]: # Show first 10 (already newest first from git log)
2391 # Try to get timestamp from git
2392 timestamp_str = session_manager.checkpoint_manager._git_command(
2393 "--git-dir", session_manager.checkpoint_manager.bare_repo,
2394 "log", "-1", "--format=%ar", commit_hash
2395 )
2396 if timestamp_str.startswith("error"):
2397 timestamp_str = ""
2398
2399 # Get modified files
2400 files_str = session_manager.checkpoint_manager._git_command(
2401 "--git-dir", session_manager.checkpoint_manager.bare_repo,
2402 "diff-tree", "--no-commit-id", "--name-only", "-r", commit_hash
2403 )
2404 files = []
2405 if files_str and not files_str.startswith("error"):
2406 files = [f.strip() for f in files_str.split('\n') if f.strip()]
2407
2408 # Format: hash | time ago | message
2409 time_part = f"{DIM}{timestamp_str}{RESET}" if timestamp_str else ""
2410 print(f" {CYAN}{commit_hash}{RESET} {time_part}")
2411 print(f" {DIM}└─{RESET} {message}")
2412 if files:
2413 files_display = ", ".join(files[:3])
2414 if len(files) > 3:
2415 files_display += f" +{len(files)-3} more"
2416 print(f" {DIM} Files: {files_display}{RESET}")
2417 print()
2418
2419 print(f"{DIM}Tip: Use '/c all' or '/ca' to see git graph{RESET}")
2420 print(f"{DIM}Restore: /c <hash>{RESET}")
2421 return None
2422
2423 elif cmd == "restore":
2424 if not checkpoint_id:
2425 print(f"{RED}Usage: /c <checkpoint_id>{RESET}")
2426 return None
2427
2428 print(f"{YELLOW}⚠ This will restore files AND conversation to checkpoint {checkpoint_id}{RESET}")
2429 print(f"{YELLOW}⚠ Future checkpoints will be discarded from history{RESET}")
2430 confirm = input(f"{BOLD}Continue? (y/N): {RESET}").strip().lower()
2431
2432 if confirm != 'y':
2433 print(f"{DIM}Cancelled{RESET}")
2434 return None
2435
2436 success, conversation_snapshot = session_manager.checkpoint_manager.restore_checkpoint(
2437 checkpoint_id,
2438 session_manager.current_session.baseline_files
2439 )
2440 if success:
2441 print(f"{GREEN}✓ Restored files to checkpoint {checkpoint_id}{RESET}")
2442 if conversation_snapshot:
2443 print(f"{GREEN}✓ Restored conversation ({len(conversation_snapshot)} messages){RESET}")
2444 return conversation_snapshot
2445 else:
2446 print(f"{YELLOW}⚠ No conversation snapshot found for this checkpoint{RESET}")
2447 return None
2448 else:
2449 print(f"{RED}✗ Failed to restore checkpoint{RESET}")
2450 return None
2451
2452 else:
2453 print(f"{RED}Unknown command: {cmd}{RESET}")
2454 return None
2455
2456
2457if __name__ == "__main__":
2458 main()
2459