# nano.py · 70 KiB · Python — gist page header captured during export, kept as a comment so the file stays runnable
#!/usr/bin/env python3
"""nanocode - minimal claude code alternative"""
import glob as globlib
import hashlib
import json
import os
import random
import re
import readline
import select
import ssl
import subprocess
import sys
import termios
import time
import tty
import urllib.request
import urllib.parse
from datetime import datetime
# --- API configuration (resolved from the environment at import time) ---
OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY")
# Endpoint priority: local proxy > OpenRouter > Anthropic direct.
API_URL = (
    "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY
    else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY
    else "https://api.anthropic.com/v1/messages"
)
# Default model follows the same key priority; override with the MODEL env var.
MODEL = os.environ.get("MODEL",
    "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY
    else "anthropic/claude-opus-4.5" if OPENROUTER_KEY
    else "claude-opus-4-5"
)
# ANSI colors
RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m"
BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m"
# Set to True when the user presses ESC to abort streaming / tool execution.
stop_flag = False
def create_opener():
    """Create URL opener with SSL and proxy support"""
    proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
    ssl_ctx = ssl.create_default_context()
    # SECURITY NOTE(review): certificate verification is disabled below,
    # presumably to work behind intercepting proxies — this permits MITM on
    # every HTTPS request (including API-key traffic); confirm it is intended.
    ssl_ctx.check_hostname = False
    ssl_ctx.verify_mode = ssl.CERT_NONE
    handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)]
    # Proxy handler must precede the HTTPS handler to take effect.
    if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
    return urllib.request.build_opener(*handlers)
def register_tool(name, desc, params):
    """Decorator factory used by extension code to add a tool to TOOLS.

    `name` is the tool name, `desc` its description shown to the model, and
    `params` its parameter schema; the decorated function becomes the
    implementation and is returned unchanged.
    """
    def _wrap(fn):
        TOOLS[name] = (desc, params, fn)
        return fn
    return _wrap
def search_extension(args):
    """Search extensions from gist.kitchain.cn

    Queries one topic page per whitespace-separated keyword, scrapes gist
    links out of the HTML, ranks gists by how many keywords matched, and
    returns a formatted listing plus a ready-to-use load() call for the top
    result.  Returns an "error: ..." string on any failure.
    """
    query = args.get("query", "")
    if not query: return "error: query required"
    try:
        # Split query into keywords
        keywords = query.lower().split()
        gist_info = {}  # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}}
        opener = create_opener()
        # Search each keyword as a topic
        for keyword in keywords:
            url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}"
            html = opener.open(urllib.request.Request(url), timeout=10).read().decode()
            # Extract gist URLs and titles
            gist_matches = re.findall(
                r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>',
                html
            )
            for gist_path, title in gist_matches:
                if gist_path not in gist_info:
                    # Extract description and topics for this gist
                    gist_section = re.search(
                        rf'{re.escape(gist_path)}.*?'
                        r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>',
                        html, re.DOTALL
                    )
                    desc = ""
                    topics = []
                    if gist_section:
                        desc = gist_section.group(1).strip()
                        topics_section = gist_section.group(2)
                        topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section)
                        topics = [t[1] for t in topics]  # Extract topic names
                    gist_info[gist_path] = {
                        "hits": 0,
                        "title": title.strip(),
                        "desc": desc,
                        "topics": topics,
                        # NOTE(review): filename is assumed to equal the title — verify against the gist site layout.
                        "filename": title.strip()
                    }
                # One hit per keyword page the gist appears on.
                gist_info[gist_path]["hits"] += 1
        if not gist_info: return f"No extensions found: {query}"
        # Sort by hit count (descending)
        sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10]
        result = f"Found {len(sorted_gists)} extensions:\n\n"
        for gist_path, info in sorted_gists:
            result += f"• {info['title']}\n"
            if info['desc']:
                result += f"  {info['desc']}\n"
            if info['topics']:
                result += f"  Topics: {', '.join(info['topics'])}\n"
            result += f"  Matched: {info['hits']} keyword(s)\n\n"
        # Return first gist's load URL
        first_gist = sorted_gists[0][0]
        first_filename = sorted_gists[0][1]['filename']
        result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})"
        return result
    except Exception as e:
        return f"error: {e}"
def load(args):
    """Load extension from URL

    Fetches Python source from `url` and executes it with register_tool /
    TOOLS in scope so it can add tools.  Returns the list of newly added
    tool names, or an "error: ..." string.
    """
    url = args.get("url")
    if not url: return "error: url required"
    try:
        opener = create_opener()
        code = opener.open(urllib.request.Request(url), timeout=10).read().decode()
        # SECURITY: executes remote code with no sandboxing or signature
        # check (and TLS verification is disabled in create_opener) — only
        # load extensions from trusted URLs.
        exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess})
        # Anything beyond the built-in tool names must have been added by the extension.
        new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","web_search","search_extension","load"]]
        return f"Loaded. New tools: {', '.join(new)}"
    except Exception as e:
        return f"error: {e}"
# --- Tools ---
def read(args):
    """Read a file and return its lines prefixed with 1-based line numbers.

    args: "path" (str, required), "offset" (0-based first line, default 0),
    "limit" (max number of lines, default: rest of file).
    Raises KeyError/OSError to the caller (run_tool turns them into strings).
    """
    # `with` guarantees the handle is closed (the original leaked it).
    with open(args["path"]) as f:
        lines = f.readlines()
    offset, limit = args.get("offset", 0), args.get("limit", len(lines))
    # Width-4 right-aligned number, matching the original output format.
    return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit]))
def write(args):
    """Write `content` to `path`, overwriting any existing file; return "ok".

    The two [LOG] prints are intentional console feedback, kept as-is.
    """
    filepath = args["path"]
    content = args["content"]
    print(f"{DIM}[LOG] write: {filepath} ({len(content)} bytes){RESET}", flush=True)
    # `with` guarantees the data is flushed and the handle closed
    # (the original relied on refcount finalization of the anonymous handle).
    with open(filepath, "w") as f:
        f.write(content)
    print(f"{DIM}[LOG] write completed: {filepath}{RESET}", flush=True)
    return "ok"
def edit(args):
    """Replace `old` with `new` in the file at `path`.

    Replaces only the first occurrence unless args["all"] is truthy; refuses
    an ambiguous single replacement when `old` occurs multiple times.
    Returns "ok" on success or an "error: ..." string.
    """
    filepath = args["path"]
    print(f"{DIM}[LOG] edit: {filepath}{RESET}", flush=True)
    # `with` closes the handle deterministically (the original leaked it).
    with open(filepath) as f:
        text = f.read()
    print(f"{DIM}[LOG] edit read: {len(text)} bytes{RESET}", flush=True)
    old, new = args["old"], args["new"]
    if old not in text: return "error: old_string not found"
    count = text.count(old)
    if not args.get("all") and count > 1:
        return f"error: old_string appears {count} times (use all=true)"
    # str.replace with count=-1 replaces every occurrence.
    result = text.replace(old, new, -1 if args.get("all") else 1)
    print(f"{DIM}[LOG] edit writing: {len(result)} bytes{RESET}", flush=True)
    with open(filepath, "w") as f:
        f.write(result)
    print(f"{DIM}[LOG] edit completed: {filepath}{RESET}", flush=True)
    return "ok"
def glob(args):
    """Find files matching `pat` under `path` (default "."), newest first."""
    base = args.get("path", ".")
    full_pattern = f"{base}/{args['pat']}".replace("//", "/")

    def _mtime(entry):
        # Non-files (directories, dangling links) sort last via mtime 0.
        return os.path.getmtime(entry) if os.path.isfile(entry) else 0

    matches = globlib.glob(full_pattern, recursive=True)
    matches.sort(key=_mtime, reverse=True)
    return "\n".join(matches) or "none"
def grep(args):
    """Search files under `path` (default ".") for regex `pat`.

    Returns up to 50 "file:line:text" matches joined by newlines, or "none".
    A re.error from a bad pattern propagates (run_tool stringifies it).
    """
    pattern, hits = re.compile(args["pat"]), []
    for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True):
        try:
            # `with` closes every file (the original leaked one handle per file).
            with open(fp) as f:
                for n, l in enumerate(f, 1):
                    if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}")
        except (OSError, UnicodeDecodeError):
            # Directories, unreadable files and binary files are skipped;
            # the original bare `except` also hid programming errors.
            continue
    return "\n".join(hits[:50]) or "none"
def bash(args):
    """Run a shell command, streaming its output and allowing ESC to abort.

    Puts the terminal into cbreak mode so a lone ESC byte on stdin can be
    polled while relaying the child's combined stdout/stderr.  Sets the
    global stop_flag when the user aborts.  Returns the captured output
    (stripped) or "(empty)".  POSIX-only: relies on fcntl/termios/tty.
    """
    global stop_flag
    proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, text=True)
    lines = []
    old_settings = termios.tcgetattr(sys.stdin)
    try:
        tty.setcbreak(sys.stdin.fileno())
        if proc.stdout:
            import fcntl
            # Non-blocking pipe so the ESC poll below never stalls on output.
            fd = proc.stdout.fileno()
            fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
            while True:
                # Check ESC key
                if select.select([sys.stdin], [], [], 0)[0]:
                    if sys.stdin.read(1) == '\x1b':
                        stop_flag = True
                        proc.kill()
                        lines.append("\n(stopped)")
                        print(f"\n{YELLOW}⏸ Stopped{RESET}")
                        break
                # Read output
                if select.select([proc.stdout], [], [], 0.1)[0]:
                    line = proc.stdout.readline()
                    if line:
                        print(f"  {DIM}│ {line.rstrip()}{RESET}", flush=True)
                        lines.append(line)
                # Check if done
                if proc.poll() is not None:
                    # Drain whatever remained in the pipe after exit.
                    remaining = proc.stdout.read()
                    if remaining:
                        for line in remaining.split('\n'):
                            if line:
                                print(f"  {DIM}│ {line.rstrip()}{RESET}", flush=True)
                                lines.append(line + '\n')
                    break
        if not stop_flag:
            proc.wait(timeout=30)
    except subprocess.TimeoutExpired:
        proc.kill()
        lines.append("\n(timeout)")
    finally:
        # Always restore the caller's terminal settings.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
    return "".join(lines).strip() or "(empty)"
def web_search(args):
    """Search web using DuckDuckGo

    Scrapes the html.duckduckgo.com results page for up to `max_results`
    (default 5) title/URL/snippet triples.  Returns a formatted string,
    "No results found", or an "error: ..." string.
    """
    query, max_results = args["query"], args.get("max_results", 5)
    try:
        url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}"
        headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
        opener = create_opener()
        html = opener.open(urllib.request.Request(url, headers=headers), timeout=30).read().decode()
        # Extract titles and URLs
        links = re.findall(r'class="result__a"[^>]+href="([^"]+)"[^>]*>([^<]+)<', html)
        # Extract snippets
        snippets = re.findall(r'class="result__snippet"[^>]*>([^<]*)<', html)
        if not links: return "No results found"
        results = []
        # Snippets are padded so zip never truncates below the link count.
        for i, ((link, title), snippet) in enumerate(zip(links[:max_results], snippets[:max_results] + [""] * max_results), 1):
            results.append(f"{i}. {title.strip()}\n   URL: {link}\n   {snippet.strip()}\n")
        return "\n".join(results)
    except Exception as e:
        return f"error: {e}"
# Tool registry: name -> (description, parameter schema, implementation).
# Parameter types ending in "?" are optional; extensions add entries at
# runtime via register_tool()/load().
TOOLS = {
    "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read),
    "write": ("Write content to file", {"path": "string", "content": "string"}, write),
    "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit),
    "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob),
    "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep),
    "bash": ("Run shell command", {"cmd": "string"}, bash),
    "web_search": ("Search the web using DuckDuckGo", {"query": "string", "max_results": "number?"}, web_search),
    "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension),
    "load": ("Load extension from URL to add new tools", {"url": "string"}, load),
}
def run_tool(name, args):
    """Dispatch a tool call by name; any exception becomes an error string."""
    try:
        return TOOLS[name][2](args)
    except Exception as exc:
        return f"error: {exc}"
def make_schema():
    """Build the Anthropic tool-schema list from the TOOLS registry.

    A trailing "?" on a parameter type marks it optional; the registry's
    "number" is emitted as JSON-schema "integer".
    """
    schemas = []
    for tool_name, (description, params, _impl) in TOOLS.items():
        properties, required = {}, []
        for param_name, param_type in params.items():
            base_type = param_type.rstrip("?")
            properties[param_name] = {"type": "integer" if base_type == "number" else base_type}
            if not param_type.endswith("?"):
                required.append(param_name)
        schemas.append({
            "name": tool_name,
            "description": description,
            "input_schema": {"type": "object", "properties": properties, "required": required},
        })
    return schemas
def call_api(messages, system_prompt, stream=True, enable_thinking=True, use_tools=True):
    """POST a /v1/messages request and return the open HTTP response object.

    The auth header depends on which key env var is set: local proxy and
    OpenRouter use Bearer auth, Anthropic direct uses x-api-key.  Extended
    thinking is sent only when both `enable_thinking` and the THINKING env
    var are set.  Raises urllib errors to the caller.
    """
    headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
    if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}"
    elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
    else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "")
    data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt,
            "messages": messages, "stream": stream}
    if use_tools:
        data["tools"] = make_schema()
    if enable_thinking and os.environ.get("THINKING"):
        data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))}
    req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST")
    return create_opener().open(req)
def summarize_changes(user_input, files_modified, checkpoint_manager, checkpoint_id):
    """Use LLM to summarize the changes made in this turn
    Args:
        user_input: User's request
        files_modified: Set of modified file paths
        checkpoint_manager: CheckpointManager instance
        checkpoint_id: Checkpoint hash to get diff from
    Returns:
        str: One-line summary of changes (falls back to user_input[:50])
    """
    if not files_modified or not checkpoint_id:
        return user_input[:50]
    try:
        # Get diff from git
        diff_output = checkpoint_manager._git_command(
            "--git-dir", checkpoint_manager.bare_repo,
            "show", "--format=", checkpoint_id
        )
        # Check if diff is empty or error - no actual changes
        if not diff_output or diff_output.startswith("error") or len(diff_output.strip()) == 0:
            # No diff available, just use user input
            return user_input[:50]
        # Limit diff size to avoid token overflow (max ~3000 chars)
        if len(diff_output) > 3000:
            diff_output = diff_output[:3000] + "\n... (truncated)"
        # Prompt lines are deliberately unindented inside the triple-quoted string.
        summary_prompt = f"""Based on the actual code changes (diff), generate a brief Chinese summary (max 30 Chinese characters).
IMPORTANT: Must be based on the actual code changes, not the user's description.
Code changes (diff):
{diff_output}
User description (for reference only): {user_input}
Requirements:
1. Describe what code/functionality was actually modified
2. Reply in Chinese only, no explanation
3. No quotes
4. Max 30 Chinese characters
Good examples:
- 在 auth.py 添加 JWT 验证
- 修复 parser.py 空指针异常
- 重构 database.py 连接池
- 更新 README 添加安装说明
"""
        messages = [{"role": "user", "content": summary_prompt}]
        response = call_api(messages, "You are a code change analyzer, skilled at extracting key information from diffs. Reply in Chinese.",
                            stream=False, enable_thinking=False, use_tools=False)
        # Parse non-streaming response
        data = json.loads(response.read().decode())
        blocks = data.get("content", [])
        for block in blocks:
            if block.get("type") == "text":
                summary = block.get("text", "").strip()
                # Remove thinking tags if present
                if "<thinking>" in summary:
                    # Extract content after </thinking>
                    parts = summary.split("</thinking>")
                    if len(parts) > 1:
                        summary = parts[-1].strip()
                # Clean up and limit length
                summary = summary.replace('"', '').replace("'", "")
                if summary and len(summary) <= 80:
                    return summary
        # Fallback to user input
        return user_input[:50]
    except Exception as e:
        # On error, fallback to user input (best-effort; the error is not surfaced)
        return user_input[:50]
def process_stream(response):
    """Parse an SSE streaming response into content blocks, echoing as it goes.

    Supports ESC interruption: stdin is polled in cbreak mode between SSE
    lines, and a lone ESC byte sets the global stop_flag and aborts the
    stream.  Returns the list of completed blocks (text / thinking /
    tool_use dicts in Anthropic content-block shape).
    """
    global stop_flag
    blocks, current, text_buf, json_buf, think_buf = [], None, "", "", ""
    # Save terminal settings
    old_settings = termios.tcgetattr(sys.stdin)
    try:
        tty.setcbreak(sys.stdin.fileno())
        for line in response:
            if select.select([sys.stdin], [], [], 0)[0]:
                ch = sys.stdin.read(1)
                if ch == '\x1b':  # ESC key
                    stop_flag = True
                    print(f"\n{YELLOW}⏸ Stopped{RESET}")
                    break
            line = line.decode("utf-8").strip()
            if not line.startswith("data: "): continue
            if line == "data: [DONE]": continue
            try:
                data = json.loads(line[6:])
                etype = data.get("type")
                if etype == "content_block_start":
                    block = data.get("content_block", {})
                    current = {"type": block.get("type"), "id": block.get("id")}
                    if current["type"] == "text":
                        text_buf = ""
                        print(f"\n{CYAN}⏺{RESET} ", end="", flush=True)
                    elif current["type"] == "thinking":
                        think_buf = ""
                        print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
                    elif current["type"] == "tool_use":
                        current["name"] = block.get("name")
                        json_buf = ""
                elif etype == "content_block_delta":
                    delta = data.get("delta", {})
                    dtype = delta.get("type")
                    if dtype == "text_delta":
                        text = delta.get("text", "")
                        text_buf += text
                        print(text, end="", flush=True)
                    elif dtype == "thinking_delta":
                        # Thinking is echoed but not stored on the block.
                        text = delta.get("thinking", "")
                        think_buf += text
                        print(text, end="", flush=True)
                    elif dtype == "input_json_delta" and current:
                        json_buf += delta.get("partial_json", "")
                elif etype == "content_block_stop" and current:
                    if current["type"] == "text":
                        current["text"] = text_buf
                        print()
                    elif current["type"] == "thinking":
                        print(RESET)
                    elif current["type"] == "tool_use":
                        # Partial/invalid JSON (e.g. after an abort) becomes {}.
                        try: current["input"] = json.loads(json_buf)
                        except: current["input"] = {}
                    blocks.append(current)
                    current = None
            except: pass  # malformed SSE chunks are ignored
    finally:
        # Always restore the caller's terminal settings.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
    return blocks
def is_file_in_project(filepath, project_path):
    """Return True when `filepath` lies inside `project_path` (or is it)."""
    try:
        file_abs = os.path.abspath(filepath)
        project_abs = os.path.abspath(project_path)
        if file_abs == project_abs:
            return True
        # Append a separator so /foo/bar-x is not treated as inside /foo/bar.
        return file_abs.startswith(project_abs + os.sep)
    except:
        return False
def read_multiline_input():
    """Read multiline input. Enter to submit, Alt+Enter for newline.

    Implements a minimal raw-mode line editor: left/right arrows, backspace
    across line joins, Ctrl+C clears, Ctrl+D raises EOFError, and bracketed
    paste (ESC[200~ ... ESC[201~) for multi-line pastes.  Returns the joined
    input with surrounding whitespace stripped.
    """
    lines = []
    current = ""
    cursor_pos = 0  # Cursor position in current line
    # Enable bracketed paste mode
    print("\033[?2004h", end="", flush=True)
    old_settings = termios.tcgetattr(sys.stdin)
    try:
        tty.setcbreak(sys.stdin.fileno())
        print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True)
        while True:
            ch = sys.stdin.read(1)
            if ch == '\x03':  # Ctrl+C - clear input
                lines.clear()
                current = ""
                cursor_pos = 0
                print("\r\033[K", end="", flush=True)
                print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True)
                continue
            if ch == '\x04':  # Ctrl+D
                raise EOFError
            if ch == '\x1b':  # Escape sequence
                next_ch = sys.stdin.read(1)
                if next_ch in ('\r', '\n'):  # Alt+Enter
                    lines.append(current)
                    current = ""
                    cursor_pos = 0
                    print(f"\n{BOLD}{BLUE}│{RESET} ", end="", flush=True)
                elif next_ch == '[':  # Escape sequence
                    seq = sys.stdin.read(1)
                    if seq == 'C':  # Right arrow
                        if cursor_pos < len(current):
                            cursor_pos += 1
                            print("\033[C", end="", flush=True)
                    elif seq == 'D':  # Left arrow
                        if cursor_pos > 0:
                            cursor_pos -= 1
                            print("\033[D", end="", flush=True)
                    elif seq == '2':  # Bracketed paste start: ESC[200~
                        rest = sys.stdin.read(3)  # Read "00~"
                        if rest == '00~':
                            # Read pasted content until ESC[201~
                            paste_buf = ""
                            while True:
                                c = sys.stdin.read(1)
                                if c == '\x1b':
                                    # Check for [201~
                                    peek = sys.stdin.read(5)
                                    if peek == '[201~':
                                        break
                                    else:
                                        # Not the terminator: keep the bytes verbatim.
                                        paste_buf += c + peek
                                else:
                                    paste_buf += c
                            # Process pasted content
                            paste_lines = paste_buf.split('\n')
                            if len(paste_lines) == 1:
                                # Single line paste
                                current = current[:cursor_pos] + paste_lines[0] + current[cursor_pos:]
                                cursor_pos += len(paste_lines[0])
                                prefix = f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} "
                                print(f"\r\033[K{prefix}{current}", end="", flush=True)
                            else:
                                # Multi-line paste
                                # First line appends to current
                                first_line = current[:cursor_pos] + paste_lines[0]
                                print(paste_lines[0], end="", flush=True)
                                if first_line:
                                    lines.append(first_line)
                                # Middle lines
                                for line in paste_lines[1:-1]:
                                    print(f"\n{BOLD}{BLUE}│{RESET} {line}", end="", flush=True)
                                    lines.append(line)
                                # Last line becomes new current
                                current = paste_lines[-1]
                                cursor_pos = len(current)
                                print(f"\n{BOLD}{BLUE}│{RESET} {current}", end="", flush=True)
                # Any other escape sequence is consumed and ignored.
                continue
            if ch in ('\r', '\n'):  # Enter - submit
                if current:
                    lines.append(current)
                print()
                break
            if ch in ('\x7f', '\x08'):  # Backspace
                if cursor_pos > 0:
                    # Delete character before cursor
                    current = current[:cursor_pos-1] + current[cursor_pos:]
                    cursor_pos -= 1
                    # Redraw current line
                    prefix = f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} "
                    print(f"\r\033[K{prefix}{current}", end="", flush=True)
                    # Move cursor back to position
                    if cursor_pos < len(current):
                        print(f"\033[{len(current) - cursor_pos}D", end="", flush=True)
                elif lines:
                    # Merge with previous line
                    prev_line = lines.pop()
                    cursor_pos = len(prev_line)  # Cursor at end of previous line
                    current = prev_line + current
                    # Move up and redraw
                    print("\033[A\033[K", end="", flush=True)
                    prefix = f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} "
                    print(f"\r{prefix}{current}", end="", flush=True)
                    if cursor_pos < len(current):
                        print(f"\033[{len(current) - cursor_pos}D", end="", flush=True)
                continue
            if ch.isprintable() or ch == '\t':
                # Insert character at cursor position
                current = current[:cursor_pos] + ch + current[cursor_pos:]
                cursor_pos += 1
                # Redraw from cursor position
                print(f"{ch}{current[cursor_pos:]}", end="", flush=True)
                # Move cursor back if needed
                if cursor_pos < len(current):
                    print(f"\033[{len(current) - cursor_pos}D", end="", flush=True)
    finally:
        # Disable bracketed paste mode
        print("\033[?2004l", end="", flush=True)
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
    return "\n".join(lines).strip()
def main():
    """Entry point: parse CLI flags, configure the terminal, print the banner,
    optionally run session selection, then hand off to run_main_loop()."""
    global stop_flag
    # Parse command line arguments
    continue_session = "-c" in sys.argv or "--continue" in sys.argv
    list_sessions = "-l" in sys.argv or "--list" in sys.argv
    # Disable Ctrl+C signal
    old_settings = termios.tcgetattr(sys.stdin)
    new_settings = termios.tcgetattr(sys.stdin)
    new_settings[3] = new_settings[3] & ~termios.ISIG  # Disable signal generation
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
    try:
        proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy")
        proxy_info = f" | {DIM}🌐 {proxy}{RESET}" if proxy else ""
        thinking_info = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else ""
        if list_sessions:
            session_mode = f" | {YELLOW}Select{RESET}"
        elif continue_session:
            session_mode = f" | {GREEN}Continue{RESET}"
        else:
            session_mode = f" | {CYAN}New{RESET}"
        print(f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}{proxy_info}{thinking_info}{session_mode}{RESET}")
        print(f"{DIM}Shortcuts: Enter=submit | Alt+Enter=newline | Ctrl+C=clear input | Ctrl+D=exit | ESC=stop{RESET}")
        print(f"{DIM}Commands: /c [all|<id>] | /ca | /clear{RESET}")
        print(f"{DIM}Usage: nanocode (new) | nanocode -c (continue) | nanocode -l (select){RESET}\n")
        selected_session_id = None
        if list_sessions:
            selected_session_id = select_session_interactive()
            if not selected_session_id:
                print(f"{DIM}Exiting...{RESET}")
                return
        run_main_loop(continue_session, selected_session_id)
    finally:
        # Restore the original terminal attributes on any exit path.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
def select_session_interactive():
    """Display sessions and let user select one
    Returns:
        session_id: Selected session ID, or None if cancelled
    """
    session_manager = SessionManager(os.getcwd())
    sessions = session_manager.list_sessions()[:10]  # Limit to 10 most recent
    if not sessions:
        print(f"{YELLOW}⚠ No previous sessions found{RESET}")
        print(f"{DIM}Starting new session...{RESET}\n")
        return None
    print(f"{BOLD}📂 Recent Sessions:{RESET}\n")
    for i, sess_info in enumerate(sessions, 1):
        created = datetime.fromtimestamp(sess_info['metadata']['created_at']).strftime('%Y-%m-%d %H:%M')
        last_active = datetime.fromtimestamp(sess_info['metadata']['last_active']).strftime('%Y-%m-%d %H:%M')
        desc = sess_info['metadata'].get('description', '(no description)')
        # Git info
        git_commit = sess_info['metadata'].get('git_commit')
        git_branch = sess_info['metadata'].get('git_branch')
        git_dirty = sess_info['metadata'].get('git_dirty', False)
        print(f"{CYAN}{i}.{RESET} {BOLD}{sess_info['session_id']}{RESET}")
        print(f"   {desc}")
        git_info = ""
        if git_commit and git_branch:
            dirty_mark = f"{YELLOW}*{RESET}" if git_dirty else ""
            git_info = f" | Git: {git_branch}@{git_commit}{dirty_mark}"
        print(f"   Created: {created} | Last: {last_active} | {sess_info['message_count']} messages{git_info}\n")
    print(f"{DIM}Enter session number (1-{len(sessions)}), or press Enter for new session:{RESET}")
    try:
        choice = input(f"{BOLD}{BLUE}❯{RESET} ").strip()
        if not choice:
            # Empty input = new session
            return None
        try:
            idx = int(choice) - 1
            if 0 <= idx < len(sessions):
                return sessions[idx]['session_id']
            else:
                print(f"{RED}✗ Invalid number{RESET}")
                return None
        except ValueError:
            print(f"{RED}✗ Invalid input{RESET}")
            return None
    except (EOFError, KeyboardInterrupt):
        return None
def run_main_loop(continue_session=False, selected_session_id=None):
    """Main REPL: session setup, user input, streaming API calls, tool
    dispatch, auto-checkpointing, and session persistence.

    Args:
        continue_session: resume the most recent session instead of starting new.
        selected_session_id: explicit session chosen via the -l picker.
    """
    # BUGFIX: without this declaration the `stop_flag = False` assignment
    # below made stop_flag function-local, so the global set by
    # process_stream()/bash() on ESC was never observed here and the agent
    # loop could not be interrupted.
    global stop_flag
    # Initialize session manager
    session_manager = SessionManager(os.getcwd())
    # Load or create session based on parameters
    if selected_session_id:
        # Load specific session selected by user
        session = session_manager.load_session(selected_session_id)
        if session:
            git_info = ""
            git_commit = session.metadata.get('git_commit')
            git_branch = session.metadata.get('git_branch')
            if git_commit and git_branch:
                git_dirty = session.metadata.get('git_dirty', False)
                dirty_mark = f"{YELLOW}*{RESET}" if git_dirty else ""
                git_info = f" | Git: {git_branch}@{git_commit}{dirty_mark}"
            print(f"{GREEN}✓ Loaded session: {session.session_id}{RESET}")
            print(f"{DIM}  └─ {len(session.messages)} messages{git_info}{RESET}")
            # Check for conflicts
            conflicts = session.detect_conflicts()
            if conflicts:
                print(f"\n{YELLOW}⚠ File conflicts detected:{RESET}")
                for filepath in conflicts[:5]:
                    print(f"  - {filepath}")
                if len(conflicts) > 5:
                    print(f"  ... and {len(conflicts)-5} more")
                print(f"\n{DIM}These files have been modified outside this session.{RESET}")
                confirm = input(f"{BOLD}Continue anyway? (y/N/u=update): {RESET}").strip().lower()
                if confirm == 'u':
                    session.update_file_states()
                    session_manager.save_session()
                    print(f"{GREEN}✓ Updated file states{RESET}\n")
                elif confirm != 'y':
                    print(f"{DIM}Creating new session instead...{RESET}\n")
                    session_manager.create_session()
                else:
                    print()
            else:
                print()
        else:
            print(f"{RED}✗ Failed to load session{RESET}")
            print(f"{GREEN}✓ Creating new session instead{RESET}\n")
            session_manager.create_session()
    elif continue_session:
        # Continue last session
        last_session = session_manager.load_last_session()
        if last_session:
            git_info = ""
            git_commit = last_session.metadata.get('git_commit')
            git_branch = last_session.metadata.get('git_branch')
            if git_commit and git_branch:
                git_dirty = last_session.metadata.get('git_dirty', False)
                dirty_mark = f"{YELLOW}*{RESET}" if git_dirty else ""
                git_info = f" | Git: {git_branch}@{git_commit}{dirty_mark}"
            print(f"{GREEN}✓ Continued session: {last_session.session_id}{RESET}")
            print(f"{DIM}  └─ {len(last_session.messages)} messages{git_info}{RESET}")
            # Check for conflicts
            conflicts = last_session.detect_conflicts()
            if conflicts:
                print(f"\n{YELLOW}⚠ File conflicts detected:{RESET}")
                for filepath in conflicts[:5]:
                    print(f"  - {filepath}")
                if len(conflicts) > 5:
                    print(f"  ... and {len(conflicts)-5} more")
                print(f"\n{DIM}These files have been modified outside this session.{RESET}")
                confirm = input(f"{BOLD}Continue anyway? (y/N/u=update): {RESET}").strip().lower()
                if confirm == 'u':
                    last_session.update_file_states()
                    session_manager.save_session()
                    print(f"{GREEN}✓ Updated file states{RESET}\n")
                elif confirm != 'y':
                    print(f"{DIM}Creating new session instead...{RESET}\n")
                    session_manager.create_session()
                else:
                    print()
            else:
                print()
        else:
            # No previous session, create new one
            session_manager.create_session()
            print(f"{YELLOW}⚠ No previous session found{RESET}")
            print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}\n")
    else:
        # Always create new session by default
        # Try to detect parent from last session's latest checkpoint
        parent_checkpoint = None
        parent_session = None
        last_session = session_manager.load_last_session()
        if last_session:
            # Get the latest checkpoint from last session
            checkpoints = session_manager.checkpoint_manager.list_checkpoints(show_all=False)
            if checkpoints:
                parent_checkpoint = checkpoints[0][0]  # Latest checkpoint hash
                parent_session = last_session.session_id
        session_manager.create_session(
            parent_checkpoint=parent_checkpoint,
            parent_session=parent_session
        )
        git_info = ""
        git_commit = session_manager.current_session.metadata.get('git_commit')
        git_branch = session_manager.current_session.metadata.get('git_branch')
        if git_commit and git_branch:
            git_dirty = session_manager.current_session.metadata.get('git_dirty', False)
            dirty_mark = f"{YELLOW}*{RESET}" if git_dirty else ""
            git_info = f" | Git: {git_branch}@{git_commit}{dirty_mark}"
        if parent_checkpoint:
            print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}")
            print(f"{DIM}  └─ Branched from {parent_session[:8]}... @ {parent_checkpoint}{git_info}{RESET}\n")
        else:
            print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}")
            if git_info:
                print(f"{DIM}  └─{git_info}{RESET}\n")
            else:
                print()
    files_modified = set()
    auto_checkpoint = True
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} Current time: {current_time}
IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
Examples:
- User asks about GitHub repo → search_extension({{"query": "github documentation"}})
- User needs web data → search_extension({{"query": "web scraping"}})
- User needs API → search_extension({{"query": "api client"}})"""
    while True:
        try:
            print(f"{DIM}{'─'*80}{RESET}")
            user_input = read_multiline_input()
            print(f"{DIM}{'─'*80}{RESET}")
            if not user_input: continue
            if user_input in ("/q", "exit"):
                session_manager.save_session()
                break
            # Handle /clear command first (before /c to avoid conflict)
            if user_input == "/clear":
                # Save current session
                session_manager.save_session()
                # Get latest checkpoint from current session (if any)
                checkpoints = session_manager.checkpoint_manager.list_checkpoints(show_all=False)
                parent_checkpoint = checkpoints[0][0] if checkpoints else None
                parent_session = session_manager.current_session.session_id
                # Create new session branched from current
                session_manager.create_session(
                    parent_checkpoint=parent_checkpoint,
                    parent_session=parent_session
                )
                # Reset state
                files_modified.clear()
                print(f"{GREEN}✓ Started new session: {session_manager.current_session.session_id}{RESET}")
                if parent_checkpoint:
                    print(f"{DIM}  └─ Branched from {parent_session[:8]}... @ {parent_checkpoint}{RESET}")
                continue
            # Handle checkpoint commands
            if user_input.startswith("/checkpoint") or user_input.startswith("/c") or user_input == "/ca":
                parts = user_input.split()
                # /ca is shortcut for /c all
                if parts[0] == "/ca":
                    parts = ["/c", "all"]
                # /c without args defaults to list
                if len(parts) == 1 and parts[0] in ["/c", "/checkpoint"]:
                    parts.append("list")
                restored_messages = handle_checkpoint_command(parts, session_manager, files_modified)
                if restored_messages is not None:
                    # Restore conversation by replacing session messages
                    session_manager.current_session.messages = restored_messages
                    session_manager.save_session()
                continue
            # Add user message to current session
            session_manager.current_session.messages.append({"role": "user", "content": user_input})
            # Reset stop flag for new turn
            stop_flag = False
            # Track files modified in this turn
            files_modified_this_turn = set()
            while True:
                response = call_api(session_manager.current_session.messages, system_prompt)
                blocks = process_stream(response)
                if stop_flag: break
                tool_results = []
                for block in blocks:
                    if block["type"] == "tool_use":
                        name, args = block["name"], block["input"]
                        preview = str(list(args.values())[0])[:50] if args else ""
                        print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})")
                        result = run_tool(name, args)
                        lines = result.split("\n")
                        prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "")
                        if len(lines) > 1: prev += f" +{len(lines)-1}"
                        print(f"  {DIM}⎿ {prev}{RESET}")
                        # Track file modifications (only project files)
                        if name in ['write', 'edit']:
                            filepath = args.get('path')
                            if filepath and is_file_in_project(filepath, session_manager.project_path):
                                files_modified.add(filepath)
                                files_modified_this_turn.add(filepath)
                                session_manager.current_session.track_file_state(filepath)
                        tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})
                        # Check stop_flag after each tool execution
                        if stop_flag:
                            print(f"{YELLOW}⚠ Tool execution stopped{RESET}")
                            break
                session_manager.current_session.messages.append({"role": "assistant", "content": blocks})
                if not tool_results or stop_flag: break
                session_manager.current_session.messages.append({"role": "user", "content": tool_results})
            # Auto checkpoint after AI work (if project files were modified)
            if auto_checkpoint and files_modified_this_turn:
                # files_modified_this_turn already filtered to project files only
                # Use parent_commit for first checkpoint of new session
                parent_commit = session_manager.parent_commit_for_next_checkpoint
                checkpoint_id = session_manager.checkpoint_manager.create_checkpoint(
                    f"Auto: {user_input[:50]}",
                    list(files_modified_this_turn),
                    conversation_snapshot=session_manager.current_session.messages.copy(),
                    parent_commit=parent_commit
                )
                # Clear parent after first checkpoint
                if parent_commit:
                    session_manager.parent_commit_for_next_checkpoint = None
                if checkpoint_id:
                    # Generate summary using LLM with actual diff
                    print(f"{DIM}Generating checkpoint summary...{RESET}", end="", flush=True)
                    summary = summarize_changes(
                        user_input,
                        files_modified_this_turn,
                        session_manager.checkpoint_manager,
                        checkpoint_id
                    )
                    print(f"\r{' ' * 40}\r", end="", flush=True)  # Clear the line
                    # Update commit message with better summary (only if different from temp message)
                    temp_message = f"Auto: {user_input[:50]}"
                    if summary != user_input[:50] and summary != temp_message:
                        session_manager.checkpoint_manager._git_command(
                            "--git-dir", session_manager.checkpoint_manager.bare_repo,
                            "commit", "--amend", "-m", summary
                        )
                    print(f"\n{YELLOW}📍 {checkpoint_id}: {summary}{RESET}")
                else:
                    # Checkpoint creation failed (e.g., no actual diff)
                    print(f"\n{DIM}(No project file changes to checkpoint){RESET}")
            # Auto-save session after each interaction
            session_manager.save_session()
            print()
        except EOFError:
            session_manager.save_session()
            break
        except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}")
# ============================================================================
# Checkpoint & Session Management (Phase 1+2)
# ============================================================================
class CheckpointManager:
    """Manage checkpoints using a shadow bare git repository with session isolation.

    Each session gets its own branch (``session_<id>``) inside a bare repo at
    ``<project>/.nanocode/checkpoint.git``.  Every checkpoint is a commit on
    that branch; conversation snapshots are kept in a sibling JSON file keyed
    by the short (8-char) commit hash.
    """
    def __init__(self, project_path, session_id=None):
        self.project_path = project_path
        self.session_id = session_id
        self.nanocode_dir = os.path.join(project_path, ".nanocode")
        self.bare_repo = os.path.join(self.nanocode_dir, "checkpoint.git")
        self._init_bare_repo()
    def set_session(self, session_id):
        """Set current session for checkpoint operations"""
        self.session_id = session_id
    def _get_branch_name(self):
        """Return the git branch name for the current session ("main" if none)."""
        if not self.session_id:
            return "main"
        return f"session_{self.session_id}"
    def _init_bare_repo(self):
        """Initialize the shadow bare repository (no-op if it already exists)."""
        if not os.path.exists(self.bare_repo):
            os.makedirs(self.bare_repo, exist_ok=True)
            try:
                subprocess.run(
                    ["git", "init", "--bare", self.bare_repo],
                    capture_output=True, check=True
                )
            except (subprocess.CalledProcessError, FileNotFoundError):
                # Git not available; later git calls degrade to error strings.
                pass
    def _git_command(self, *args, cwd=None):
        """Run a git command; return stdout, or an "error: ..." string on failure."""
        try:
            result = subprocess.run(
                ["git"] + list(args),
                cwd=cwd or self.project_path,
                capture_output=True,
                text=True,
                check=True
            )
            return result.stdout.strip()
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            return f"error: {e}"
    def _copy_into_worktree(self, files_changed, temp_worktree, verb):
        """Copy each changed file into the temp worktree, keeping project-relative paths.

        `verb` only affects the log lines: "copying" for a freshly created
        branch, "updating" for an existing one (matches historical output).
        """
        for filepath in files_changed:
            print(f"{DIM}[LOG] checkpoint {verb}: {filepath}{RESET}", flush=True)
            if not os.path.exists(filepath):
                print(f"{DIM}[LOG] source file NOT exists: {filepath}{RESET}", flush=True)
                continue
            file_size = os.path.getsize(filepath)
            print(f"{DIM}[LOG] source file exists: {filepath} ({file_size} bytes){RESET}", flush=True)
            # Store paths relative to the project root inside the worktree.
            if os.path.isabs(filepath):
                rel_filepath = os.path.relpath(filepath, self.project_path)
            else:
                rel_filepath = filepath
            dest = os.path.join(temp_worktree, rel_filepath)
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            with open(filepath, 'rb') as src, open(dest, 'wb') as dst:
                content = src.read()
                dst.write(content)
            print(f"{DIM}[LOG] copied to temp_worktree: {dest} ({len(content)} bytes){RESET}", flush=True)
    def create_checkpoint(self, message, files_changed, conversation_snapshot=None, parent_commit=None):
        """Create a checkpoint on current session's branch
        Args:
            message: Commit message
            files_changed: List of modified files
            conversation_snapshot: Conversation state to save
            parent_commit: Parent commit hash to branch from (for new sessions)
        Returns:
            8-char checkpoint id, or None when there is nothing to commit,
            no session is active, or any git/IO step fails.
        """
        print(f"{DIM}[LOG] create_checkpoint: files_changed={files_changed}{RESET}", flush=True)
        if not files_changed or not self.session_id:
            return None
        branch_name = self._get_branch_name()
        # Load existing conversation snapshots (keyed by checkpoint id).
        snapshot_file = os.path.join(self.nanocode_dir, "conversation_snapshots.json")
        snapshots = {}
        if conversation_snapshot and os.path.exists(snapshot_file):
            with open(snapshot_file, 'r') as f:
                snapshots = json.load(f)
        # Per-session temp worktree used to stage files for the bare repo.
        temp_worktree = os.path.join(self.nanocode_dir, f"temp_worktree_{self.session_id}")
        try:
            # Ensure the worktree exists in both branch paths.
            os.makedirs(temp_worktree, exist_ok=True)
            branch_exists = self._git_command("--git-dir", self.bare_repo, "rev-parse", "--verify", branch_name)
            if not branch_exists or branch_exists.startswith("error"):
                # First checkpoint for this session: create its branch.
                self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "config", "core.bare", "false")
                if parent_commit:
                    # Session forked from a checkpoint: branch off that commit.
                    self._git_command("--git-dir", self.bare_repo, "branch", branch_name, parent_commit)
                    self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f")
                else:
                    # No parent: start an orphan branch with empty history.
                    self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", "--orphan", branch_name)
                verb = "copying"
            else:
                # Branch exists: check it out into the worktree and update it.
                self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f")
                verb = "updating"
            self._copy_into_worktree(files_changed, temp_worktree, verb)
            # Stage and commit everything currently in the worktree.
            self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "add", "-A")
            self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree,
                              "commit", "-m", message, "--allow-empty")
            commit_hash = self._git_command("--git-dir", self.bare_repo, "rev-parse", "HEAD")
            checkpoint_id = commit_hash[:8] if commit_hash and not commit_hash.startswith("error") else None
            # Persist the conversation snapshot keyed by the new checkpoint id.
            if checkpoint_id and conversation_snapshot:
                snapshots[checkpoint_id] = conversation_snapshot
                with open(snapshot_file, 'w') as f:
                    json.dump(snapshots, f, indent=2)
            return checkpoint_id
        except Exception:
            # Best-effort: a failed checkpoint must never interrupt the session.
            return None
    def list_checkpoints(self, limit=10, show_all=False):
        """List recent checkpoints as [hash, message] pairs, newest first.
        Args:
            limit: Maximum number of checkpoints to show
            show_all: If True, show all sessions; if False, only show current session
        """
        if not self.session_id and not show_all:
            return []
        try:
            if show_all:
                # Show all branches
                args = ["--git-dir", self.bare_repo, "log", f"--max-count={limit}", "--oneline", "--all"]
            else:
                # Show only current session's branch
                branch_name = self._get_branch_name()
                args = ["--git-dir", self.bare_repo, "log", f"--max-count={limit}", "--oneline", branch_name]
            log = self._git_command(*args)
            if log and not log.startswith("error"):
                return [line.split(" ", 1) for line in log.split("\n") if line]
            return []
        except Exception:
            return []
    def restore_checkpoint(self, checkpoint_id):
        """Restore files to checkpoint state and reset current session's branch.

        Resets the session branch to `checkpoint_id` (discarding later commits
        on it) and copies the checked-out files back into the project tree.
        Returns:
            tuple: (success: bool, conversation_snapshot: dict or None)
        """
        if not self.session_id:
            return False, None
        branch_name = self._get_branch_name()
        temp_worktree = os.path.join(self.nanocode_dir, f"temp_worktree_{self.session_id}")
        try:
            # Put the worktree on this session's branch first.
            self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f")
            # Reset branch to checkpoint (discards future commits on this branch).
            self._git_command("--git-dir", self.bare_repo, "reset", "--hard", checkpoint_id)
            # Materialize the checkpoint into the temp worktree.
            self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree,
                              "checkout", checkpoint_id, "-f")
            # Copy every restored file back into the project.
            for root, dirs, files in os.walk(temp_worktree):
                for file in files:
                    src = os.path.join(root, file)
                    rel_path = os.path.relpath(src, temp_worktree)
                    dest = os.path.join(self.project_path, rel_path)
                    os.makedirs(os.path.dirname(dest), exist_ok=True)
                    with open(src, 'rb') as s, open(dest, 'wb') as d:
                        d.write(s.read())
            # Load the conversation snapshot saved with this checkpoint, if any.
            snapshot_file = os.path.join(self.nanocode_dir, "conversation_snapshots.json")
            conversation_snapshot = None
            if os.path.exists(snapshot_file):
                with open(snapshot_file, 'r') as f:
                    snapshots = json.load(f)
                conversation_snapshot = snapshots.get(checkpoint_id)
            return True, conversation_snapshot
        except Exception:
            return False, None
class Session:
    """A single conversation session: messages, tracked file states, metadata."""
    def __init__(self, session_id=None):
        self.session_id = session_id or self._generate_session_id()
        self.messages = []      # conversation history (list of message dicts)
        self.file_states = {}   # filepath -> {hash, mtime, size} for conflict detection
        self.metadata = {
            'created_at': time.time(),
            'last_active': time.time(),
            'description': '',
            'cwd': os.getcwd(),
            'parent_checkpoint': None, # Track where this session branched from
            'parent_session': None,    # Track which session it branched from
            'git_commit': None,        # Project .git commit hash when session started
            'git_branch': None,        # Project .git branch when session started
            'git_dirty': False,        # Whether project had uncommitted changes
        }
    def _generate_session_id(self):
        """Generate a unique session ID: YYYYMMDD_HHMMSS plus 4 random hex chars."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        random_suffix = ''.join(random.choices('0123456789abcdef', k=4))
        return f"{timestamp}_{random_suffix}"
    def _get_project_git_info(self):
        """Get the project's current git state, or None if unavailable.

        Returns a dict with 'git_commit' (short hash), 'git_branch' and
        'git_dirty', or None when git is missing or cwd is not a repo.
        """
        try:
            cwd = self.metadata.get('cwd', os.getcwd())
            # Current commit (shortened to 8 chars below)
            commit = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                cwd=cwd,
                capture_output=True, text=True, check=True
            ).stdout.strip()
            # Current branch name
            branch = subprocess.run(
                ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                cwd=cwd,
                capture_output=True, text=True, check=True
            ).stdout.strip()
            # Non-empty porcelain output means uncommitted changes
            status = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=cwd,
                capture_output=True, text=True, check=True
            ).stdout.strip()
            return {
                'git_commit': commit[:8],
                'git_branch': branch,
                'git_dirty': bool(status)
            }
        except (subprocess.CalledProcessError, OSError):
            # Not a git repo, git missing, or bad cwd: no git metadata.
            return None
    def capture_git_state(self):
        """Capture current project git state into metadata (no-op on failure)."""
        git_info = self._get_project_git_info()
        if git_info:
            self.metadata.update(git_info)
    def track_file_state(self, filepath):
        """Record the file's content hash, mtime and size for conflict detection."""
        if os.path.exists(filepath):
            with open(filepath, 'rb') as f:
                content = f.read()
            file_hash = hashlib.md5(content).hexdigest()
            self.file_states[filepath] = {
                'hash': file_hash,
                'mtime': os.path.getmtime(filepath),
                'size': len(content)
            }
    def detect_conflicts(self):
        """Detect if tracked files have been modified outside this session
        Returns:
            list: List of conflicted file paths (deleted files get a
            " (deleted)" suffix)
        """
        conflicts = []
        for filepath, saved_state in self.file_states.items():
            if os.path.exists(filepath):
                with open(filepath, 'rb') as f:
                    content = f.read()
                current_hash = hashlib.md5(content).hexdigest()
                current_mtime = os.path.getmtime(filepath)
                # Either a content change or an mtime change counts as a conflict
                if (current_hash != saved_state['hash'] or
                        current_mtime != saved_state['mtime']):
                    conflicts.append(filepath)
            else:
                # File was deleted
                conflicts.append(f"{filepath} (deleted)")
        return conflicts
    def update_file_states(self):
        """Refresh all tracked file states; drop entries for deleted files."""
        # Iterate a copy of the keys since entries may be deleted mid-loop.
        for filepath in list(self.file_states.keys()):
            if os.path.exists(filepath):
                self.track_file_state(filepath)
            else:
                # Remove deleted files from tracking
                del self.file_states[filepath]
    def to_dict(self):
        """Serialize to a JSON-compatible dict."""
        return {
            'session_id': self.session_id,
            'messages': self.messages,
            'file_states': self.file_states,
            'metadata': self.metadata
        }
    @staticmethod
    def from_dict(data):
        """Deserialize from a dict produced by to_dict()."""
        session = Session(session_id=data['session_id'])
        session.messages = data.get('messages', [])
        session.file_states = data.get('file_states', {})
        session.metadata = data.get('metadata', {})
        return session
class SessionManager:
    """Manage multiple sessions persisted as JSON under .nanocode/sessions/."""
    def __init__(self, project_path):
        self.project_path = project_path
        self.sessions_dir = os.path.join(project_path, ".nanocode", "sessions")
        self.current_session = None
        self.checkpoint_manager = CheckpointManager(project_path)
        # Parent commit used only for the first checkpoint of a forked session.
        self.parent_commit_for_next_checkpoint = None
        os.makedirs(self.sessions_dir, exist_ok=True)
    def create_session(self, description="", parent_checkpoint=None, parent_session=None):
        """Create and activate a new session.
        Args:
            description: Session description
            parent_checkpoint: Checkpoint ID this session branches from
            parent_session: Session ID this session branches from
        """
        session = Session()
        session.metadata['description'] = description
        session.metadata['parent_checkpoint'] = parent_checkpoint
        session.metadata['parent_session'] = parent_session
        # Record the project's git state at session start.
        session.capture_git_state()
        self.current_session = session
        # Point the checkpoint manager at this session's branch.
        self.checkpoint_manager.set_session(session.session_id)
        # Remember the parent commit for the session's first checkpoint.
        self.parent_commit_for_next_checkpoint = parent_checkpoint
        self.save_session()
        return session
    def save_session(self):
        """Persist the current session to disk (no-op if none is active)."""
        if not self.current_session:
            return
        self.current_session.metadata['last_active'] = time.time()
        session_file = os.path.join(
            self.sessions_dir,
            f"{self.current_session.session_id}.json"
        )
        with open(session_file, 'w') as f:
            json.dump(self.current_session.to_dict(), f, indent=2)
    def load_session(self, session_id):
        """Load a session from disk and make it current; None if not found."""
        session_file = os.path.join(self.sessions_dir, f"{session_id}.json")
        if not os.path.exists(session_file):
            return None
        with open(session_file, 'r') as f:
            data = json.load(f)
        session = Session.from_dict(data)
        self.current_session = session
        # Point the checkpoint manager at the loaded session's branch.
        self.checkpoint_manager.set_session(session.session_id)
        return session
    def list_sessions(self):
        """List all saved sessions, most recently active first."""
        sessions = []
        if not os.path.exists(self.sessions_dir):
            return sessions
        for filename in os.listdir(self.sessions_dir):
            if filename.endswith('.json'):
                filepath = os.path.join(self.sessions_dir, filename)
                try:
                    with open(filepath, 'r') as f:
                        data = json.load(f)
                    sessions.append({
                        'session_id': data['session_id'],
                        'metadata': data['metadata'],
                        'message_count': len(data.get('messages', [])),
                    })
                except (OSError, ValueError, KeyError):
                    # Best-effort: skip unreadable or malformed session files.
                    pass
        return sorted(sessions, key=lambda x: x['metadata'].get('last_active', 0), reverse=True)
    def load_last_session(self):
        """Load the most recently active session, or None if there are none."""
        sessions = self.list_sessions()
        if sessions:
            return self.load_session(sessions[0]['session_id'])
        return None
def handle_checkpoint_command(parts, session_manager, files_modified):
    """Handle /checkpoint or /c commands.

    Args:
        parts: Tokenized command line (parts[0] is the command itself)
        session_manager: Active SessionManager instance
        files_modified: Files touched this turn (unused here; kept for API compatibility)
    Returns:
        messages: New messages list if conversation was restored, None otherwise
    """
    # Default to "list" when no subcommand is given (without mutating the
    # caller's `parts` list, which the original code did).
    cmd = parts[1] if len(parts) > 1 else "list"
    # A bare 7-8 char hex argument is shorthand for "restore <hash>".
    if 7 <= len(cmd) <= 8 and all(c in '0123456789abcdef' for c in cmd.lower()):
        checkpoint_id = cmd
        cmd = "restore"
    elif cmd == "restore" and len(parts) > 2:
        # Explicit form: /c restore <hash>
        checkpoint_id = parts[2]
    else:
        checkpoint_id = None
    if cmd == "list" or cmd == "all" or cmd == "--all":
        show_all = (cmd == "all" or "--all" in parts)
        if show_all:
            # Show git graph of all branches
            print(f"\n{BOLD}📍 Checkpoint Graph:{RESET}\n")
            # Format: %h = short hash, %s = subject, %ar = relative date
            graph_output = session_manager.checkpoint_manager._git_command(
                "--git-dir", session_manager.checkpoint_manager.bare_repo,
                "log", "--graph", "--all", "--oneline",
                "--format=%h %s (%ar)", "-20"
            )
            if graph_output and not graph_output.startswith("error"):
                for line in graph_output.split('\n'):
                    if not line.strip():
                        continue
                    # Extract the commit hash from the graph line
                    match = re.search(r'\b([0-9a-f]{7,8})\b', line)
                    if match:
                        commit_hash = match.group(1)
                        # Find which session branches contain this commit
                        branch_info = session_manager.checkpoint_manager._git_command(
                            "--git-dir", session_manager.checkpoint_manager.bare_repo,
                            "branch", "-a", "--contains", commit_hash
                        )
                        # Extract session names from branch names
                        session_names = []
                        if branch_info and not branch_info.startswith("error"):
                            for branch_line in branch_info.split('\n'):
                                branch_line = branch_line.strip().lstrip('* ')
                                if branch_line.startswith('session_'):
                                    # Shorten: session_20260130_103323_f7 -> s:20260130_103323_f7
                                    session_short = 's:' + branch_line[8:]  # Remove 'session_' prefix
                                    session_names.append(session_short)
                        # Highlight commit hash
                        line = line.replace(commit_hash, f"{CYAN}{commit_hash}{RESET}")
                        # Add session info if found
                        if session_names:
                            # Insert up to two session names after the commit hash
                            session_str = f"{GREEN}[{', '.join(session_names[:2])}]{RESET}"
                            line = line.replace(commit_hash + f"{RESET}", commit_hash + f"{RESET} {session_str}")
                    print(f" {line}")
                print()
            else:
                print(f"{DIM}No checkpoints yet{RESET}\n")
            print(f"{DIM}Restore: /c <hash>{RESET}")
            return None
        else:
            # Show current session's checkpoints
            checkpoints = session_manager.checkpoint_manager.list_checkpoints(show_all=False)
            if not checkpoints:
                print(f"{DIM}No checkpoints yet{RESET}")
                return None
            print(f"\n{BOLD}📍 Checkpoints:{RESET}\n")
            # Show first 10 (already newest first from git log)
            for commit_hash, message in checkpoints[:10]:
                # Relative timestamp for this commit
                timestamp_str = session_manager.checkpoint_manager._git_command(
                    "--git-dir", session_manager.checkpoint_manager.bare_repo,
                    "log", "-1", "--format=%ar", commit_hash
                )
                if timestamp_str.startswith("error"):
                    timestamp_str = ""
                # Files touched by this commit
                files_str = session_manager.checkpoint_manager._git_command(
                    "--git-dir", session_manager.checkpoint_manager.bare_repo,
                    "diff-tree", "--no-commit-id", "--name-only", "-r", commit_hash
                )
                files = []
                if files_str and not files_str.startswith("error"):
                    files = [f.strip() for f in files_str.split('\n') if f.strip()]
                # Format: hash | time ago | message
                time_part = f"{DIM}{timestamp_str}{RESET}" if timestamp_str else ""
                print(f" {CYAN}{commit_hash}{RESET} {time_part}")
                print(f" {DIM}└─{RESET} {message}")
                if files:
                    files_display = ", ".join(files[:3])
                    if len(files) > 3:
                        files_display += f" +{len(files)-3} more"
                    print(f" {DIM} Files: {files_display}{RESET}")
            print()
            print(f"{DIM}Tip: Use '/c all' or '/ca' to see git graph{RESET}")
            print(f"{DIM}Restore: /c <hash>{RESET}")
            return None
    elif cmd == "restore":
        if not checkpoint_id:
            print(f"{RED}Usage: /c <checkpoint_id>{RESET}")
            return None
        print(f"{YELLOW}⚠ This will restore files AND conversation to checkpoint {checkpoint_id}{RESET}")
        print(f"{YELLOW}⚠ Future checkpoints will be discarded from history{RESET}")
        confirm = input(f"{BOLD}Continue? (y/N): {RESET}").strip().lower()
        if confirm != 'y':
            print(f"{DIM}Cancelled{RESET}")
            return None
        success, conversation_snapshot = session_manager.checkpoint_manager.restore_checkpoint(checkpoint_id)
        if success:
            print(f"{GREEN}✓ Restored files to checkpoint {checkpoint_id}{RESET}")
            if conversation_snapshot:
                print(f"{GREEN}✓ Restored conversation ({len(conversation_snapshot)} messages){RESET}")
                return conversation_snapshot
            else:
                print(f"{YELLOW}⚠ No conversation snapshot found for this checkpoint{RESET}")
                return None
        else:
            print(f"{RED}✗ Failed to restore checkpoint{RESET}")
            return None
    else:
        print(f"{RED}Unknown command: {cmd}{RESET}")
        return None
# Script entry point: launch the interactive loop defined earlier in the file.
if __name__ == "__main__":
    main()
| 1 | #!/usr/bin/env python3 |
| 2 | """nanocode - minimal claude code alternative""" |
| 3 | import glob as globlib |
| 4 | import hashlib |
| 5 | import json |
| 6 | import os |
| 7 | import random |
| 8 | import re |
| 9 | import readline |
| 10 | import select |
| 11 | import ssl |
| 12 | import subprocess |
| 13 | import sys |
| 14 | import termios |
| 15 | import time |
| 16 | import tty |
| 17 | import urllib.request |
| 18 | import urllib.parse |
| 19 | from datetime import datetime |
| 20 | |
| 21 | OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY") |
| 22 | LOCAL_API_KEY = os.environ.get("LOCAL_API_KEY") |
| 23 | API_URL = ( |
| 24 | "http://127.0.0.1:8990/v1/messages" if LOCAL_API_KEY |
| 25 | else "https://openrouter.ai/api/v1/messages" if OPENROUTER_KEY |
| 26 | else "https://api.anthropic.com/v1/messages" |
| 27 | ) |
| 28 | MODEL = os.environ.get("MODEL", |
| 29 | "anthropic/claude-sonnet-4.5" if LOCAL_API_KEY |
| 30 | else "anthropic/claude-opus-4.5" if OPENROUTER_KEY |
| 31 | else "claude-opus-4-5" |
| 32 | ) |
| 33 | |
| 34 | # ANSI colors |
| 35 | RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m" |
| 36 | BLUE, CYAN, GREEN, YELLOW, RED = "\033[34m", "\033[36m", "\033[32m", "\033[33m", "\033[31m" |
| 37 | stop_flag = False |
| 38 | |
| 39 | def create_opener(): |
| 40 | """Create URL opener with SSL and proxy support""" |
| 41 | proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy") |
| 42 | ssl_ctx = ssl.create_default_context() |
| 43 | ssl_ctx.check_hostname = False |
| 44 | ssl_ctx.verify_mode = ssl.CERT_NONE |
| 45 | |
| 46 | handlers = [urllib.request.HTTPSHandler(context=ssl_ctx)] |
| 47 | if proxy: handlers.insert(0, urllib.request.ProxyHandler({"http": proxy, "https": proxy})) |
| 48 | return urllib.request.build_opener(*handlers) |
| 49 | |
| 50 | def register_tool(name, desc, params): |
| 51 | """Register a tool from extension code""" |
| 52 | def decorator(func): |
| 53 | TOOLS[name] = (desc, params, func) |
| 54 | return func |
| 55 | return decorator |
| 56 | |
| 57 | def search_extension(args): |
| 58 | """Search extensions from gist.kitchain.cn""" |
| 59 | query = args.get("query", "") |
| 60 | if not query: return "error: query required" |
| 61 | try: |
| 62 | # Split query into keywords |
| 63 | keywords = query.lower().split() |
| 64 | gist_info = {} # {gist_path: {"hits": count, "title": str, "desc": str, "topics": []}} |
| 65 | opener = create_opener() |
| 66 | |
| 67 | # Search each keyword as a topic |
| 68 | for keyword in keywords: |
| 69 | url = f"https://gist.kitchain.cn/topics/{urllib.parse.quote(keyword)}" |
| 70 | html = opener.open(urllib.request.Request(url), timeout=10).read().decode() |
| 71 | |
| 72 | # Extract gist URLs and titles |
| 73 | gist_matches = re.findall( |
| 74 | r'<a class="font-bold" href="https://gist\.kitchain\.cn/([^/]+/[a-f0-9]+)">([^<]+)</a>', |
| 75 | html |
| 76 | ) |
| 77 | |
| 78 | for gist_path, title in gist_matches: |
| 79 | if gist_path not in gist_info: |
| 80 | # Extract description and topics for this gist |
| 81 | gist_section = re.search( |
| 82 | rf'{re.escape(gist_path)}.*?' |
| 83 | r'<h6 class="text-xs[^"]*">([^<]+)</h6>(.*?)</div>\s*</div>', |
| 84 | html, re.DOTALL |
| 85 | ) |
| 86 | desc = "" |
| 87 | topics = [] |
| 88 | if gist_section: |
| 89 | desc = gist_section.group(1).strip() |
| 90 | topics_section = gist_section.group(2) |
| 91 | topics = re.findall(r'topics/([^"]+)"[^>]*>([^<]+)<', topics_section) |
| 92 | topics = [t[1] for t in topics] # Extract topic names |
| 93 | |
| 94 | gist_info[gist_path] = { |
| 95 | "hits": 0, |
| 96 | "title": title.strip(), |
| 97 | "desc": desc, |
| 98 | "topics": topics, |
| 99 | "filename": title.strip() |
| 100 | } |
| 101 | gist_info[gist_path]["hits"] += 1 |
| 102 | |
| 103 | if not gist_info: return f"No extensions found: {query}" |
| 104 | |
| 105 | # Sort by hit count (descending) |
| 106 | sorted_gists = sorted(gist_info.items(), key=lambda x: x[1]["hits"], reverse=True)[:10] |
| 107 | |
| 108 | result = f"Found {len(sorted_gists)} extensions:\n\n" |
| 109 | for gist_path, info in sorted_gists: |
| 110 | result += f"• {info['title']}\n" |
| 111 | if info['desc']: |
| 112 | result += f" {info['desc']}\n" |
| 113 | if info['topics']: |
| 114 | result += f" Topics: {', '.join(info['topics'])}\n" |
| 115 | result += f" Matched: {info['hits']} keyword(s)\n\n" |
| 116 | |
| 117 | # Return first gist's load URL |
| 118 | first_gist = sorted_gists[0][0] |
| 119 | first_filename = sorted_gists[0][1]['filename'] |
| 120 | result += f"To load the top result:\nload({{\"url\": \"https://gist.kitchain.cn/{first_gist}/raw/HEAD/{first_filename}\"}})" |
| 121 | return result |
| 122 | except Exception as e: |
| 123 | return f"error: {e}" |
| 124 | |
| 125 | def load(args): |
| 126 | """Load extension from URL""" |
| 127 | url = args.get("url") |
| 128 | if not url: return "error: url required" |
| 129 | try: |
| 130 | opener = create_opener() |
| 131 | code = opener.open(urllib.request.Request(url), timeout=10).read().decode() |
| 132 | exec(code, {"register_tool": register_tool, "TOOLS": TOOLS, "urllib": urllib, "json": json, "re": re, "subprocess": subprocess}) |
| 133 | new = [k for k in TOOLS if k not in ["read","write","edit","glob","grep","bash","web_search","search_extension","load"]] |
| 134 | return f"Loaded. New tools: {', '.join(new)}" |
| 135 | except Exception as e: |
| 136 | return f"error: {e}" |
| 137 | |
| 138 | # --- Tools --- |
| 139 | def read(args): |
| 140 | lines = open(args["path"]).readlines() |
| 141 | offset, limit = args.get("offset", 0), args.get("limit", len(lines)) |
| 142 | return "".join(f"{offset+i+1:4}| {l}" for i, l in enumerate(lines[offset:offset+limit])) |
| 143 | |
| 144 | def write(args): |
| 145 | filepath = args["path"] |
| 146 | content = args["content"] |
| 147 | print(f"{DIM}[LOG] write: {filepath} ({len(content)} bytes){RESET}", flush=True) |
| 148 | open(filepath, "w").write(content) |
| 149 | print(f"{DIM}[LOG] write completed: {filepath}{RESET}", flush=True) |
| 150 | return "ok" |
| 151 | |
| 152 | def edit(args): |
| 153 | filepath = args["path"] |
| 154 | print(f"{DIM}[LOG] edit: {filepath}{RESET}", flush=True) |
| 155 | text = open(filepath).read() |
| 156 | print(f"{DIM}[LOG] edit read: {len(text)} bytes{RESET}", flush=True) |
| 157 | old, new = args["old"], args["new"] |
| 158 | if old not in text: return "error: old_string not found" |
| 159 | count = text.count(old) |
| 160 | if not args.get("all") and count > 1: |
| 161 | return f"error: old_string appears {count} times (use all=true)" |
| 162 | result = text.replace(old, new) if args.get("all") else text.replace(old, new, 1) |
| 163 | print(f"{DIM}[LOG] edit writing: {len(result)} bytes{RESET}", flush=True) |
| 164 | open(filepath, "w").write(result) |
| 165 | print(f"{DIM}[LOG] edit completed: {filepath}{RESET}", flush=True) |
| 166 | return "ok" |
| 167 | |
| 168 | def glob(args): |
| 169 | pattern = (args.get("path", ".") + "/" + args["pat"]).replace("//", "/") |
| 170 | files = sorted(globlib.glob(pattern, recursive=True), |
| 171 | key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0, reverse=True) |
| 172 | return "\n".join(files) or "none" |
| 173 | |
| 174 | def grep(args): |
| 175 | pattern, hits = re.compile(args["pat"]), [] |
| 176 | for fp in globlib.glob(args.get("path", ".") + "/**", recursive=True): |
| 177 | try: |
| 178 | for n, l in enumerate(open(fp), 1): |
| 179 | if pattern.search(l): hits.append(f"{fp}:{n}:{l.rstrip()}") |
| 180 | except: pass |
| 181 | return "\n".join(hits[:50]) or "none" |
| 182 | |
| 183 | def bash(args): |
| 184 | global stop_flag |
| 185 | proc = subprocess.Popen(args["cmd"], shell=True, stdout=subprocess.PIPE, |
| 186 | stderr=subprocess.STDOUT, text=True) |
| 187 | lines = [] |
| 188 | old_settings = termios.tcgetattr(sys.stdin) |
| 189 | try: |
| 190 | tty.setcbreak(sys.stdin.fileno()) |
| 191 | if proc.stdout: |
| 192 | import fcntl |
| 193 | fd = proc.stdout.fileno() |
| 194 | fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK) |
| 195 | |
| 196 | while True: |
| 197 | # Check ESC key |
| 198 | if select.select([sys.stdin], [], [], 0)[0]: |
| 199 | if sys.stdin.read(1) == '\x1b': |
| 200 | stop_flag = True |
| 201 | proc.kill() |
| 202 | lines.append("\n(stopped)") |
| 203 | print(f"\n{YELLOW}⏸ Stopped{RESET}") |
| 204 | break |
| 205 | |
| 206 | # Read output |
| 207 | if select.select([proc.stdout], [], [], 0.1)[0]: |
| 208 | line = proc.stdout.readline() |
| 209 | if line: |
| 210 | print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True) |
| 211 | lines.append(line) |
| 212 | |
| 213 | # Check if done |
| 214 | if proc.poll() is not None: |
| 215 | remaining = proc.stdout.read() |
| 216 | if remaining: |
| 217 | for line in remaining.split('\n'): |
| 218 | if line: |
| 219 | print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True) |
| 220 | lines.append(line + '\n') |
| 221 | break |
| 222 | |
| 223 | if not stop_flag: |
| 224 | proc.wait(timeout=30) |
| 225 | except subprocess.TimeoutExpired: |
| 226 | proc.kill() |
| 227 | lines.append("\n(timeout)") |
| 228 | finally: |
| 229 | termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) |
| 230 | |
| 231 | return "".join(lines).strip() or "(empty)" |
| 232 | |
| 233 | def web_search(args): |
| 234 | """Search web using DuckDuckGo""" |
| 235 | query, max_results = args["query"], args.get("max_results", 5) |
| 236 | try: |
| 237 | url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}" |
| 238 | headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"} |
| 239 | opener = create_opener() |
| 240 | html = opener.open(urllib.request.Request(url, headers=headers), timeout=30).read().decode() |
| 241 | |
| 242 | # Extract titles and URLs |
| 243 | links = re.findall(r'class="result__a"[^>]+href="([^"]+)"[^>]*>([^<]+)<', html) |
| 244 | # Extract snippets |
| 245 | snippets = re.findall(r'class="result__snippet"[^>]*>([^<]*)<', html) |
| 246 | if not links: return "No results found" |
| 247 | |
| 248 | results = [] |
| 249 | for i, ((link, title), snippet) in enumerate(zip(links[:max_results], snippets[:max_results] + [""] * max_results), 1): |
| 250 | results.append(f"{i}. {title.strip()}\n URL: {link}\n {snippet.strip()}\n") |
| 251 | return "\n".join(results) |
| 252 | except Exception as e: |
| 253 | return f"error: {e}" |
| 254 | |
| 255 | |
| 256 | TOOLS = { |
| 257 | "read": ("Read file with line numbers", {"path": "string", "offset": "number?", "limit": "number?"}, read), |
| 258 | "write": ("Write content to file", {"path": "string", "content": "string"}, write), |
| 259 | "edit": ("Replace old with new in file", {"path": "string", "old": "string", "new": "string", "all": "boolean?"}, edit), |
| 260 | "glob": ("Find files by pattern", {"pat": "string", "path": "string?"}, glob), |
| 261 | "grep": ("Search files for regex", {"pat": "string", "path": "string?"}, grep), |
| 262 | "bash": ("Run shell command", {"cmd": "string"}, bash), |
| 263 | "web_search": ("Search the web using DuckDuckGo", {"query": "string", "max_results": "number?"}, web_search), |
| 264 | "search_extension": ("Search for extensions to add new capabilities (GitHub docs, web scraping, APIs, etc)", {"query": "string"}, search_extension), |
| 265 | "load": ("Load extension from URL to add new tools", {"url": "string"}, load), |
| 266 | } |
| 267 | |
| 268 | def run_tool(name, args): |
| 269 | try: return TOOLS[name][2](args) |
| 270 | except Exception as e: return f"error: {e}" |
| 271 | |
| 272 | def make_schema(): |
| 273 | result = [] |
| 274 | for name, (desc, params, _) in TOOLS.items(): |
| 275 | props, req = {}, [] |
| 276 | for pname, ptype in params.items(): |
| 277 | opt = ptype.endswith("?") |
| 278 | props[pname] = {"type": "integer" if ptype.rstrip("?") == "number" else ptype.rstrip("?")} |
| 279 | if not opt: req.append(pname) |
| 280 | result.append({"name": name, "description": desc, |
| 281 | "input_schema": {"type": "object", "properties": props, "required": req}}) |
| 282 | return result |
| 283 | |
| 284 | def call_api(messages, system_prompt, stream=True, enable_thinking=True, use_tools=True): |
| 285 | headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01"} |
| 286 | if LOCAL_API_KEY: headers["Authorization"] = f"Bearer {LOCAL_API_KEY}" |
| 287 | elif OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}" |
| 288 | else: headers["x-api-key"] = os.environ.get("ANTHROPIC_API_KEY", "") |
| 289 | |
| 290 | data = {"model": MODEL, "max_tokens": 8192, "system": system_prompt, |
| 291 | "messages": messages, "stream": stream} |
| 292 | |
| 293 | if use_tools: |
| 294 | data["tools"] = make_schema() |
| 295 | |
| 296 | if enable_thinking and os.environ.get("THINKING"): |
| 297 | data["thinking"] = {"type": "enabled", "budget_tokens": int(os.environ.get("THINKING_BUDGET", "10000"))} |
| 298 | |
| 299 | req = urllib.request.Request(API_URL, json.dumps(data).encode(), headers, method="POST") |
| 300 | return create_opener().open(req) |
| 301 | |
def summarize_changes(user_input, files_modified, checkpoint_manager, checkpoint_id):
    """Use the LLM to produce a one-line summary of this turn's changes.

    Args:
        user_input: User's request (used as the fallback summary).
        files_modified: Set of modified file paths.
        checkpoint_manager: CheckpointManager instance.
        checkpoint_id: Checkpoint hash to get the diff from.

    Returns:
        str: One-line summary of changes; falls back to the first 50
        characters of user_input when there is no diff or no usable reply.
    """
    fallback = user_input[:50]
    if not files_modified or not checkpoint_id:
        return fallback

    try:
        # Get the actual diff for the checkpoint commit from the shadow repo.
        diff_output = checkpoint_manager._git_command(
            "--git-dir", checkpoint_manager.bare_repo,
            "show", "--format=", checkpoint_id
        )

        # Whitespace-only output or a git error string: nothing to summarize.
        if not diff_output.strip() or diff_output.startswith("error"):
            return fallback

        # Limit diff size to avoid token overflow (max ~3000 chars).
        if len(diff_output) > 3000:
            diff_output = diff_output[:3000] + "\n... (truncated)"

        summary_prompt = f"""Based on the actual code changes (diff), generate a brief Chinese summary (max 30 Chinese characters).

IMPORTANT: Must be based on the actual code changes, not the user's description.

Code changes (diff):
{diff_output}

User description (for reference only): {user_input}

Requirements:
1. Describe what code/functionality was actually modified
2. Reply in Chinese only, no explanation
3. No quotes
4. Max 30 Chinese characters

Good examples:
- 在 auth.py 添加 JWT 验证
- 修复 parser.py 空指针异常
- 重构 database.py 连接池
- 更新 README 添加安装说明
"""

        messages = [{"role": "user", "content": summary_prompt}]
        response = call_api(messages, "You are a code change analyzer, skilled at extracting key information from diffs. Reply in Chinese.",
                            stream=False, enable_thinking=False, use_tools=False)

        # Parse the non-streaming response body.
        data = json.loads(response.read().decode())
        for block in data.get("content", []):
            if block.get("type") != "text":
                continue
            summary = block.get("text", "").strip()

            # Some models leak reasoning in <thinking> tags; keep only the
            # text after the closing tag.
            if "<thinking>" in summary:
                parts = summary.split("</thinking>")
                if len(parts) > 1:
                    summary = parts[-1].strip()

            # Strip quotes (the prompt forbids them) and reject overlong replies.
            summary = summary.replace('"', '').replace("'", "")
            if summary and len(summary) <= 80:
                return summary

        return fallback
    except Exception:
        # Summarization is strictly best-effort; never break the checkpoint flow.
        return fallback
| 384 | |
def process_stream(response):
    """Consume an SSE response stream, echoing text/thinking as it arrives.

    Supports interrupting generation with the ESC key (sets the module-level
    stop_flag so the caller can abandon the turn).

    Args:
        response: An open HTTP response yielding SSE lines as bytes.

    Returns:
        list[dict]: Completed content blocks (text, thinking, tool_use).
    """
    global stop_flag
    blocks, current, text_buf, json_buf, think_buf = [], None, "", "", ""

    # cbreak mode lets us poll single keypresses (ESC) without Enter;
    # the original terminal state is always restored in the finally block.
    old_settings = termios.tcgetattr(sys.stdin)
    try:
        tty.setcbreak(sys.stdin.fileno())

        for line in response:
            # Non-blocking check for an ESC keypress between SSE lines.
            if select.select([sys.stdin], [], [], 0)[0]:
                ch = sys.stdin.read(1)
                if ch == '\x1b':  # ESC key
                    stop_flag = True
                    print(f"\n{YELLOW}⏸ Stopped{RESET}")
                    break

            line = line.decode("utf-8").strip()
            if not line.startswith("data: "): continue
            if line == "data: [DONE]": continue

            try:
                data = json.loads(line[6:])
                etype = data.get("type")

                if etype == "content_block_start":
                    block = data.get("content_block", {})
                    current = {"type": block.get("type"), "id": block.get("id")}
                    if current["type"] == "text":
                        text_buf = ""
                        print(f"\n{CYAN}⏺{RESET} ", end="", flush=True)
                    elif current["type"] == "thinking":
                        think_buf = ""
                        print(f"\n{YELLOW}💭{RESET} {DIM}", end="", flush=True)
                    elif current["type"] == "tool_use":
                        current["name"] = block.get("name")
                        json_buf = ""

                elif etype == "content_block_delta":
                    delta = data.get("delta", {})
                    dtype = delta.get("type")
                    if dtype == "text_delta":
                        text = delta.get("text", "")
                        text_buf += text
                        print(text, end="", flush=True)
                    elif dtype == "thinking_delta":
                        text = delta.get("thinking", "")
                        think_buf += text
                        print(text, end="", flush=True)
                    elif dtype == "input_json_delta" and current:
                        # Tool input arrives as partial JSON fragments.
                        json_buf += delta.get("partial_json", "")

                elif etype == "content_block_stop" and current:
                    if current["type"] == "text":
                        current["text"] = text_buf
                        print()
                    elif current["type"] == "thinking":
                        print(RESET)
                    elif current["type"] == "tool_use":
                        try:
                            current["input"] = json.loads(json_buf)
                        except json.JSONDecodeError:
                            # Incomplete/empty tool input; fall back to no args.
                            current["input"] = {}
                    blocks.append(current)
                    current = None
            except Exception:
                # Malformed events are skipped; the stream is best-effort.
                # (Was a bare except; narrowed so KeyboardInterrupt/SystemExit propagate.)
                pass
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

    return blocks
| 454 | |
def is_file_in_project(filepath, project_path):
    """Return True if *filepath* lies inside *project_path* (or is it exactly).

    Both paths are resolved with os.path.abspath before comparison; the
    os.sep suffix prevents "/proj2" matching a "/proj" project root.
    Any path that cannot be resolved is treated as outside the project.
    """
    try:
        abs_file = os.path.abspath(filepath)
        abs_project = os.path.abspath(project_path)
    except (TypeError, ValueError, OSError):
        # Narrowed from a bare except: these are what abspath raises on
        # None / embedded NULs / unusable cwd.
        return False
    return abs_file == abs_project or abs_file.startswith(abs_project + os.sep)
| 464 | |
def read_multiline_input():
    """Read multiline input. Enter to submit, Alt+Enter for newline.

    Implements a small raw-terminal line editor: cursor movement with
    left/right arrows, backspace (including merging back into the previous
    line), Ctrl+C to clear everything, Ctrl+D to exit, and bracketed-paste
    handling so multi-line pastes are captured without submitting.

    Returns:
        str: All entered lines joined with newlines, stripped.

    Raises:
        EOFError: When the user presses Ctrl+D.
    """
    lines = []       # completed lines (via Alt+Enter or multi-line paste)
    current = ""     # the line currently being edited
    cursor_pos = 0   # Cursor position in current line

    # Enable bracketed paste mode: the terminal brackets pasted text with
    # ESC[200~ ... ESC[201~ so pastes are distinguishable from typing.
    print("\033[?2004h", end="", flush=True)

    old_settings = termios.tcgetattr(sys.stdin)
    try:
        # cbreak: deliver keypresses one at a time, no line buffering.
        tty.setcbreak(sys.stdin.fileno())
        print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True)

        while True:
            ch = sys.stdin.read(1)

            if ch == '\x03':  # Ctrl+C - clear input
                lines.clear()
                current = ""
                cursor_pos = 0
                # Erase the on-screen line and redraw the prompt.
                print("\r\033[K", end="", flush=True)
                print(f"{BOLD}{BLUE}❯{RESET} ", end="", flush=True)
                continue

            if ch == '\x04':  # Ctrl+D
                raise EOFError

            if ch == '\x1b':  # Escape sequence
                next_ch = sys.stdin.read(1)
                if next_ch in ('\r', '\n'):  # Alt+Enter: start a new input line
                    lines.append(current)
                    current = ""
                    cursor_pos = 0
                    print(f"\n{BOLD}{BLUE}│{RESET} ", end="", flush=True)
                elif next_ch == '[':  # CSI escape sequence
                    seq = sys.stdin.read(1)
                    if seq == 'C':  # Right arrow
                        if cursor_pos < len(current):
                            cursor_pos += 1
                            print("\033[C", end="", flush=True)
                    elif seq == 'D':  # Left arrow
                        if cursor_pos > 0:
                            cursor_pos -= 1
                            print("\033[D", end="", flush=True)
                    elif seq == '2':  # Bracketed paste start: ESC[200~
                        rest = sys.stdin.read(3)  # Read "00~"
                        if rest == '00~':
                            # Read pasted content until ESC[201~
                            paste_buf = ""
                            while True:
                                c = sys.stdin.read(1)
                                if c == '\x1b':
                                    # Check for [201~
                                    # NOTE(review): assumes the full 5-char
                                    # terminator arrives in one read; a bare ESC
                                    # inside pasted text would desync — confirm.
                                    peek = sys.stdin.read(5)
                                    if peek == '[201~':
                                        break
                                    else:
                                        paste_buf += c + peek
                                else:
                                    paste_buf += c

                            # Process pasted content
                            paste_lines = paste_buf.split('\n')

                            if len(paste_lines) == 1:
                                # Single line paste: insert at the cursor.
                                current = current[:cursor_pos] + paste_lines[0] + current[cursor_pos:]
                                cursor_pos += len(paste_lines[0])
                                prefix = f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} "
                                print(f"\r\033[K{prefix}{current}", end="", flush=True)
                            else:
                                # Multi-line paste
                                # First line appends to current
                                first_line = current[:cursor_pos] + paste_lines[0]
                                print(paste_lines[0], end="", flush=True)
                                if first_line:
                                    lines.append(first_line)

                                # Middle lines become completed lines as-is.
                                for line in paste_lines[1:-1]:
                                    print(f"\n{BOLD}{BLUE}│{RESET} {line}", end="", flush=True)
                                    lines.append(line)

                                # Last line becomes new current (still editable).
                                current = paste_lines[-1]
                                cursor_pos = len(current)
                                print(f"\n{BOLD}{BLUE}│{RESET} {current}", end="", flush=True)
                continue

            if ch in ('\r', '\n'):  # Enter - submit
                if current:
                    lines.append(current)
                print()
                break

            if ch in ('\x7f', '\x08'):  # Backspace
                if cursor_pos > 0:
                    # Delete character before cursor
                    current = current[:cursor_pos-1] + current[cursor_pos:]
                    cursor_pos -= 1
                    # Redraw current line
                    prefix = f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} "
                    print(f"\r\033[K{prefix}{current}", end="", flush=True)
                    # Move cursor back to position
                    if cursor_pos < len(current):
                        print(f"\033[{len(current) - cursor_pos}D", end="", flush=True)
                elif lines:
                    # Backspace at column 0: merge with previous line
                    prev_line = lines.pop()
                    cursor_pos = len(prev_line)  # Cursor at end of previous line
                    current = prev_line + current
                    # Move up and redraw
                    print("\033[A\033[K", end="", flush=True)
                    prefix = f"{BOLD}{BLUE}{'│' if lines else '❯'}{RESET} "
                    print(f"\r{prefix}{current}", end="", flush=True)
                    if cursor_pos < len(current):
                        print(f"\033[{len(current) - cursor_pos}D", end="", flush=True)
                continue

            if ch.isprintable() or ch == '\t':
                # Insert character at cursor position
                current = current[:cursor_pos] + ch + current[cursor_pos:]
                cursor_pos += 1
                # Redraw from cursor position
                print(f"{ch}{current[cursor_pos:]}", end="", flush=True)
                # Move cursor back if needed
                if cursor_pos < len(current):
                    print(f"\033[{len(current) - cursor_pos}D", end="", flush=True)

    finally:
        # Disable bracketed paste mode
        print("\033[?2004l", end="", flush=True)
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

    return "\n".join(lines).strip()
| 601 | |
def main():
    """Entry point: parse flags, configure the terminal, and run the REPL."""
    global stop_flag
    argv = sys.argv
    continue_session = "-c" in argv or "--continue" in argv
    list_sessions = "-l" in argv or "--list" in argv

    # Disable signal generation so Ctrl+C can be repurposed as "clear input".
    old_settings = termios.tcgetattr(sys.stdin)
    raw_settings = termios.tcgetattr(sys.stdin)
    raw_settings[3] &= ~termios.ISIG
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, raw_settings)

    try:
        proxy_url = os.environ.get("http_proxy") or os.environ.get("https_proxy")
        proxy_segment = f" | {DIM}🌐 {proxy_url}{RESET}" if proxy_url else ""
        thinking_segment = f" | {YELLOW}💭{RESET}" if os.environ.get("THINKING") else ""

        # Banner segment describing how the session will be chosen.
        if list_sessions:
            mode_segment = f" | {YELLOW}Select{RESET}"
        elif continue_session:
            mode_segment = f" | {GREEN}Continue{RESET}"
        else:
            mode_segment = f" | {CYAN}New{RESET}"

        banner = (
            f"{BOLD}nanocode{RESET} | {DIM}{MODEL} | {os.getcwd()}"
            f"{proxy_segment}{thinking_segment}{mode_segment}{RESET}"
        )
        print(banner)
        print(f"{DIM}Shortcuts: Enter=submit | Alt+Enter=newline | Ctrl+C=clear input | Ctrl+D=exit | ESC=stop{RESET}")
        print(f"{DIM}Commands: /c [all|<id>] | /ca | /clear{RESET}")
        print(f"{DIM}Usage: nanocode (new) | nanocode -c (continue) | nanocode -l (select){RESET}\n")

        chosen_session = None
        if list_sessions:
            chosen_session = select_session_interactive()
            if not chosen_session:
                print(f"{DIM}Exiting...{RESET}")
                return

        run_main_loop(continue_session, chosen_session)
    finally:
        # Always hand the user their terminal back in its original state.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
| 641 | |
def select_session_interactive():
    """Display recent sessions and let the user pick one.

    Returns:
        The chosen session_id, or None for a new session / cancelled input.
    """
    manager = SessionManager(os.getcwd())
    recent = manager.list_sessions()[:10]  # Limit to 10 most recent

    if not recent:
        print(f"{YELLOW}⚠ No previous sessions found{RESET}")
        print(f"{DIM}Starting new session...{RESET}\n")
        return None

    print(f"{BOLD}📂 Recent Sessions:{RESET}\n")

    for number, info in enumerate(recent, 1):
        meta = info['metadata']
        created = datetime.fromtimestamp(meta['created_at']).strftime('%Y-%m-%d %H:%M')
        last_active = datetime.fromtimestamp(meta['last_active']).strftime('%Y-%m-%d %H:%M')
        desc = meta.get('description', '(no description)')

        # Optional git context recorded at session creation time.
        commit = meta.get('git_commit')
        branch = meta.get('git_branch')
        dirty = meta.get('git_dirty', False)

        print(f"{CYAN}{number}.{RESET} {BOLD}{info['session_id']}{RESET}")
        print(f" {desc}")

        git_info = ""
        if commit and branch:
            dirty_mark = f"{YELLOW}*{RESET}" if dirty else ""
            git_info = f" | Git: {branch}@{commit}{dirty_mark}"

        print(f" Created: {created} | Last: {last_active} | {info['message_count']} messages{git_info}\n")

    print(f"{DIM}Enter session number (1-{len(recent)}), or press Enter for new session:{RESET}")

    try:
        choice = input(f"{BOLD}{BLUE}❯{RESET} ").strip()
    except (EOFError, KeyboardInterrupt):
        return None

    if not choice:
        # Empty input = new session
        return None

    try:
        index = int(choice) - 1
    except ValueError:
        print(f"{RED}✗ Invalid input{RESET}")
        return None

    if 0 <= index < len(recent):
        return recent[index]['session_id']
    print(f"{RED}✗ Invalid number{RESET}")
    return None
| 699 | |
| 700 | |
def _git_info_str(metadata):
    """Format ' | Git: branch@commit[*]' from session metadata, or ''."""
    commit = metadata.get('git_commit')
    branch = metadata.get('git_branch')
    if not (commit and branch):
        return ""
    dirty_mark = f"{YELLOW}*{RESET}" if metadata.get('git_dirty', False) else ""
    return f" | Git: {branch}@{commit}{dirty_mark}"


def _check_session_conflicts(session, session_manager):
    """Warn about files changed outside *session* and ask how to proceed.

    May update the session's file states, or replace the current session
    with a fresh one when the user declines to continue.
    """
    conflicts = session.detect_conflicts()
    if not conflicts:
        print()
        return
    print(f"\n{YELLOW}⚠ File conflicts detected:{RESET}")
    for filepath in conflicts[:5]:
        print(f" - {filepath}")
    if len(conflicts) > 5:
        print(f" ... and {len(conflicts)-5} more")
    print(f"\n{DIM}These files have been modified outside this session.{RESET}")
    confirm = input(f"{BOLD}Continue anyway? (y/N/u=update): {RESET}").strip().lower()

    if confirm == 'u':
        session.update_file_states()
        session_manager.save_session()
        print(f"{GREEN}✓ Updated file states{RESET}\n")
    elif confirm != 'y':
        print(f"{DIM}Creating new session instead...{RESET}\n")
        session_manager.create_session()
    else:
        print()


def run_main_loop(continue_session=False, selected_session_id=None):
    """Interactive REPL: load/create a session, then process user turns.

    Args:
        continue_session: Resume the most recent session when True.
        selected_session_id: Explicit session chosen via the -l picker.
    """
    # BUGFIX: without this declaration, the `stop_flag = False` assignment
    # below made stop_flag function-local, so the `if stop_flag` checks
    # never saw the global set by process_stream on ESC.
    global stop_flag

    session_manager = SessionManager(os.getcwd())

    # --- Session selection / creation ------------------------------------
    if selected_session_id:
        # Load the specific session the user picked.
        session = session_manager.load_session(selected_session_id)
        if session:
            print(f"{GREEN}✓ Loaded session: {session.session_id}{RESET}")
            print(f"{DIM} └─ {len(session.messages)} messages{_git_info_str(session.metadata)}{RESET}")
            _check_session_conflicts(session, session_manager)
        else:
            print(f"{RED}✗ Failed to load session{RESET}")
            print(f"{GREEN}✓ Creating new session instead{RESET}\n")
            session_manager.create_session()
    elif continue_session:
        # Resume the most recent session, if any.
        last_session = session_manager.load_last_session()
        if last_session:
            print(f"{GREEN}✓ Continued session: {last_session.session_id}{RESET}")
            print(f"{DIM} └─ {len(last_session.messages)} messages{_git_info_str(last_session.metadata)}{RESET}")
            _check_session_conflicts(last_session, session_manager)
        else:
            session_manager.create_session()
            print(f"{YELLOW}⚠ No previous session found{RESET}")
            print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}\n")
    else:
        # Default: new session, branched from the last session's latest checkpoint.
        parent_checkpoint = None
        parent_session = None

        last_session = session_manager.load_last_session()
        if last_session:
            checkpoints = session_manager.checkpoint_manager.list_checkpoints(show_all=False)
            if checkpoints:
                parent_checkpoint = checkpoints[0][0]  # Latest checkpoint hash
                parent_session = last_session.session_id

        session_manager.create_session(
            parent_checkpoint=parent_checkpoint,
            parent_session=parent_session
        )

        git_info = _git_info_str(session_manager.current_session.metadata)
        if parent_checkpoint:
            print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}")
            print(f"{DIM} └─ Branched from {parent_session[:8]}... @ {parent_checkpoint}{git_info}{RESET}\n")
        else:
            print(f"{GREEN}✓ Created new session: {session_manager.current_session.session_id}{RESET}")
            if git_info:
                print(f"{DIM} └─{git_info}{RESET}\n")
            else:
                print()

    files_modified = set()
    auto_checkpoint = True

    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    system_prompt = f"""Concise coding assistant. cwd: {os.getcwd()} Current time: {current_time}
IMPORTANT: When you don't have a tool for the task, ALWAYS try search_extension first before saying you can't do it.
Examples:
- User asks about GitHub repo → search_extension({{"query": "github documentation"}})
- User needs web data → search_extension({{"query": "web scraping"}})
- User needs API → search_extension({{"query": "api client"}})"""

    # --- Main REPL loop ----------------------------------------------------
    while True:
        try:
            print(f"{DIM}{'─'*80}{RESET}")
            user_input = read_multiline_input()
            print(f"{DIM}{'─'*80}{RESET}")

            if not user_input: continue
            if user_input in ("/q", "exit"):
                session_manager.save_session()
                break

            # Handle /clear command first (before /c to avoid prefix conflict).
            if user_input == "/clear":
                session_manager.save_session()

                # Branch the fresh session off the current session's latest checkpoint.
                checkpoints = session_manager.checkpoint_manager.list_checkpoints(show_all=False)
                parent_checkpoint = checkpoints[0][0] if checkpoints else None
                parent_session = session_manager.current_session.session_id

                session_manager.create_session(
                    parent_checkpoint=parent_checkpoint,
                    parent_session=parent_session
                )
                files_modified.clear()

                print(f"{GREEN}✓ Started new session: {session_manager.current_session.session_id}{RESET}")
                if parent_checkpoint:
                    print(f"{DIM} └─ Branched from {parent_session[:8]}... @ {parent_checkpoint}{RESET}")
                continue

            # Checkpoint commands: /c, /checkpoint, /ca.
            if user_input.startswith("/checkpoint") or user_input.startswith("/c") or user_input == "/ca":
                parts = user_input.split()
                if parts[0] == "/ca":
                    parts = ["/c", "all"]              # /ca is shortcut for /c all
                if len(parts) == 1 and parts[0] in ["/c", "/checkpoint"]:
                    parts.append("list")               # /c without args defaults to list

                restored_messages = handle_checkpoint_command(parts, session_manager, files_modified)
                if restored_messages is not None:
                    # Restore conversation by replacing session messages.
                    session_manager.current_session.messages = restored_messages
                    session_manager.save_session()
                continue

            # Normal user turn.
            session_manager.current_session.messages.append({"role": "user", "content": user_input})

            stop_flag = False                  # reset the ESC interrupt for this turn
            files_modified_this_turn = set()   # project files touched this turn

            # Agent loop: keep calling the API until no more tool calls (or ESC).
            while True:
                response = call_api(session_manager.current_session.messages, system_prompt)
                blocks = process_stream(response)
                if stop_flag: break

                tool_results = []
                for block in blocks:
                    if block["type"] == "tool_use":
                        name, args = block["name"], block["input"]
                        preview = str(list(args.values())[0])[:50] if args else ""
                        print(f"\n{GREEN}⏺ {name}{RESET}({DIM}{preview}{RESET})")

                        result = run_tool(name, args)
                        lines = result.split("\n")
                        prev = lines[0][:60] + ("..." if len(lines[0]) > 60 else "")
                        if len(lines) > 1: prev += f" +{len(lines)-1}"
                        print(f" {DIM}⎿ {prev}{RESET}")

                        # Track file modifications (only project files).
                        if name in ['write', 'edit']:
                            filepath = args.get('path')
                            if filepath and is_file_in_project(filepath, session_manager.project_path):
                                files_modified.add(filepath)
                                files_modified_this_turn.add(filepath)
                                session_manager.current_session.track_file_state(filepath)

                        tool_results.append({"type": "tool_result", "tool_use_id": block["id"], "content": result})

                        # Honor ESC pressed during a tool run.
                        if stop_flag:
                            print(f"{YELLOW}⚠ Tool execution stopped{RESET}")
                            break

                session_manager.current_session.messages.append({"role": "assistant", "content": blocks})
                if not tool_results or stop_flag: break
                session_manager.current_session.messages.append({"role": "user", "content": tool_results})

            # Auto checkpoint after AI work (project files only).
            if auto_checkpoint and files_modified_this_turn:
                # Use parent_commit for the first checkpoint of a new session.
                parent_commit = session_manager.parent_commit_for_next_checkpoint
                checkpoint_id = session_manager.checkpoint_manager.create_checkpoint(
                    f"Auto: {user_input[:50]}",
                    list(files_modified_this_turn),
                    conversation_snapshot=session_manager.current_session.messages.copy(),
                    parent_commit=parent_commit
                )
                # Clear parent after the first checkpoint.
                if parent_commit:
                    session_manager.parent_commit_for_next_checkpoint = None

                if checkpoint_id:
                    # Generate a summary using the LLM with the actual diff.
                    print(f"{DIM}Generating checkpoint summary...{RESET}", end="", flush=True)
                    summary = summarize_changes(
                        user_input,
                        files_modified_this_turn,
                        session_manager.checkpoint_manager,
                        checkpoint_id
                    )
                    print(f"\r{' ' * 40}\r", end="", flush=True)  # Clear the line

                    # Rewrite the placeholder commit message if the summary improved on it.
                    temp_message = f"Auto: {user_input[:50]}"
                    if summary != user_input[:50] and summary != temp_message:
                        session_manager.checkpoint_manager._git_command(
                            "--git-dir", session_manager.checkpoint_manager.bare_repo,
                            "commit", "--amend", "-m", summary
                        )

                    print(f"\n{YELLOW}📍 {checkpoint_id}: {summary}{RESET}")
                else:
                    # Checkpoint creation failed (e.g. no actual diff).
                    print(f"\n{DIM}(No project file changes to checkpoint){RESET}")

            # Auto-save session after each interaction.
            session_manager.save_session()
            print()
        except EOFError:
            # Ctrl+D: persist and exit.
            session_manager.save_session()
            break
        except Exception as e: print(f"{RED}⏺ Error: {e}{RESET}")
| 984 | |
| 985 | # ============================================================================ |
| 986 | # Checkpoint & Session Management (Phase 1+2) |
| 987 | # ============================================================================ |
| 988 | |
class CheckpointManager:
    """Manage checkpoints using shadow bare git repository with session isolation"""

    def __init__(self, project_path, session_id=None):
        """Bind to a project directory and ensure the shadow repo exists.

        Args:
            project_path: Root of the user's project; checkpointed files
                are stored relative to this directory.
            session_id: Optional session identifier; selects the git branch
                used for this session's checkpoints.
        """
        self.project_path = project_path
        self.session_id = session_id
        # All nanocode state lives under <project>/.nanocode.
        self.nanocode_dir = os.path.join(project_path, ".nanocode")
        # Shadow bare repo - kept separate from any git repo the user has.
        self.bare_repo = os.path.join(self.nanocode_dir, "checkpoint.git")
        self._init_bare_repo()
| 998 | |
    def set_session(self, session_id):
        """Set current session for checkpoint operations.

        Subsequent checkpoints go to this session's branch (see
        _get_branch_name).
        """
        self.session_id = session_id
| 1002 | |
| 1003 | def _get_branch_name(self): |
| 1004 | """Get git branch name for current session""" |
| 1005 | if not self.session_id: |
| 1006 | return "main" |
| 1007 | return f"session_{self.session_id}" |
| 1008 | |
| 1009 | def _init_bare_repo(self): |
| 1010 | """Initialize shadow bare repository""" |
| 1011 | if not os.path.exists(self.bare_repo): |
| 1012 | os.makedirs(self.bare_repo, exist_ok=True) |
| 1013 | try: |
| 1014 | subprocess.run( |
| 1015 | ["git", "init", "--bare", self.bare_repo], |
| 1016 | capture_output=True, check=True |
| 1017 | ) |
| 1018 | except (subprocess.CalledProcessError, FileNotFoundError): |
| 1019 | # Git not available, will handle gracefully |
| 1020 | pass |
| 1021 | |
| 1022 | def _git_command(self, *args, cwd=None): |
| 1023 | """Execute git command""" |
| 1024 | try: |
| 1025 | result = subprocess.run( |
| 1026 | ["git"] + list(args), |
| 1027 | cwd=cwd or self.project_path, |
| 1028 | capture_output=True, |
| 1029 | text=True, |
| 1030 | check=True |
| 1031 | ) |
| 1032 | return result.stdout.strip() |
| 1033 | except (subprocess.CalledProcessError, FileNotFoundError) as e: |
| 1034 | return f"error: {e}" |
| 1035 | |
| 1036 | def create_checkpoint(self, message, files_changed, conversation_snapshot=None, parent_commit=None): |
| 1037 | """Create a checkpoint on current session's branch |
| 1038 | |
| 1039 | Args: |
| 1040 | message: Commit message |
| 1041 | files_changed: List of modified files |
| 1042 | conversation_snapshot: Conversation state to save |
| 1043 | parent_commit: Parent commit hash to branch from (for new sessions) |
| 1044 | """ |
| 1045 | print(f"{DIM}[LOG] create_checkpoint: files_changed={files_changed}{RESET}", flush=True) |
| 1046 | if not files_changed or not self.session_id: |
| 1047 | return None |
| 1048 | |
| 1049 | branch_name = self._get_branch_name() |
| 1050 | |
| 1051 | # Save conversation snapshot |
| 1052 | if conversation_snapshot: |
| 1053 | snapshot_file = os.path.join(self.nanocode_dir, "conversation_snapshots.json") |
| 1054 | snapshots = {} |
| 1055 | if os.path.exists(snapshot_file): |
| 1056 | with open(snapshot_file, 'r') as f: |
| 1057 | snapshots = json.load(f) |
| 1058 | |
| 1059 | # Create temp worktree for this session |
| 1060 | temp_worktree = os.path.join(self.nanocode_dir, f"temp_worktree_{self.session_id}") |
| 1061 | |
| 1062 | try: |
| 1063 | # Check if branch exists |
| 1064 | branch_exists = self._git_command("--git-dir", self.bare_repo, "rev-parse", "--verify", branch_name) |
| 1065 | |
| 1066 | if not branch_exists or branch_exists.startswith("error"): |
| 1067 | # Create new branch |
| 1068 | os.makedirs(temp_worktree, exist_ok=True) |
| 1069 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "config", "core.bare", "false") |
| 1070 | |
| 1071 | # If parent_commit specified, branch from it |
| 1072 | if parent_commit: |
| 1073 | # Create branch from parent commit |
| 1074 | self._git_command("--git-dir", self.bare_repo, "branch", branch_name, parent_commit) |
| 1075 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f") |
| 1076 | else: |
| 1077 | # Create orphan branch (no parent) |
| 1078 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", "--orphan", branch_name) |
| 1079 | |
| 1080 | # Copy files to temp worktree |
| 1081 | for filepath in files_changed: |
| 1082 | print(f"{DIM}[LOG] checkpoint copying: {filepath}{RESET}", flush=True) |
| 1083 | if os.path.exists(filepath): |
| 1084 | file_size = os.path.getsize(filepath) |
| 1085 | print(f"{DIM}[LOG] source file exists: {filepath} ({file_size} bytes){RESET}", flush=True) |
| 1086 | # Convert absolute path to relative path |
| 1087 | if os.path.isabs(filepath): |
| 1088 | rel_filepath = os.path.relpath(filepath, self.project_path) |
| 1089 | else: |
| 1090 | rel_filepath = filepath |
| 1091 | dest = os.path.join(temp_worktree, rel_filepath) |
| 1092 | os.makedirs(os.path.dirname(dest), exist_ok=True) |
| 1093 | with open(filepath, 'rb') as src, open(dest, 'wb') as dst: |
| 1094 | content = src.read() |
| 1095 | dst.write(content) |
| 1096 | print(f"{DIM}[LOG] copied to temp_worktree: {dest} ({len(content)} bytes){RESET}", flush=True) |
| 1097 | else: |
| 1098 | print(f"{DIM}[LOG] source file NOT exists: {filepath}{RESET}", flush=True) |
| 1099 | |
| 1100 | # Commit |
| 1101 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "add", "-A") |
| 1102 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, |
| 1103 | "commit", "-m", message, "--allow-empty") |
| 1104 | |
| 1105 | commit_hash = self._git_command("--git-dir", self.bare_repo, "rev-parse", "HEAD") |
| 1106 | checkpoint_id = commit_hash[:8] if commit_hash and not commit_hash.startswith("error") else None |
| 1107 | |
| 1108 | # Save conversation snapshot with checkpoint_id |
| 1109 | if checkpoint_id and conversation_snapshot: |
| 1110 | snapshots[checkpoint_id] = conversation_snapshot |
| 1111 | with open(snapshot_file, 'w') as f: |
| 1112 | json.dump(snapshots, f, indent=2) |
| 1113 | |
| 1114 | return checkpoint_id |
| 1115 | else: |
| 1116 | # Branch exists, checkout and commit |
| 1117 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f") |
| 1118 | |
| 1119 | # Update temp worktree |
| 1120 | for filepath in files_changed: |
| 1121 | print(f"{DIM}[LOG] checkpoint updating: {filepath}{RESET}", flush=True) |
| 1122 | if os.path.exists(filepath): |
| 1123 | file_size = os.path.getsize(filepath) |
| 1124 | print(f"{DIM}[LOG] source file exists: {filepath} ({file_size} bytes){RESET}", flush=True) |
| 1125 | # Convert absolute path to relative path |
| 1126 | if os.path.isabs(filepath): |
| 1127 | rel_filepath = os.path.relpath(filepath, self.project_path) |
| 1128 | else: |
| 1129 | rel_filepath = filepath |
| 1130 | dest = os.path.join(temp_worktree, rel_filepath) |
| 1131 | os.makedirs(os.path.dirname(dest), exist_ok=True) |
| 1132 | with open(filepath, 'rb') as src, open(dest, 'wb') as dst: |
| 1133 | content = src.read() |
| 1134 | dst.write(content) |
| 1135 | print(f"{DIM}[LOG] copied to temp_worktree: {dest} ({len(content)} bytes){RESET}", flush=True) |
| 1136 | else: |
| 1137 | print(f"{DIM}[LOG] source file NOT exists: {filepath}{RESET}", flush=True) |
| 1138 | |
| 1139 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "add", "-A") |
| 1140 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, |
| 1141 | "commit", "-m", message, "--allow-empty") |
| 1142 | |
| 1143 | commit_hash = self._git_command("--git-dir", self.bare_repo, "rev-parse", "HEAD") |
| 1144 | checkpoint_id = commit_hash[:8] if commit_hash and not commit_hash.startswith("error") else None |
| 1145 | |
| 1146 | # Save conversation snapshot with checkpoint_id |
| 1147 | if checkpoint_id and conversation_snapshot: |
| 1148 | snapshots[checkpoint_id] = conversation_snapshot |
| 1149 | with open(snapshot_file, 'w') as f: |
| 1150 | json.dump(snapshots, f, indent=2) |
| 1151 | |
| 1152 | return checkpoint_id |
| 1153 | except Exception as e: |
| 1154 | return None |
| 1155 | |
| 1156 | def list_checkpoints(self, limit=10, show_all=False): |
| 1157 | """List recent checkpoints for current session |
| 1158 | |
| 1159 | Args: |
| 1160 | limit: Maximum number of checkpoints to show |
| 1161 | show_all: If True, show all sessions; if False, only show current session |
| 1162 | """ |
| 1163 | if not self.session_id and not show_all: |
| 1164 | return [] |
| 1165 | |
| 1166 | try: |
| 1167 | if show_all: |
| 1168 | # Show all branches |
| 1169 | args = ["--git-dir", self.bare_repo, "log", f"--max-count={limit}", "--oneline", "--all"] |
| 1170 | else: |
| 1171 | # Show only current session's branch |
| 1172 | branch_name = self._get_branch_name() |
| 1173 | args = ["--git-dir", self.bare_repo, "log", f"--max-count={limit}", "--oneline", branch_name] |
| 1174 | |
| 1175 | log = self._git_command(*args) |
| 1176 | if log and not log.startswith("error"): |
| 1177 | return [line.split(" ", 1) for line in log.split("\n") if line] |
| 1178 | return [] |
| 1179 | except: |
| 1180 | return [] |
| 1181 | |
| 1182 | def restore_checkpoint(self, checkpoint_id): |
| 1183 | """Restore files to checkpoint state and reset current session's branch |
| 1184 | |
| 1185 | Returns: |
| 1186 | tuple: (success: bool, conversation_snapshot: dict or None) |
| 1187 | """ |
| 1188 | if not self.session_id: |
| 1189 | return False, None |
| 1190 | |
| 1191 | branch_name = self._get_branch_name() |
| 1192 | temp_worktree = os.path.join(self.nanocode_dir, f"temp_worktree_{self.session_id}") |
| 1193 | |
| 1194 | try: |
| 1195 | # Checkout branch first |
| 1196 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, "checkout", branch_name, "-f") |
| 1197 | |
| 1198 | # Reset branch to checkpoint (discards future commits on this branch) |
| 1199 | self._git_command("--git-dir", self.bare_repo, "reset", "--hard", checkpoint_id) |
| 1200 | |
| 1201 | # Checkout to temp worktree |
| 1202 | self._git_command("--git-dir", self.bare_repo, "--work-tree", temp_worktree, |
| 1203 | "checkout", checkpoint_id, "-f") |
| 1204 | |
| 1205 | # Copy files back to project |
| 1206 | for root, dirs, files in os.walk(temp_worktree): |
| 1207 | for file in files: |
| 1208 | src = os.path.join(root, file) |
| 1209 | rel_path = os.path.relpath(src, temp_worktree) |
| 1210 | dest = os.path.join(self.project_path, rel_path) |
| 1211 | |
| 1212 | os.makedirs(os.path.dirname(dest), exist_ok=True) |
| 1213 | with open(src, 'rb') as s, open(dest, 'wb') as d: |
| 1214 | d.write(s.read()) |
| 1215 | |
| 1216 | # Load conversation snapshot |
| 1217 | snapshot_file = os.path.join(self.nanocode_dir, "conversation_snapshots.json") |
| 1218 | conversation_snapshot = None |
| 1219 | if os.path.exists(snapshot_file): |
| 1220 | with open(snapshot_file, 'r') as f: |
| 1221 | snapshots = json.load(f) |
| 1222 | conversation_snapshot = snapshots.get(checkpoint_id) |
| 1223 | |
| 1224 | return True, conversation_snapshot |
| 1225 | except: |
| 1226 | return False, None |
| 1227 | |
| 1228 | |
class Session:
    """A single conversation session.

    Holds the message history, per-file state hashes used for conflict
    detection, and metadata (creation time, parent links, and the project's
    git state at session start).
    """

    def __init__(self, session_id=None):
        # Generate a fresh id unless resuming an existing session.
        self.session_id = session_id or self._generate_session_id()
        self.messages = []      # conversation messages (API message dicts)
        self.file_states = {}   # filepath -> {'hash', 'mtime', 'size'}
        self.metadata = {
            'created_at': time.time(),
            'last_active': time.time(),
            'description': '',
            'cwd': os.getcwd(),
            'parent_checkpoint': None,  # checkpoint this session branched from
            'parent_session': None,     # session it branched from
            'git_commit': None,         # project .git commit hash at session start
            'git_branch': None,         # project .git branch at session start
            'git_dirty': False,         # project had uncommitted changes
        }

    def _generate_session_id(self):
        """Return a unique session id: <YYYYmmdd_HHMMSS>_<4 hex chars>."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        random_suffix = ''.join(random.choices('0123456789abcdef', k=4))
        return f"{timestamp}_{random_suffix}"

    def _get_project_git_info(self):
        """Return the project's current git state, or None outside a repo.

        Returns:
            dict with 'git_commit' (short hash), 'git_branch', and
            'git_dirty' (bool), or None when git is unavailable or the
            working directory is not a repository.
        """
        try:
            cwd = self.metadata.get('cwd', os.getcwd())

            # Current commit (shortened to 8 chars below).
            commit = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                cwd=cwd,
                capture_output=True, text=True, check=True
            ).stdout.strip()

            # Current branch name.
            branch = subprocess.run(
                ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                cwd=cwd,
                capture_output=True, text=True, check=True
            ).stdout.strip()

            # Non-empty porcelain output means uncommitted changes.
            status = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=cwd,
                capture_output=True, text=True, check=True
            ).stdout.strip()

            return {
                'git_commit': commit[:8],
                'git_branch': branch,
                'git_dirty': bool(status)
            }
        except (subprocess.SubprocessError, OSError):
            # Narrowed from a bare `except:`: covers CalledProcessError
            # (not a repo) and FileNotFoundError (git not installed).
            return None

    def capture_git_state(self):
        """Capture the current project git state into metadata (no-op if
        git info is unavailable)."""
        git_info = self._get_project_git_info()
        if git_info:
            self.metadata.update(git_info)

    def track_file_state(self, filepath):
        """Record a file's hash/mtime/size for later conflict detection.

        Silently ignores paths that do not exist.
        """
        if os.path.exists(filepath):
            with open(filepath, 'rb') as f:
                content = f.read()
            file_hash = hashlib.md5(content).hexdigest()  # change detection only, not security
            self.file_states[filepath] = {
                'hash': file_hash,
                'mtime': os.path.getmtime(filepath),
                'size': len(content)
            }

    def detect_conflicts(self):
        """Detect tracked files modified (or deleted) outside this session.

        Returns:
            list: Conflicted file paths; deleted files are suffixed with
            " (deleted)".
        """
        conflicts = []

        for filepath, saved_state in self.file_states.items():
            if os.path.exists(filepath):
                with open(filepath, 'rb') as f:
                    content = f.read()
                current_hash = hashlib.md5(content).hexdigest()
                current_mtime = os.path.getmtime(filepath)

                # Either a content change or an mtime change counts.
                if (current_hash != saved_state['hash'] or
                        current_mtime != saved_state['mtime']):
                    conflicts.append(filepath)
            else:
                # File was deleted from under us.
                conflicts.append(f"{filepath} (deleted)")

        return conflicts

    def update_file_states(self):
        """Refresh all tracked file states; drop entries for deleted files."""
        for filepath in list(self.file_states.keys()):
            if os.path.exists(filepath):
                self.track_file_state(filepath)
            else:
                del self.file_states[filepath]

    def to_dict(self):
        """Serialize this session to a JSON-compatible dict."""
        return {
            'session_id': self.session_id,
            'messages': self.messages,
            'file_states': self.file_states,
            'metadata': self.metadata
        }

    @staticmethod
    def from_dict(data):
        """Rebuild a Session from a dict produced by to_dict()."""
        session = Session(session_id=data['session_id'])
        session.messages = data.get('messages', [])
        session.file_states = data.get('file_states', {})
        session.metadata = data.get('metadata', {})
        return session
| 1357 | |
| 1358 | |
class SessionManager:
    """Manage sessions on disk under <project>/.nanocode/sessions and wire
    each active session to the checkpoint manager."""

    def __init__(self, project_path):
        self.project_path = project_path
        self.sessions_dir = os.path.join(project_path, ".nanocode", "sessions")
        self.current_session = None
        self.checkpoint_manager = CheckpointManager(project_path)
        # Parent checkpoint for the FIRST checkpoint of a forked session.
        self.parent_commit_for_next_checkpoint = None
        os.makedirs(self.sessions_dir, exist_ok=True)

    def create_session(self, description="", parent_checkpoint=None, parent_session=None):
        """Create, activate, and persist a new session.

        Args:
            description: Session description.
            parent_checkpoint: Checkpoint ID this session branches from.
            parent_session: Session ID this session branches from.

        Returns:
            The newly created Session.
        """
        session = Session()
        session.metadata['description'] = description
        session.metadata['parent_checkpoint'] = parent_checkpoint
        session.metadata['parent_session'] = parent_session
        # Record the project's git state at session start.
        session.capture_git_state()
        self.current_session = session
        # Point the checkpoint manager at this session's branch.
        self.checkpoint_manager.set_session(session.session_id)
        # Remember the fork point for the session's first checkpoint.
        self.parent_commit_for_next_checkpoint = parent_checkpoint
        self.save_session()
        return session

    def save_session(self):
        """Persist the current session to disk (no-op if none is active)."""
        if not self.current_session:
            return

        self.current_session.metadata['last_active'] = time.time()
        session_file = os.path.join(
            self.sessions_dir,
            f"{self.current_session.session_id}.json"
        )

        with open(session_file, 'w') as f:
            json.dump(self.current_session.to_dict(), f, indent=2)

    def load_session(self, session_id):
        """Load a session by id and make it current.

        Returns:
            The loaded Session, or None when no such session file exists.
        """
        session_file = os.path.join(self.sessions_dir, f"{session_id}.json")

        if not os.path.exists(session_file):
            return None

        with open(session_file, 'r') as f:
            data = json.load(f)

        session = Session.from_dict(data)
        self.current_session = session
        # Point the checkpoint manager at this session's branch.
        self.checkpoint_manager.set_session(session.session_id)
        return session

    def list_sessions(self):
        """List all sessions, most recently active first.

        Returns:
            list of dicts with 'session_id', 'metadata', 'message_count';
            unreadable/corrupt session files are skipped.
        """
        sessions = []

        if not os.path.exists(self.sessions_dir):
            return sessions

        for filename in os.listdir(self.sessions_dir):
            if filename.endswith('.json'):
                filepath = os.path.join(self.sessions_dir, filename)
                try:
                    with open(filepath, 'r') as f:
                        data = json.load(f)
                    sessions.append({
                        'session_id': data['session_id'],
                        'metadata': data['metadata'],
                        'message_count': len(data.get('messages', [])),
                    })
                except (OSError, json.JSONDecodeError, KeyError):
                    # Narrowed from a bare `except: pass` — skip only
                    # unreadable or malformed session files.
                    pass

        return sorted(sessions, key=lambda x: x['metadata'].get('last_active', 0), reverse=True)

    def load_last_session(self):
        """Load and return the most recently active session, or None."""
        sessions = self.list_sessions()
        if sessions:
            return self.load_session(sessions[0]['session_id'])
        return None
| 1451 | |
| 1452 | |
def _print_checkpoint_graph(session_manager):
    """Print a git graph of the 20 newest checkpoints across all session
    branches, annotating each commit with the sessions containing it."""
    cm = session_manager.checkpoint_manager
    print(f"\n{BOLD}📍 Checkpoint Graph:{RESET}\n")

    # %h = short hash, %s = subject, %ar = relative date
    graph_output = cm._git_command(
        "--git-dir", cm.bare_repo,
        "log", "--graph", "--all", "--oneline",
        "--format=%h %s (%ar)", "-20"
    )

    if graph_output and not graph_output.startswith("error"):
        for line in graph_output.split('\n'):
            if not line.strip():
                continue

            # Extract the commit hash from the graph line.
            match = re.search(r'\b([0-9a-f]{7,8})\b', line)
            if match:
                commit_hash = match.group(1)

                # Which session branches contain this commit?
                branch_info = cm._git_command(
                    "--git-dir", cm.bare_repo,
                    "branch", "-a", "--contains", commit_hash
                )

                session_names = []
                if branch_info and not branch_info.startswith("error"):
                    for branch_line in branch_info.split('\n'):
                        branch_line = branch_line.strip().lstrip('* ')
                        if branch_line.startswith('session_'):
                            # session_20260130_103323_f7 -> s:20260130_103323_f7
                            session_names.append('s:' + branch_line[8:])

                # Highlight the commit hash.
                line = line.replace(commit_hash, f"{CYAN}{commit_hash}{RESET}")

                # Insert session names right after the highlighted hash.
                if session_names:
                    session_str = f"{GREEN}[{', '.join(session_names[:2])}]{RESET}"
                    line = line.replace(commit_hash + f"{RESET}", commit_hash + f"{RESET} {session_str}")

            print(f"  {line}")
        print()
    else:
        print(f"{DIM}No checkpoints yet{RESET}\n")

    print(f"{DIM}Restore: /c <hash>{RESET}")


def _print_session_checkpoints(session_manager):
    """Print the current session's checkpoints with relative timestamps and
    the files each one touched."""
    cm = session_manager.checkpoint_manager
    checkpoints = cm.list_checkpoints(show_all=False)
    if not checkpoints:
        print(f"{DIM}No checkpoints yet{RESET}")
        return

    print(f"\n{BOLD}📍 Checkpoints:{RESET}\n")

    # Newest first already, courtesy of git log.
    for commit_hash, message in checkpoints[:10]:
        timestamp_str = cm._git_command(
            "--git-dir", cm.bare_repo,
            "log", "-1", "--format=%ar", commit_hash
        )
        # Guard against None as well as "error..." — previously a None
        # return here raised AttributeError on .startswith().
        if not timestamp_str or timestamp_str.startswith("error"):
            timestamp_str = ""

        # Files touched by this checkpoint.
        files_str = cm._git_command(
            "--git-dir", cm.bare_repo,
            "diff-tree", "--no-commit-id", "--name-only", "-r", commit_hash
        )
        files = []
        if files_str and not files_str.startswith("error"):
            files = [f.strip() for f in files_str.split('\n') if f.strip()]

        # Format: hash | time ago | message | files
        time_part = f"{DIM}{timestamp_str}{RESET}" if timestamp_str else ""
        print(f"  {CYAN}{commit_hash}{RESET} {time_part}")
        print(f"    {DIM}└─{RESET} {message}")
        if files:
            files_display = ", ".join(files[:3])
            if len(files) > 3:
                files_display += f" +{len(files)-3} more"
            print(f"    {DIM}   Files: {files_display}{RESET}")
        print()

    print(f"{DIM}Tip: Use '/c all' or '/ca' to see git graph{RESET}")
    print(f"{DIM}Restore: /c <hash>{RESET}")


def handle_checkpoint_command(parts, session_manager, files_modified):
    """Handle /checkpoint or /c commands.

    Args:
        parts: Tokenized command line (parts[0] is the command itself).
        session_manager: Active SessionManager.
        files_modified: Unused here; kept for call-site compatibility.

    Returns:
        New messages list if the conversation was restored, None otherwise.
    """
    # Default to the list subcommand.
    if len(parts) < 2:
        parts.append("list")

    cmd = parts[1]

    # A bare 7-8 char hex token is shorthand for "restore <hash>".
    if 7 <= len(cmd) <= 8 and all(c in '0123456789abcdef' for c in cmd.lower()):
        cmd = "restore"
        checkpoint_id = parts[1]
    else:
        checkpoint_id = None

    if cmd in ("list", "all", "--all"):
        show_all = (cmd == "all" or "--all" in parts)
        if show_all:
            _print_checkpoint_graph(session_manager)
        else:
            _print_session_checkpoints(session_manager)
        return None

    elif cmd == "restore":
        if not checkpoint_id:
            print(f"{RED}Usage: /c <checkpoint_id>{RESET}")
            return None

        print(f"{YELLOW}⚠ This will restore files AND conversation to checkpoint {checkpoint_id}{RESET}")
        print(f"{YELLOW}⚠ Future checkpoints will be discarded from history{RESET}")
        confirm = input(f"{BOLD}Continue? (y/N): {RESET}").strip().lower()

        if confirm != 'y':
            print(f"{DIM}Cancelled{RESET}")
            return None

        success, conversation_snapshot = session_manager.checkpoint_manager.restore_checkpoint(checkpoint_id)
        if success:
            print(f"{GREEN}✓ Restored files to checkpoint {checkpoint_id}{RESET}")
            if conversation_snapshot:
                print(f"{GREEN}✓ Restored conversation ({len(conversation_snapshot)} messages){RESET}")
                return conversation_snapshot
            print(f"{YELLOW}⚠ No conversation snapshot found for this checkpoint{RESET}")
            return None
        print(f"{RED}✗ Failed to restore checkpoint{RESET}")
        return None

    else:
        print(f"{RED}Unknown command: {cmd}{RESET}")
        return None
| 1609 | |
| 1610 | |
# Script entry point: run the interactive loop via main() only when executed
# directly, not when imported as a module.
if __name__ == "__main__":
    main()
| 1613 |