wip: [01-stabilize] paused at task 1/1 - OCR Hallucination Immune logic via Semantic delta window and fret-isolation

This commit is contained in:
2026-03-29 22:08:40 +09:00
parent aca7bf592a
commit 2507de45d3
4289 changed files with 732689 additions and 28672 deletions

View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
def main() -> None:
import json
import os
from pathlib import Path
import subprocess
from registry import registry
state_path = Path("/root/state.json")
if state_path.exists():
state = json.loads(state_path.read_text())
else:
state = {}
repo_root = registry.get("ROOT", os.getenv("ROOT"))
patch_path = Path("/root/model.patch")
subprocess.run(
f"git add -A && git diff --cached > {patch_path}",
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
cwd=repo_root,
)
patch = patch_path.read_text(errors="backslashreplace")
state["diff"] = patch.strip()
state_path.write_text(json.dumps(state))
def _del_diff():
from pathlib import Path
import json
state_path = Path("/root/state.json")
if state_path.exists():
state = json.loads(state_path.read_text())
else:
state = {}
state["diff"] = ""
state_path.write_text(json.dumps(state))
if __name__ == "__main__":
try:
main()
except Exception as e:
_del_diff()

View File

@@ -0,0 +1,2 @@
tools: {}
state_command: "_state_diff_state"

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
import json
import os
from pathlib import Path
def main():
state_path = Path("/root/state.json")
if state_path.exists():
state = json.loads(state_path.read_text())
else:
state = {}
state["working_dir"] = os.getcwd()
state_path.write_text(json.dumps(state))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,710 @@
#!/usr/bin/env python3
"""This is an adaptation of the Anthropic Text Editor tool from
https://github.com/anthropics/anthropic-quickstarts/blob/main/computer-use-demo/computer_use_demo/tools/edit.py
However, we made it python 3.6 compatible and stateless (all state is saved in a json file)
"""
import argparse
import json
import re
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from typing import List, Optional, Tuple
import io
from registry import registry as REGISTRY
# There are some super strange "ascii can't decode x" errors,
# that can be solved with setting the default encoding for stdout
# (note that python3.6 doesn't have the reconfigure method)
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
TRUNCATED_MESSAGE: str = "<response clipped><NOTE>To save on context only part of this file has been shown to you. You should retry this tool after you have searched inside the file with `grep -n` in order to find the line numbers of what you are looking for.</NOTE>"
MAX_RESPONSE_LEN: int = 16000
MAX_WINDOW_EXPANSION_VIEW = int(REGISTRY.get("MAX_WINDOW_EXPANSION_VIEW", 0))
MAX_WINDOW_EXPANSION_EDIT_CONFIRM = int(REGISTRY.get("MAX_WINDOW_EXPANSION_EDIT_CONFIRM", 0))
USE_FILEMAP = REGISTRY.get("USE_FILEMAP", "false").lower() == "true"
USE_LINTER = REGISTRY.get("USE_LINTER", "false").lower() == "true"
Command = str
SNIPPET_LINES: int = 4
LINT_WARNING_TEMPLATE = """
<NOTE>Your edits have been applied, but the linter has found syntax errors.</NOTE>
<ERRORS>
{errors}
</ERRORS>
Please review the changes and make sure they are correct.
In addition to the above errors, please also check the following:
1. The edited file is correctly indented
2. The edited file does not contain duplicate lines
3. The edit does not break existing functionality
<IMPORTANT>In rare cases, the linter errors might not actually be errors or caused by your edit. Please use your own judgement.</IMPORTANT>
Edit the file again if necessary.
"""
def maybe_truncate(content: str, truncate_after: Optional[int] = MAX_RESPONSE_LEN):
"""Truncate content and append a notice if content exceeds the specified length."""
return (
content
if not truncate_after or len(content) <= truncate_after
else content[:truncate_after] + TRUNCATED_MESSAGE
)
class Flake8Error:
"""A class to represent a single flake8 error"""
def __init__(self, filename: str, line_number: int, col_number: int, problem: str):
self.filename = filename
self.line_number = line_number
self.col_number = col_number
self.problem = problem
@classmethod
def from_line(cls, line: str):
try:
prefix, _sep, problem = line.partition(": ")
filename, line_number, col_number = prefix.split(":")
except (ValueError, IndexError) as e:
msg = f"Invalid flake8 error line: {line}"
raise ValueError(msg) from e
return cls(filename, int(line_number), int(col_number), problem)
def __eq__(self, other):
if not isinstance(other, Flake8Error):
return NotImplemented
return (
self.filename == other.filename
and self.line_number == other.line_number
and self.col_number == other.col_number
and self.problem == other.problem
)
def __repr__(self):
return f"Flake8Error(filename={self.filename}, line_number={self.line_number}, col_number={self.col_number}, problem={self.problem})"
def _update_previous_errors(
previous_errors: List[Flake8Error], replacement_window: Tuple[int, int], replacement_n_lines: int
) -> List[Flake8Error]:
"""Update the line numbers of the previous errors to what they would be after the edit window.
This is a helper function for `_filter_previous_errors`.
All previous errors that are inside of the edit window should not be ignored,
so they are removed from the previous errors list.
Args:
previous_errors: list of errors with old line numbers
replacement_window: the window of the edit/lines that will be replaced
replacement_n_lines: the number of lines that will be used to replace the text
Returns:
list of errors with updated line numbers
"""
updated = []
lines_added = replacement_n_lines - (replacement_window[1] - replacement_window[0] + 1)
for error in previous_errors:
if error.line_number < replacement_window[0]:
# no need to adjust the line number
updated.append(error)
continue
if replacement_window[0] <= error.line_number <= replacement_window[1]:
# The error is within the edit window, so let's not ignore it
# either way (we wouldn't know how to adjust the line number anyway)
continue
# We're out of the edit window, so we need to adjust the line number
updated.append(Flake8Error(error.filename, error.line_number + lines_added, error.col_number, error.problem))
return updated
def format_flake8_output(
input_string: str,
show_line_numbers: bool = False,
*,
previous_errors_string: str = "",
replacement_window: Optional[Tuple[int, int]] = None,
replacement_n_lines: Optional[int] = None,
) -> str:
"""Filter flake8 output for previous errors and print it for a given file.
Args:
input_string: The flake8 output as a string
show_line_numbers: Whether to show line numbers in the output
previous_errors_string: The previous errors as a string
replacement_window: The window of the edit (lines that will be replaced)
replacement_n_lines: The number of lines used to replace the text
Returns:
The filtered flake8 output as a string
"""
# print(f"Replacement window: {replacement_window}")
# print("Replacement n lines:", replacement_n_lines)
# print("Previous errors string:", previous_errors_string)
# print("Input string:", input_string)
errors = [Flake8Error.from_line(line.strip()) for line in input_string.split("\n") if line.strip()]
# print(f"New errors before filtering: {errors=}")
lines = []
if previous_errors_string:
assert replacement_window is not None
assert replacement_n_lines is not None
previous_errors = [
Flake8Error.from_line(line.strip()) for line in previous_errors_string.split("\n") if line.strip()
]
# print(f"Previous errors before updating: {previous_errors=}")
previous_errors = _update_previous_errors(previous_errors, replacement_window, replacement_n_lines)
# print(f"Previous errors after updating: {previous_errors=}")
errors = [error for error in errors if error not in previous_errors]
# Sometimes new errors appear above the replacement window that were 'shadowed' by the previous errors
# they still clearly aren't caused by the edit.
errors = [error for error in errors if error.line_number >= replacement_window[0]]
# print(f"New errors after filtering: {errors=}")
for error in errors:
if not show_line_numbers:
lines.append(f"- {error.problem}")
else:
lines.append(f"- line {error.line_number} col {error.col_number}: {error.problem}")
return "\n".join(lines)
def flake8(file_path: str) -> str:
"""Run flake8 on a given file and return the output as a string"""
if Path(file_path).suffix != ".py":
return ""
cmd = REGISTRY.get("LINT_COMMAND", "flake8 --isolated --select=F821,F822,F831,E111,E112,E113,E999,E902 {file_path}")
# don't use capture_output because it's not compatible with python3.6
out = subprocess.run(cmd.format(file_path=file_path), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return out.stdout.decode()
class Filemap:
def show_filemap(self, file_contents: str, encoding: str = "utf8"):
import warnings
from tree_sitter_languages import get_language, get_parser
warnings.simplefilter("ignore", category=FutureWarning)
parser = get_parser("python")
language = get_language("python")
tree = parser.parse(bytes(file_contents.encode(encoding, errors="replace")))
# See https://tree-sitter.github.io/tree-sitter/using-parsers#pattern-matching-with-queries.
query = language.query("""
(function_definition
body: (_) @body)
""")
# TODO: consider special casing docstrings such that they are not elided. This
# could be accomplished by checking whether `body.text.decode('utf8')` starts
# with `"""` or `'''`.
elide_line_ranges = [
(node.start_point[0], node.end_point[0])
for node, _ in query.captures(tree.root_node)
# Only elide if it's sufficiently long
if node.end_point[0] - node.start_point[0] >= 5
]
# Note that tree-sitter line numbers are 0-indexed, but we display 1-indexed.
elide_lines = {line for start, end in elide_line_ranges for line in range(start, end + 1)}
elide_messages = [(start, f"... eliding lines {start+1}-{end+1} ...") for start, end in elide_line_ranges]
out = []
for i, line in sorted(
elide_messages + [(i, line) for i, line in enumerate(file_contents.splitlines()) if i not in elide_lines]
):
out.append(f"{i+1:6d} {line}")
return "\n".join(out)
class WindowExpander:
def __init__(self, suffix: str = ""):
"""Try to expand viewports to include whole functions, classes, etc. rather than
using fixed line windows.
Args:
suffix: Filename suffix
"""
self.suffix = suffix
if self.suffix:
assert self.suffix.startswith(".")
def _find_breakpoints(self, lines: List[str], current_line: int, direction=1, max_added_lines: int = 30) -> int:
"""Returns 1-based line number of breakpoint. This line is meant to still be included in the viewport.
Args:
lines: List of lines of the file
current_line: 1-based line number of the current viewport
direction: 1 for down, -1 for up
max_added_lines: Maximum number of lines to extend
Returns:
1-based line number of breakpoint. This line is meant to still be included in the viewport.
"""
assert 1 <= current_line <= len(lines)
assert 0 <= max_added_lines
# 1. Find line range that we want to search for breakpoints in
if direction == 1:
# down
if current_line == len(lines):
# already last line, can't extend down
return current_line
iter_lines = range(current_line, 1 + min(current_line + max_added_lines, len(lines)))
elif direction == -1:
# up
if current_line == 1:
# already first line, can't extend up
return current_line
iter_lines = range(current_line, -1 + max(current_line - max_added_lines, 1), -1)
else:
msg = f"Invalid direction {direction}"
raise ValueError(msg)
# 2. Find the best breakpoint in the line range
# Every condition gives a score, the best score is the best breakpoint
best_score = 0
best_breakpoint = current_line
for i_line in iter_lines:
next_line = None
line = lines[i_line - 1]
if i_line + direction in iter_lines:
next_line = lines[i_line + direction - 1]
score = 0
if line == "":
score = 1
if next_line == "":
# Double new blank line:
score = 2
if self.suffix == ".py" and any(
re.match(regex, line) for regex in [r"^\s*def\s+", r"^\s*class\s+", r"^\s*@"]
):
# We include decorators here, because they are always on top of the function/class definition
score = 3
if score > best_score:
best_score = score
best_breakpoint = i_line
if direction == 1 and i_line != current_line:
best_breakpoint -= 1
if i_line == 1 or i_line == len(lines):
score = 3
if score > best_score:
best_score = score
best_breakpoint = i_line
# print(f"Score {score} for line {i_line} ({line})")
# print(f"Best score {best_score} for line {best_breakpoint} ({lines[best_breakpoint-1]})")
if direction == 1 and best_breakpoint < current_line or direction == -1 and best_breakpoint > current_line:
# We don't want to shrink the view port, so we return the current line
return current_line
return best_breakpoint
def expand_window(self, lines: List[str], start: int, stop: int, max_added_lines: int) -> Tuple[int, int]:
"""
Args:
lines: All lines of the file
start: 1-based line number of the start of the viewport
stop: 1-based line number of the end of the viewport
max_added_lines: Maximum number of lines to extend (separately for each side)
Returns:
Tuple of 1-based line numbers of the start and end of the viewport.
Both inclusive.
"""
# print("Input:", start, stop)
assert 1 <= start <= stop <= len(lines), (start, stop, len(lines))
if max_added_lines <= 0:
# Already at max range, no expansion
return start, stop
new_start = self._find_breakpoints(lines, start, direction=-1, max_added_lines=max_added_lines)
new_stop = self._find_breakpoints(lines, stop, direction=1, max_added_lines=max_added_lines)
# print(f"Expanded window is {new_start} to {new_stop}")
assert new_start <= new_stop, (new_start, new_stop)
assert new_start <= start, (new_start, start)
assert start - new_start <= max_added_lines, (start, new_start)
assert new_stop >= stop, (new_stop, stop)
assert new_stop - stop <= max_added_lines, (new_stop, stop)
return new_start, new_stop
class EditTool:
"""
An filesystem editor tool that allows the agent to view, create, and edit files.
The tool parameters are defined by Anthropic and are not editable.
"""
name = "str_replace_editor"
def __init__(self):
super().__init__()
self._encoding = None
@property
def _file_history(self):
return defaultdict(list, json.loads(REGISTRY.get("file_history", "{}")))
@_file_history.setter
def _file_history(self, value: dict):
REGISTRY["file_history"] = json.dumps(value)
def __call__(
self,
*,
command: Command,
path: str,
file_text: Optional[str] = None,
view_range: Optional[List[int]] = None,
old_str: Optional[str] = None,
new_str: Optional[str] = None,
insert_line: Optional[int] = None,
**kwargs,
):
_path = Path(path)
self.validate_path(command, _path)
if command == "view":
return self.view(_path, view_range)
elif command == "create":
if file_text is None:
print("Parameter `file_text` is required for command: create")
sys.exit(1)
self.create_file(_path, file_text)
return None
elif command == "str_replace":
if old_str is None:
print("Parameter `old_str` is required for command: str_replace")
sys.exit(2)
return self.str_replace(_path, old_str, new_str)
elif command == "insert":
if insert_line is None:
print("Parameter `insert_line` is required for command: insert")
sys.exit(3)
if new_str is None:
print("Parameter `new_str` is required for command: insert")
sys.exit(4)
return self.insert(_path, insert_line, new_str)
elif command == "undo_edit":
return self.undo_edit(_path)
print(
f'Unrecognized command {command}. The allowed commands for the {self.name} tool are: "view", "create", "str_replace", "insert", "undo_edit"'
)
sys.exit(5)
def validate_path(self, command: str, path: Path):
"""
Check that the path/command combination is valid.
"""
# Check if its an absolute path
if not path.is_absolute():
suggested_path = Path.cwd() / path
print(
f"The path {path} is not an absolute path, it should start with `/`. Maybe you meant {suggested_path}?"
)
sys.exit(6)
# Check if path exists
if not path.exists() and command != "create":
print(f"The path {path} does not exist. Please provide a valid path.")
sys.exit(7)
if path.exists() and command == "create":
print(f"File already exists at: {path}. Cannot overwrite files using command `create`.")
sys.exit(8)
# Check if the path points to a directory
if path.is_dir():
if command != "view":
print(f"The path {path} is a directory and only the `view` command can be used on directories")
sys.exit(9)
def create_file(self, path: Path, file_text: str):
if not path.parent.exists():
print(f"The parent directory {path.parent} does not exist. Please create it first.")
sys.exit(21)
self.write_file(path, file_text)
self._file_history[path].append(file_text)
print(f"File created successfully at: {path}")
def view(self, path: Path, view_range: Optional[List[int]] = None):
"""Implement the view command"""
if path.is_dir():
if view_range:
print("The `view_range` parameter is not allowed when `path` points to a directory.")
sys.exit(10)
out = subprocess.run(
rf"find {path} -maxdepth 2 -not -path '*/\.*'",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout = out.stdout.decode()
stderr = out.stderr.decode()
if not stderr:
stdout = f"Here's the files and directories up to 2 levels deep in {path}, excluding hidden items:\n{stdout}\n"
print(stdout)
return
file_content = self.read_file(path)
if view_range:
if len(view_range) != 2 or not all(isinstance(i, int) for i in view_range):
print("Invalid `view_range`. It should be a list of two integers.")
sys.exit(11)
file_lines = file_content.split("\n")
n_lines_file = len(file_lines)
init_line, final_line = view_range
if init_line < 1 or init_line > n_lines_file:
print(
f"Invalid `view_range`: {view_range}. Its first element `{init_line}` should be within the range of lines of the file: {[1, n_lines_file]}"
)
sys.exit(12)
if final_line > n_lines_file:
print(
f"Invalid `view_range`: {view_range}. Its second element `{final_line}` should be smaller than the number of lines in the file: `{n_lines_file}`"
)
sys.exit(13)
if final_line != -1 and final_line < init_line:
print(
f"Invalid `view_range`: {view_range}. Its second element `{final_line}` should be larger or equal than its first `{init_line}`"
)
sys.exit(14)
if final_line == -1:
final_line = n_lines_file
# Expand the viewport to include the whole function or class
init_line, final_line = WindowExpander(suffix=path.suffix).expand_window(
file_lines, init_line, final_line, max_added_lines=MAX_WINDOW_EXPANSION_VIEW
)
file_content = "\n".join(file_lines[init_line - 1 : final_line])
else:
if path.suffix == ".py" and len(file_content) > MAX_RESPONSE_LEN and USE_FILEMAP:
try:
filemap = Filemap().show_filemap(file_content, encoding=self._encoding or "utf-8")
except Exception:
# If we fail to show the filemap, just show the truncated file content
pass
else:
print(
"<NOTE>This file is too large to display entirely. Showing abbreviated version. "
"Please use `str_replace_editor view` with the `view_range` parameter to show selected lines next.</NOTE>"
)
filemap = maybe_truncate(filemap.expandtabs())
print(filemap)
print(
"<IMPORTANT><NOTE>The above file has been abbreviated. Please use `str_replace editor view` with `view_range` to look at relevant files in detail.</NOTE></IMPORTANT>"
)
return
# Else just show
init_line = 1
# init_line is 1-based
print(self._make_output(file_content, str(path), init_line=init_line))
def str_replace(self, path: Path, old_str: str, new_str: Optional[str]):
"""Implement the str_replace command, which replaces old_str with new_str in the file content"""
# Read the file content
file_content = self.read_file(path).expandtabs()
old_str = old_str.expandtabs()
new_str = new_str.expandtabs() if new_str is not None else ""
# Check if old_str is unique in the file
occurrences = file_content.count(old_str)
if occurrences == 0:
print(f"No replacement was performed, old_str `{old_str}` did not appear verbatim in {path}.")
sys.exit(15)
elif occurrences > 1:
file_content_lines = file_content.split("\n")
lines = [idx + 1 for idx, line in enumerate(file_content_lines) if old_str in line]
print(
f"No replacement was performed. Multiple occurrences of old_str `{old_str}` in lines {lines}. Please ensure it is unique"
)
sys.exit(16)
if new_str == old_str:
print(f"No replacement was performed, old_str `{old_str}` is the same as new_str `{new_str}`.")
sys.exit(161)
pre_edit_lint = ""
if USE_LINTER:
try:
pre_edit_lint = flake8(str(path))
except Exception as e:
print(f"Warning: Failed to run pre-edit linter on {path}: {e}")
# Replace old_str with new_str
new_file_content = file_content.replace(old_str, new_str)
# Write the new content to the file
self.write_file(path, new_file_content)
post_edit_lint = ""
if USE_LINTER:
try:
post_edit_lint = flake8(str(path))
except Exception as e:
print(f"Warning: Failed to run post-edit linter on {path}: {e}")
epilogue = ""
if post_edit_lint:
...
replacement_window_start_line = file_content.split(old_str)[0].count("\n") + 1
replacement_lines = len(new_str.split("\n"))
replacement_window_end_line = replacement_window_start_line + replacement_lines - 1
replacement_window = (replacement_window_start_line, replacement_window_end_line)
errors = format_flake8_output(
post_edit_lint,
previous_errors_string=pre_edit_lint,
replacement_window=replacement_window,
replacement_n_lines=replacement_lines,
)
if errors.strip():
epilogue = LINT_WARNING_TEMPLATE.format(errors=errors)
# Save the content to history
self._file_history[path].append(file_content)
# Create a snippet of the edited section
replacement_line = file_content.split(old_str)[0].count("\n")
start_line = max(1, replacement_line - SNIPPET_LINES)
end_line = min(replacement_line + SNIPPET_LINES + new_str.count("\n"), len(new_file_content.splitlines()))
start_line, end_line = WindowExpander(suffix=path.suffix).expand_window(
new_file_content.split("\n"), start_line, end_line, max_added_lines=MAX_WINDOW_EXPANSION_EDIT_CONFIRM
)
snippet = "\n".join(new_file_content.split("\n")[start_line - 1 : end_line])
# Prepare the success message
success_msg = f"The file {path} has been edited. "
success_msg += self._make_output(snippet, f"a snippet of {path}", start_line)
success_msg += "Review the changes and make sure they are as expected. Edit the file again if necessary."
success_msg += epilogue
print(success_msg)
def insert(self, path: Path, insert_line: int, new_str: str):
"""Implement the insert command, which inserts new_str at the specified line in the file content."""
file_text = self.read_file(path).expandtabs()
new_str = new_str.expandtabs()
file_text_lines = file_text.split("\n")
n_lines_file = len(file_text_lines)
if insert_line < 0 or insert_line > n_lines_file:
print(
f"Invalid `insert_line` parameter: {insert_line}. It should be within the range of lines of the file: {[0, n_lines_file]}"
)
sys.exit(17)
new_str_lines = new_str.split("\n")
new_file_text_lines = file_text_lines[:insert_line] + new_str_lines + file_text_lines[insert_line:]
snippet_lines = (
file_text_lines[max(0, insert_line - SNIPPET_LINES) : insert_line]
+ new_str_lines
+ file_text_lines[insert_line : insert_line + SNIPPET_LINES]
)
new_file_text = "\n".join(new_file_text_lines)
snippet = "\n".join(snippet_lines)
self.write_file(path, new_file_text)
self._file_history[path].append(file_text)
# todo: Also expand these windows
success_msg = f"The file {path} has been edited. "
success_msg += self._make_output(
snippet,
"a snippet of the edited file",
max(1, insert_line - SNIPPET_LINES + 1),
)
success_msg += "Review the changes and make sure they are as expected (correct indentation, no duplicate lines, etc). Edit the file again if necessary."
print(success_msg)
def undo_edit(self, path: Path):
"""Implement the undo_edit command."""
if not self._file_history[path]:
print(f"No edit history found for {path}.")
sys.exit(18)
old_text = self._file_history[path].pop()
self.write_file(path, old_text)
print(f"Last edit to {path} undone successfully. {self._make_output(old_text, str(path))}")
def read_file(self, path: Path):
"""Read the content of a file from a given path; raise a ToolError if an error occurs."""
encodings = [
(None, None),
("utf-8", None),
("latin-1", None),
("utf-8", "replace"),
]
exception = None
for self._encoding, errors in encodings:
try:
text = path.read_text(encoding=self._encoding, errors=errors)
except UnicodeDecodeError as e:
exception = e
else:
break
else:
print(f"Ran into UnicodeDecodeError {exception} while trying to read {path}")
sys.exit(19)
return text
def write_file(self, path: Path, file: str):
"""Write the content of a file to a given path; raise a ToolError if an error occurs."""
try:
path.write_text(file, encoding=self._encoding or "utf-8")
except Exception as e:
print(f"Ran into {e} while trying to write to {path}")
sys.exit(20)
def _make_output(
self,
file_content: str,
file_descriptor: str,
init_line: int = 1,
expand_tabs: bool = True,
):
"""Generate output for the CLI based on the content of a file."""
file_content = maybe_truncate(file_content)
if expand_tabs:
file_content = file_content.expandtabs()
file_content = "\n".join([f"{i + init_line:6}\t{line}" for i, line in enumerate(file_content.split("\n"))])
return f"Here's the result of running `cat -n` on {file_descriptor}:\n" + file_content + "\n"
def main():
parser = argparse.ArgumentParser()
parser.add_argument("command", type=str)
parser.add_argument("path", type=str)
parser.add_argument("--file_text", type=str)
parser.add_argument("--view_range", type=int, nargs=2)
parser.add_argument("--old_str", type=str)
parser.add_argument("--new_str", type=str)
parser.add_argument("--insert_line", type=int)
args = parser.parse_args()
tool = EditTool()
tool(
command=args.command,
path=args.path,
file_text=args.file_text,
view_range=args.view_range,
old_str=args.old_str,
new_str=args.new_str,
insert_line=args.insert_line,
)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,56 @@
tools:
str_replace_editor:
signature: |
str_replace_editor <command> <path> [<file_text>] [<view_range>] [<old_str>] [<new_str>] [<insert_line>]
# This docstrings was taken from openhands:
# https://github.com/All-Hands-AI/OpenHands/blob/main/openhands/agenthub/codeact_agent/function_calling.py
docstring: >
Custom editing tool for viewing, creating and editing files
* State is persistent across command calls and discussions with the user
* If `path` is a file, `view` displays the result of applying `cat -n`. If `path` is a directory, `view` lists non-hidden files and directories up to 2 levels deep
* The `create` command cannot be used if the specified `path` already exists as a file
* If a `command` generates a long output, it will be truncated and marked with `<response clipped>`
* The `undo_edit` command will revert the last edit made to the file at `path`
Notes for using the `str_replace` command:
* The `old_str` parameter should match EXACTLY one or more consecutive lines from the original file. Be mindful of whitespaces!
* If the `old_str` parameter is not unique in the file, the replacement will not be performed. Make sure to include enough context in `old_str` to make it unique
* The `new_str` parameter should contain the edited lines that should replace the `old_str`
arguments:
- name: command
type: string
description: "The commands to run. Allowed options are: `view`, `create`, `str_replace`, `insert`, `undo_edit`."
required: true
enum: ["view", "create", "str_replace", "insert", "undo_edit"]
- name: path
type: string
description: "Absolute path to file or directory, e.g. `/testbed/file.py` or `/testbed`."
required: true
- name: file_text
type: string
description: "Required parameter of `create` command, with the content of the file to be created."
required: false
argument_format: "--file_text {{value}}"
- name: old_str
type: string
description: "Required parameter of `str_replace` command containing the string in `path` to replace."
required: false
argument_format: "--old_str {{value}}"
- name: new_str
type: string
description: "Optional parameter of `str_replace` command containing the new string (if not given, no string will be added). Required parameter of `insert` command containing the string to insert."
required: false
argument_format: "--new_str {{value}}"
- name: insert_line
type: integer
description: "Required parameter of `insert` command. The `new_str` will be inserted AFTER the line `insert_line` of `path`."
required: false
argument_format: "--insert_line {{value}}"
- name: view_range
type: array
items:
type: integer
description: "Optional parameter of `view` command when `path` points to a file. If none is given, the full file is shown. If provided, the file will be shown in the indicated line number range, e.g. [11, 12] will show lines 11 and 12. Indexing at 1 to start. Setting `[start_line, -1]` shows all lines from `start_line` to the end of the file."
required: false
argument_format: "--view_range {{value|join(' ')}}"
state_command: "_state_anthropic"

View File

@@ -0,0 +1,3 @@
# Ignore failures, see https://github.com/SWE-agent/SWE-agent/issues/1179
pip install 'tree-sitter==0.21.3' || true
pip install 'tree-sitter-languages' || true

View File

@@ -0,0 +1,45 @@
#!/root/miniconda3/bin/python
import argparse
import warnings
# tree_sitter is throwing a FutureWarning
warnings.simplefilter("ignore", category=FutureWarning)
from tree_sitter_languages import get_language, get_parser
parser = argparse.ArgumentParser(
description="Print the contents of a Python file, skipping lengthy function and method definitions."
)
parser.add_argument("file_path", type=str, help="The path to the file to be read")
args = parser.parse_args()
# We assume that all input files are Python.
parser = get_parser("python")
language = get_language("python")
file_contents = open(args.file_path).read()
# We assume that files are utf8 encoded.
tree = parser.parse(bytes(file_contents, "utf8"))
# See https://tree-sitter.github.io/tree-sitter/using-parsers#pattern-matching-with-queries.
query = language.query("""
(function_definition
body: (_) @body)
""")
# TODO: consider special casing docstrings such that they are not elided. This
# could be accomplished by checking whether `body.text.decode('utf8')` starts
# with `"""` or `'''`.
elide_line_ranges = [
(node.start_point[0], node.end_point[0])
for node, _ in query.captures(tree.root_node)
# Only elide if it's sufficiently long
if node.end_point[0] - node.start_point[0] >= 5
]
# Note that tree-sitter line numbers are 0-indexed, but we display 1-indexed.
elide_lines = {line for start, end in elide_line_ranges for line in range(start, end + 1)}
elide_messages = [(start, f"... eliding lines {start+1}-{end+1} ...") for start, end in elide_line_ranges]
for i, line in sorted(
elide_messages + [(i, line) for i, line in enumerate(file_contents.splitlines()) if i not in elide_lines]
):
print(f"{i+1:6d} {line}")

View File

@@ -0,0 +1,9 @@
tools:
filemap:
signature: "filemap <file_path>"
docstring: "Print the contents of a Python file, skipping lengthy function and method definitions."
arguments:
- name: file_path
type: string
description: The path to the file to be read
required: true

View File

@@ -0,0 +1,2 @@
pip install 'tree-sitter==0.21.3'
pip install 'tree-sitter-languages'

View File

@@ -0,0 +1,5 @@
main() {
echo "###SWE-AGENT-EXIT-FORFEIT###"
}
main "$@"

View File

@@ -0,0 +1,5 @@
tools:
exit_forfeit:
signature: "exit_forfeit"
docstring: "Give up on the current challenge and terminate the session."
arguments: []

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
# view_image view an image file as a base64-encoded markdown image
import base64
import mimetypes
import pathlib
import sys
VALID_MIME_TYPES = {
"image/png",
"image/jpeg",
"image/webp",
}
if len(sys.argv) != 2:
sys.exit(f"usage: {pathlib.Path(sys.argv[0]).name} <image-file>")
img_path = pathlib.Path(sys.argv[1])
if not img_path.exists():
sys.exit(f"Error: File '{img_path}' does not exist")
if not img_path.is_file():
sys.exit(f"Error: '{img_path}' is not a file")
try:
mime = mimetypes.guess_type(img_path.name)[0]
if mime not in VALID_MIME_TYPES:
sys.exit(f"Error: Unsupported image type: {mime}. Valid types are: {', '.join(VALID_MIME_TYPES)}")
# read the file, base64-encode, and convert bytes → str
b64 = base64.b64encode(img_path.read_bytes()).decode("ascii")
# write the exact markdown snippet to stdout
print(f"![{img_path.as_posix()}](data:{mime};base64,{b64})")
except Exception as e:
sys.exit(f"Error processing image: {e}")

View File

@@ -0,0 +1,9 @@
tools:
view_image:
signature: "view_image <image_file>"
docstring: "view an image file"
arguments:
- name: image_file
type: string
description: "the path to the image file to view"
required: true

View File

@@ -0,0 +1,2 @@
# This will never be called, but having a file here
# ensures that chmod +x commands on the bin directory don't fail

View File

@@ -0,0 +1 @@
tools: {}

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env bash
# Define variables to exclude
EXCLUDE_VARS="PWD|LANG|PYTHONPATH|ROOT|PS0|PS1|PS2|_|OLDPWD|LC_ALL|LANG|LSCOLORS|SHLVL"
echo "Original Environment Variables:"
env | sort
# Only add Python 3.11 to PATH if no python exists
if ! command -v python &> /dev/null; then
echo -e "\nNo Python found in system, adding Python 3.11 to PATH"
export PATH="/root/python3.11/bin:$PATH"
# Create python/pip aliases
ln -s "/root/python3.11/bin/python3" "/root/python3.11/bin/python"
ln -s "/root/python3.11/bin/pip3" "/root/python3.11/bin/pip"
echo "Created symlinks: python -> python3, pip -> pip3"
else
echo -e "\nPython already exists in system, skipping Python 3.11 setup"
fi
# Attempt to read and set process 1 environment
echo -e "\nSetting environment variables from /proc/1/environ..."
if [ -r "/proc/1/environ" ]; then
while IFS= read -r -d '' var; do
# Skip excluded variables
if ! echo "$var" | grep -qE "^(${EXCLUDE_VARS})="; then
# If the variable is PATH, append and deduplicate
if [[ "$var" =~ ^PATH= ]]; then
# Combine paths and remove duplicates while preserving order
export PATH="$(echo "${PATH}:${var#PATH=}" | tr ':' '\n' | awk '!seen[$0]++' | tr '\n' ':' | sed 's/:$//')"
else
export "$var"
fi
fi
done < /proc/1/environ
echo "Successfully imported environment from /proc/1/environ"
else
echo "Cannot access /proc/1/environ - Permission denied"
fi
# Print updated environment variables
echo -e "\nUpdated Environment Variables:"
env | sort

View File

@@ -0,0 +1,10 @@
#!/usr/bin/env python
import sys
from registry import registry # type: ignore
if __name__ == "__main__":
var_name = sys.argv[1]
default_value = sys.argv[2] if len(sys.argv) > 2 else ""
print(registry.get(var_name, default_value))

View File

@@ -0,0 +1,10 @@
#!/usr/bin/env python
import sys
from registry import registry # type: ignore
if __name__ == "__main__":
var_name = sys.argv[1]
var_value = sys.argv[2] if len(sys.argv) > 2 else ""
registry[var_name] = var_value

View File

@@ -0,0 +1 @@
tools: {}

View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
# script_dir=$(dirname "$(readlink -f "$0")")
bundle_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
export PYTHONPATH="$bundle_dir/lib":$PYTHONPATH

View File

View File

@@ -0,0 +1,56 @@
import json
import os
from pathlib import Path
from typing import Any, List, Optional, Tuple, Union
class EnvRegistry:
"""Read and write variables into a file. This is used to persist state between tool
calls without using environment variables (which are problematic because you cannot
set them in a subprocess).
The default file location is `/root/.swe-agent-env`, though this can be overridden
by the `env_file` argument or the `SWE_AGENT_ENV_FILE` environment variable.
"""
def __init__(self, env_file: Optional[Path] = None):
self._env_file = env_file
@property
def env_file(self) -> Path:
if self._env_file is None:
env_file = Path(os.environ.get("SWE_AGENT_ENV_FILE", "/root/.swe-agent-env"))
else:
env_file = self._env_file
if not env_file.exists():
env_file.write_text("{}")
return env_file
def __getitem__(self, key: str) -> str:
return json.loads(self.env_file.read_text())[key]
def get(self, key: str, default_value: Any = None, fallback_to_env: bool = True) -> Any:
"""Get a value from registry:
Args:
key: The key to get the value for.
default_value: The default value to return if the key is not found in the registry.
fallback_to_env: If True, fallback to environment variables if the key is not found in the registry.
If there's no environment variable, return the default value.
"""
if fallback_to_env and key in os.environ:
default_value = os.environ[key]
return json.loads(self.env_file.read_text()).get(key, default_value)
def get_if_none(self, value: Any, key: str, default_value: Any = None) -> Any:
if value is not None:
return value
return self.get(key, default_value)
def __setitem__(self, key: str, value: Any):
env = json.loads(self.env_file.read_text())
env[key] = value
self.env_file.write_text(json.dumps(env))
registry = EnvRegistry()

View File

@@ -0,0 +1,6 @@
# Review on submit.
Provides an alternative for `submit` that does not immediately submit, but asks the
agent to perform additional reviewing steps.
Only `submit -f` will trigger the real submit.

View File

@@ -0,0 +1,54 @@
#!/usr/bin/env python3
import argparse
from pathlib import Path
import subprocess
import sys
import os
import io
from registry import registry
def main() -> None:
parser = argparse.ArgumentParser(description="Submit changes for review")
parser.add_argument("-f", "--force", action="store_true", help="Force submit without review")
args = parser.parse_args()
repo_root = registry.get("ROOT", os.getenv("ROOT"))
assert repo_root
patch_path = Path("/root/model.patch")
subprocess.run(
f"git add -A && git diff --cached > {patch_path}",
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
cwd=repo_root,
)
patch = patch_path.read_text(errors="backslashreplace")
submit_review_messages = registry.get("SUBMIT_REVIEW_MESSAGES", [])
n_stages = len(submit_review_messages)
current_stage = registry.get("SUBMIT_STAGE", 0)
if not args.force and current_stage != n_stages:
message = submit_review_messages[current_stage]
message = message.replace("{{diff}}", patch)
message = message.replace("{{problem_statement}}", registry.get("PROBLEM_STATEMENT", ""))
registry["SUBMIT_STAGE"] = current_stage + 1
print(message)
sys.exit(0)
print("<<SWE_AGENT_SUBMISSION>>")
print(patch)
print("<<SWE_AGENT_SUBMISSION>>")
if __name__ == "__main__":
# There are some super strange "ascii can't decode x" errors when printing to the terminal
# that can be solved with setting the default encoding for stdout
# (note that python3.6 doesn't have the reconfigure method)
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
main()

View File

@@ -0,0 +1,6 @@
tools:
submit:
signature: "submit"
docstring: "submits the current file"
# Do not actually show the -f argument to the model, only
# use it from the agent for submission after error

View File

@@ -0,0 +1,31 @@
main() {
if [ $# -eq 1 ]; then
local file_name="$1"
local dir="./"
elif [ $# -eq 2 ]; then
local file_name="$1"
if [ -d "$2" ]; then
local dir="$2"
else
echo "Directory $2 not found"
return
fi
else
echo "Usage: find_file <file_name> [<dir>]"
return
fi
dir=$(realpath "$dir")
local matches=$(find "$dir" -type f -name "$file_name")
# if no matches, return
if [ -z "$matches" ]; then
echo "No matches found for \"$file_name\" in $dir"
return
fi
# Calculate total number of matches
local num_matches=$(echo "$matches" | wc -l | awk '{$1=$1; print $0}')
echo "Found $num_matches matches for \"$file_name\" in $dir:"
echo "$matches" | awk '{print $0}'
}
main "$@"

View File

@@ -0,0 +1,39 @@
main() {
if [ $# -eq 1 ]; then
local search_term="$1"
local dir="./"
elif [ $# -eq 2 ]; then
local search_term="$1"
if [ -d "$2" ]; then
local dir="$2"
else
echo "Directory $2 not found"
return
fi
else
echo "Usage: search_dir <search_term> [<dir>]"
return
fi
dir=$(realpath "$dir")
local matches=$(find "$dir" -type f ! -path '*/.*' -exec grep -nIH -- "$search_term" {} + | cut -d: -f1 | sort | uniq -c)
# if no matches, return
if [ -z "$matches" ]; then
echo "No matches found for \"$search_term\" in $dir"
return
fi
# Calculate total number of matches
local num_matches=$(echo "$matches" | awk '{sum+=$1} END {print sum}')
# calculate total number of files matched
local num_files=$(echo "$matches" | wc -l | awk '{$1=$1; print $0}')
# if num_files is > 100, print an error
if [ $num_files -gt 100 ]; then
echo "More than $num_files files matched for \"$search_term\" in $dir. Please narrow your search."
return
fi
echo "Found $num_matches matches for \"$search_term\" in $dir:"
echo "$matches" | awk '{$2=$2; gsub(/^\.+\/+/, "./", $2); print $2 " ("$1" matches)"}'
echo "End of matches for \"$search_term\" in $dir"
}
main "$@"

View File

@@ -0,0 +1,55 @@
main() {
# Check if the first argument is provided
local search_term="${1:-}"
if [ -z "${search_term}" ]; then
echo "Usage: search_file <search_term> [<file>]"
return
fi
# Check if the second argument is provided
if [ $# -ge 2 ]; then
# Check if the provided argument is a valid file
if [ -f "$2" ]; then
local file="$2" # Set file if valid
else
echo "Usage: search_file <search_term> [<file>]"
echo "Error: File name $2 not found. Please provide a valid file name."
return # Exit if the file is not valid
fi
else
local CURRENT_FILE=$(_read_env CURRENT_FILE)
# Check if a file is open
if [ -z "${CURRENT_FILE:-}" ]; then
echo "No file open. Use the open command first."
return # Exit if no file is open
fi
local file="$CURRENT_FILE" # Set file to the current open file
fi
local search_term="$1"
file=$(realpath "$file")
# Use grep to directly get the desired formatted output
local matches=$(grep -nH -- "$search_term" "$file")
# Check if no matches were found
if [ -z "${matches:-}" ]; then
echo "No matches found for \"$search_term\" in $file"
return
fi
# Calculate total number of matches
local num_matches=$(echo "$matches" | wc -l | awk '{$1=$1; print $0}')
# calculate total number of lines matched
local num_lines=$(echo "$matches" | cut -d: -f1 | sort | uniq | wc -l | awk '{$1=$1; print $0}')
# if num_lines is > 100, print an error
if [ $num_lines -gt 100 ]; then
echo "More than $num_lines lines matched for \"$search_term\" in $file. Please narrow your search."
return
fi
# Print the total number of matches and the matches themselves
echo "Found $num_matches matches for \"$search_term\" in $file:"
echo "$matches" | cut -d: -f1-2 | sort -u -t: -k2,2n | while IFS=: read -r filename line_number; do
echo "Line $line_number:$(sed -n "${line_number}p" "$file")"
done
echo "End of matches for \"$search_term\" in $file"
}
main "$@"

View File

@@ -0,0 +1,37 @@
tools:
find_file:
signature: "find_file <file_name> [<dir>]"
docstring: "finds all files with the given name or pattern in dir. If dir is not provided, searches in the current directory"
arguments:
- name: file_name
type: string
description: "the name of the file or pattern to search for. supports shell-style wildcards (e.g. *.py)"
required: true
- name: dir
type: string
description: "the directory to search in (if not provided, searches in the current directory)"
required: false
search_dir:
signature: "search_dir <search_term> [<dir>]"
docstring: "searches for search_term in all files in dir. If dir is not provided, searches in the current directory"
arguments:
- name: search_term
type: string
description: "the term to search for"
required: true
- name: dir
type: string
description: "the directory to search in (if not provided, searches in the current directory)"
required: false
search_file:
signature: "search_file <search_term> [<file>]"
docstring: "searches for search_term in file. If file is not provided, searches in the current open file"
arguments:
- name: search_term
type: string
description: "the term to search for"
required: true
- name: file
type: string
description: "the file to search in (if not provided, searches in the current open file)"
required: false

View File

@@ -0,0 +1,3 @@
_write_env SEARCH_RESULTS "()"
_write_env SEARCH_FILES "()"
_write_env SEARCH_INDEX 0

View File

@@ -0,0 +1,17 @@
main() {
cd $ROOT
# Check if the patch file exists and is non-empty
if [ -s "/root/test.patch" ]; then
# Apply the patch in reverse
git apply -R < "/root/test.patch"
fi
git add -A
git diff --cached > /root/model.patch
echo "<<SWE_AGENT_SUBMISSION>>"
cat /root/model.patch
echo "<<SWE_AGENT_SUBMISSION>>"
}
main "$@"

View File

@@ -0,0 +1,5 @@
tools:
submit:
signature: "submit"
docstring: "submits the current file"
arguments: []

View File

@@ -0,0 +1,41 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def click(x, y, button):
"""Click at the specified coordinates (x, y)."""
response = send_request(
config.port,
"click",
"POST",
{"x": x, "y": y, "button": button, "return_screenshot": config.autoscreenshot},
)
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("x", type=int)
parser.add_argument("y", type=int)
parser.add_argument("button", nargs="?", default="left", choices=["left", "right"])
args = parser.parse_args()
click(args.x, args.y, args.button)

View File

@@ -0,0 +1,28 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import _print_response_with_metadata, send_request
config = ClientConfig()
def close():
"""Close the currently open window."""
response = send_request(config.port, "close", "POST")
if response is None:
return
_print_response_with_metadata(response)
if __name__ == "__main__":
parser = ArgumentParser()
args = parser.parse_args()
close()

View File

@@ -0,0 +1,37 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def double_click(x, y):
"""Double-click at the specified coordinates (x, y)."""
response = send_request(
config.port, "double_click", "POST", {"x": x, "y": y, "return_screenshot": config.autoscreenshot}
)
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("x", type=int)
parser.add_argument("y", type=int)
args = parser.parse_args()
double_click(args.x, args.y)

View File

@@ -0,0 +1,46 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_error,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def drag(path):
"""Drag mouse along a path. Path should be a JSON list of x, y lists: '[[0, 0], [100, 100]]'."""
import json
try:
path_data = json.loads(path)
except json.JSONDecodeError:
_print_error("Path must be valid JSON")
return
response = send_request(
config.port, "drag", "POST", {"path": path_data, "return_screenshot": config.autoscreenshot}
)
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument(
"path", type=str, help="The path to drag the mouse along (JSON list of x, y lists) e.g. '[[0, 0], [100, 100]]'"
)
args = parser.parse_args()
drag(args.path)

View File

@@ -0,0 +1,39 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def execute_script(script):
"""Execute a custom JavaScript code snippet on the current page."""
response = send_request(
config.port,
"execute_script",
"POST",
{"script": script, "return_screenshot": config.autoscreenshot},
)
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("script", type=str, help="The JavaScript code snippet to execute")
args = parser.parse_args()
execute_script(args.script)

View File

@@ -0,0 +1,48 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def get_console_output():
"""Get console output from the browser."""
response = send_request(config.port, "console", "GET", {})
if response is None:
return
_print_response_with_metadata(response)
if "console_messages" in response:
console_messages = response["console_messages"]
if console_messages:
print("\n--- Console Messages ---")
for ix, msg in enumerate(console_messages):
# timestamp = msg.get("timestamp", 0)
msg_type = msg.get("type", "log")
text = msg.get("text", "")
location = msg.get("location", {})
print(f"[{ix + 1}] {msg_type.upper()}: {text}")
if location:
url = location.get("url", "")
line_number = location.get("lineNumber", "")
column_number = location.get("columnNumber", "")
if url:
print(f" Location: {url}:{line_number}:{column_number}")
print("Console buffer cleared.")
if __name__ == "__main__":
parser = ArgumentParser()
args = parser.parse_args()
get_console_output()

View File

@@ -0,0 +1,35 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def move(x, y):
"""Move mouse to the specified coordinates (x, y)."""
response = send_request(config.port, "move", "POST", {"x": x, "y": y, "return_screenshot": config.autoscreenshot})
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("x", type=int)
parser.add_argument("y", type=int)
args = parser.parse_args()
move(args.x, args.y)

View File

@@ -0,0 +1,33 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def back():
"""Navigate back in the browser history."""
response = send_request(config.port, "back", "POST", {"return_screenshot": config.autoscreenshot})
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
args = parser.parse_args()
back()

View File

@@ -0,0 +1,33 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def forward():
"""Navigate forward in the browser history."""
response = send_request(config.port, "forward", "POST", {"return_screenshot": config.autoscreenshot})
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
args = parser.parse_args()
forward()

View File

@@ -0,0 +1,36 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def open(url):
"""Open the specified website URL."""
if Path(url).is_file():
url = f"file://{Path(url).resolve()}"
response = send_request(config.port, "goto", "POST", {"url": url, "return_screenshot": config.autoscreenshot})
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("url", type=str, help="The URL of the website to open")
args = parser.parse_args()
open(args.url)

View File

@@ -0,0 +1,51 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_error,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def keypress(keys):
"""Press the specified keys. Keys should be a JSON string like '["ctrl", "c"]'."""
import json
try:
keys_data = json.loads(keys)
except json.JSONDecodeError:
_print_error("Keys must be valid JSON")
return
response = send_request(
config.port,
"keypress",
"POST",
{"keys": keys_data, "return_screenshot": config.autoscreenshot},
)
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument(
"keys",
type=str,
help='The keys to press (JSON string like \'["ctrl", "c"]\')',
)
args = parser.parse_args()
keypress(args.keys)

View File

@@ -0,0 +1,33 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def reload():
"""Reload the current webpage."""
response = send_request(config.port, "reload", "POST", {"return_screenshot": config.autoscreenshot})
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
args = parser.parse_args()
reload()

View File

@@ -0,0 +1,394 @@
#!/root/python3.11/bin/python3
"""Web Browser server Flask + Playwright backend."""
from __future__ import annotations
import atexit
import functools
import signal
import sys
import time
from pathlib import Path
from typing import Any
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from browser_manager import BrowserManager
from flask import Flask, Response, jsonify, request
from web_browser_config import ServerConfig
from web_browser_utils import catch_error, normalize_url, validate_request
config = ServerConfig()
browser_manager = BrowserManager()
def cleanup_on_exit():
"""Cleanup function for atexit and signal handlers."""
browser_manager.cleanup()
def signal_handler(signum, frame):
"""Handle shutdown signals."""
print(f"\nReceived signal {signum}, shutting down...")
cleanup_on_exit()
sys.exit(0)
# register cleanup handlers
atexit.register(cleanup_on_exit)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
app = Flask(__name__)
def require_website_open(func):
"""Decorator to ensure a website is open before executing the route."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not browser_manager.is_website_open():
return create_response({"status": "error", "message": "Please open a website first."}, False)
return func(*args, **kwargs)
return wrapper
def _get_response_metadata() -> dict[str, Any]:
"""Gather comprehensive metadata for API responses."""
metadata = {}
if browser_manager.is_website_open():
metadata["Browser Type"] = browser_manager.browser_name
metadata["Mouse Position"] = f"({browser_manager.mouse_x},{browser_manager.mouse_y})"
metadata["Viewport Size"] = f"{browser_manager.window_width}×{browser_manager.window_height}"
with browser_manager._browser_lock() as page:
metadata["Current URL"] = page.url
metadata["Page Title"] = page.title()
scroll_info = page.evaluate("""() => ({
scroll_position: { x: window.pageXOffset || document.documentElement.scrollLeft, y: window.pageYOffset || document.documentElement.scrollTop },
page_dimensions: { width: document.documentElement.scrollWidth, height: document.documentElement.scrollHeight },
visible_dimensions: { width: window.innerWidth, height: window.innerHeight }
})""")
scroll_pos = scroll_info["scroll_position"]
page_dims = scroll_info["page_dimensions"]
visible_dims = scroll_info["visible_dimensions"]
metadata["Scroll Position"] = f"({scroll_pos['x']},{scroll_pos['y']})"
metadata["Page Dimensions"] = f"{page_dims['width']}×{page_dims['height']}"
metadata["Visible Dimensions"] = f"{visible_dims['width']}×{visible_dims['height']}"
return metadata
def create_response(data: dict[str, Any], return_screenshot: bool) -> Response:
"""Create a JSON response with comprehensive metadata and optional screenshot."""
response_data = {**data}
if return_screenshot:
response_data.update(browser_manager.take_screenshot())
if "metadata" not in response_data:
response_data["metadata"] = {}
response_data["metadata"].update(_get_response_metadata())
return jsonify(response_data)
@app.route("/info", methods=["GET"])
@catch_error
def info():
if not browser_manager.is_website_open():
return create_response({"status": "success", "message": "No page open"}, False)
data = {
"status": "success",
"message": "Loaded info for current page",
}
return create_response(data, False)
@app.route("/close", methods=["POST"])
@catch_error
def close_browser():
browser_manager.cleanup()
browser_manager._init_browser()
return create_response({"status": "success", "message": "Closed browser"}, False)
@app.route("/set_window_size", methods=["POST"])
@validate_request("width", "height", "return_screenshot")
@require_website_open
@catch_error
def set_window_size():
width, height = request.json["width"], request.json["height"]
return_screenshot = request.json["return_screenshot"]
if width <= 0 or height <= 0:
return create_response(
{"status": "error", "message": f"Invalid dimensions ({width},{height}). Must be positive"}, False
)
with browser_manager._browser_lock() as page:
page.set_viewport_size({"width": width, "height": height})
browser_manager.window_width = width
browser_manager.window_height = height
browser_manager.constrain_mouse_position(page)
data = {"status": "success", "message": f"Set viewport to {width}×{height}"}
return create_response(data, return_screenshot)
@app.route("/screenshot", methods=["GET"])
@require_website_open
def screenshot():
data = {"status": "success", "message": "Screenshot"}
return create_response(data, True)
@app.route("/click", methods=["POST"])
@validate_request("x", "y", "button", "return_screenshot")
@catch_error
@require_website_open
def click():
x, y = round(request.json["x"]), round(request.json["y"])
button = request.json["button"]
return_screenshot = request.json["return_screenshot"]
x_valid, y_valid = browser_manager.validate_coordinates(x, y)
if not x_valid or not y_valid:
return create_response(
{
"status": "error",
"message": f"Invalid coordinates ({x},{y}). Must be within {browser_manager.window_width}x{browser_manager.window_height}",
},
False,
)
with browser_manager._browser_lock() as page:
page.mouse.click(x, y, button=button)
browser_manager.mouse_x, browser_manager.mouse_y = x, y
data = {"status": "success", "message": f"Clicked '{button}' at ({x},{y})"}
return create_response(data, return_screenshot)
@app.route("/double_click", methods=["POST"])
@validate_request("x", "y", "return_screenshot")
@catch_error
@require_website_open
def double_click():
x, y = round(request.json["x"]), round(request.json["y"])
return_screenshot = request.json["return_screenshot"]
x_valid, y_valid = browser_manager.validate_coordinates(x, y)
if not x_valid or not y_valid:
return create_response(
{
"status": "error",
"message": f"Invalid coordinates ({x},{y}). Must be within {browser_manager.window_width}x{browser_manager.window_height}",
},
False,
)
with browser_manager._browser_lock() as page:
page.mouse.dblclick(x, y)
browser_manager.mouse_x, browser_manager.mouse_y = x, y
data = {"status": "success", "message": f"Doubleclicked at ({x},{y})"}
return create_response(data, return_screenshot)
@app.route("/move", methods=["POST"])
@validate_request("x", "y", "return_screenshot")
@catch_error
@require_website_open
def move():
x, y = request.json["x"], request.json["y"]
return_screenshot = request.json["return_screenshot"]
x_valid, y_valid = browser_manager.validate_coordinates(x, y)
if not x_valid or not y_valid:
return create_response(
{
"status": "error",
"message": f"Invalid coordinates ({x},{y}). Must be within {browser_manager.window_width}x{browser_manager.window_height}",
},
False,
)
with browser_manager._browser_lock() as page:
page.mouse.move(x, y)
browser_manager.mouse_x, browser_manager.mouse_y = x, y
data = {"status": "success", "message": f"Moved mouse to ({x},{y})"}
return create_response(data, return_screenshot)
@app.route("/drag", methods=["POST"])
@validate_request("path", "return_screenshot")
@catch_error
@require_website_open
def drag():
path: list[list[int]] = request.json["path"]
return_screenshot = request.json["return_screenshot"]
if not path or len(path) < 2:
return create_response({"status": "error", "message": "Path needs at least two points"}, False)
for ix, point in enumerate(path):
if len(point) != 2:
return create_response(
{"status": "error", "message": f"Path point {ix} must have exactly 2 coordinates"}, False
)
x, y = point
x_valid, y_valid = browser_manager.validate_coordinates(x, y)
if not x_valid or not y_valid:
return create_response(
{
"status": "error",
"message": f"Invalid coordinates ({x},{y}) at path point {ix}. Must be within {browser_manager.window_width}x{browser_manager.window_height}",
},
False,
)
with browser_manager._browser_lock() as page:
page.mouse.move(*path[0])
page.mouse.down()
for x, y in path[1:]:
page.mouse.move(x, y)
page.mouse.up()
browser_manager.mouse_x, browser_manager.mouse_y = path[-1]
data = {"status": "success", "message": "Dragged the mouse along the path"}
return create_response(data, return_screenshot)
@app.route("/type", methods=["POST"])
@validate_request("text", "return_screenshot")
@catch_error
@require_website_open
def type_():
text = request.json["text"]
return_screenshot = request.json["return_screenshot"]
with browser_manager._browser_lock() as page:
page.keyboard.type(text)
data = {"status": "success", "message": f"Typed '{text}'"}
return create_response(data, return_screenshot)
@app.route("/scroll", methods=["POST"])
@validate_request("scroll_x", "scroll_y", "return_screenshot")
@catch_error
@require_website_open
def scroll():
delta_x, delta_y = request.json["scroll_x"], request.json["scroll_y"]
return_screenshot = request.json["return_screenshot"]
with browser_manager._browser_lock() as page:
page.mouse.wheel(delta_x, delta_y)
data = {"status": "success", "message": f"Scrolled by ({delta_x},{delta_y})"}
return create_response(data, return_screenshot)
@app.route("/execute_script", methods=["POST"])
@validate_request("script", "return_screenshot")
@catch_error
@require_website_open
def exec_script():
script = request.json["script"]
return_screenshot = request.json["return_screenshot"]
with browser_manager._browser_lock() as page:
result = page.evaluate(script)
data = {"status": "success", "message": (f"Script executed.\n<script_result>\n{result}\n</script_result>")}
return create_response(data, return_screenshot)
@app.route("/back", methods=["POST"])
@validate_request("return_screenshot")
@catch_error
@require_website_open
def back():
return_screenshot = request.json["return_screenshot"]
with browser_manager._browser_lock() as page:
page.go_back()
data = {"status": "success", "message": "Navigated back"}
return create_response(data, return_screenshot)
@app.route("/forward", methods=["POST"])
@validate_request("return_screenshot")
@catch_error
@require_website_open
def forward():
return_screenshot = request.json["return_screenshot"]
with browser_manager._browser_lock() as page:
page.go_forward()
data = {"status": "success", "message": "Navigated forward"}
return create_response(data, return_screenshot)
@app.route("/reload", methods=["POST"])
@validate_request("return_screenshot")
@catch_error
@require_website_open
def reload():
return_screenshot = request.json["return_screenshot"]
with browser_manager._browser_lock() as page:
page.reload()
data = {"status": "success", "message": "Reloaded the page"}
return create_response(data, return_screenshot)
@app.route("/wait", methods=["POST"])
@validate_request("ms", "return_screenshot")
@catch_error
@require_website_open
def wait():
milliseconds = request.json["ms"]
return_screenshot = request.json["return_screenshot"]
time.sleep(milliseconds / 1000.0)
data = {"status": "success", "message": f"Waited {milliseconds} ms"}
return create_response(data, return_screenshot)
@app.route("/keypress", methods=["POST"])
@validate_request("keys", "return_screenshot")
@catch_error
@require_website_open
def keypress():
keys: list[str] = request.json["keys"]
return_screenshot = request.json["return_screenshot"]
if not isinstance(keys, list):
return create_response({"status": "error", "message": "Keys must be a list"}, False)
if not keys:
return create_response({"status": "error", "message": "Keys list empty"}, False)
with browser_manager._browser_lock() as _:
for key in keys[:-1]:
browser_manager.key_down(key)
browser_manager.key_press(keys[-1])
for key in reversed(keys[:-1]):
browser_manager.key_up(key)
data = {"status": "success", "message": f"Pressed keys {keys}"}
return create_response(data, return_screenshot)
@app.route("/goto", methods=["POST"])
@validate_request("url", "return_screenshot")
@catch_error
def goto():
url = normalize_url(request.json["url"])
return_screenshot = request.json["return_screenshot"]
with browser_manager._browser_lock() as page:
page.goto(url, wait_until="load")
data = {"status": "success", "message": f"Navigated to {url}"}
return create_response(data, return_screenshot)
@app.route("/console", methods=["GET"])
@catch_error
@require_website_open
def get_console():
console_messages = browser_manager.get_console_output()
data = {
"status": "success",
"message": f"Retrieved {len(console_messages)} console messages",
"console_messages": console_messages,
}
return create_response(data, False)
def main():
"""Run the Flask server with proper cleanup handling."""
try:
app.run(host="0.0.0.0", port=config.port, threaded=False, use_reloader=False)
except KeyboardInterrupt:
print("\nShutting down gracefully...")
finally:
cleanup_on_exit()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,38 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
ScreenshotMode,
_handle_screenshot,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def screenshot():
"""Capture a screenshot and handle it according to the default config.screenshot_mode."""
response = send_request(config.port, "screenshot", "GET")
if response is None:
return
screenshot_data = response["screenshot"]
_print_response_with_metadata(response)
if config.screenshot_mode == ScreenshotMode.SAVE:
_handle_screenshot(screenshot_data, ScreenshotMode.SAVE)
else:
_handle_screenshot(screenshot_data, ScreenshotMode.PRINT)
if __name__ == "__main__":
parser = ArgumentParser()
args = parser.parse_args()
screenshot()

View File

@@ -0,0 +1,40 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def scroll(scroll_x, scroll_y):
"""Scroll by (scroll_x, scroll_y) pixels at current mouse position."""
response = send_request(
config.port,
"scroll",
"POST",
{"scroll_x": scroll_x, "scroll_y": scroll_y, "return_screenshot": config.autoscreenshot},
)
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("scroll_x", type=int, help="The number of pixels to scroll horizontally")
parser.add_argument("scroll_y", type=int, help="The number of pixels to scroll vertically")
args = parser.parse_args()
scroll(args.scroll_x, args.scroll_y)

View File

@@ -0,0 +1,40 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def set_window_size(width, height):
"""Set the browser window size to the specified width and height."""
response = send_request(
config.port,
"set_window_size",
"POST",
{"width": width, "height": height, "return_screenshot": config.autoscreenshot},
)
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("width", type=int, help="The new width of the browser window")
parser.add_argument("height", type=int, help="The new height of the browser window")
args = parser.parse_args()
set_window_size(args.width, args.height)

View File

@@ -0,0 +1,34 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def type(text):
"""Type the given text at the current cursor position."""
response = send_request(config.port, "type", "POST", {"text": text, "return_screenshot": config.autoscreenshot})
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("text", type=str, help="The text to type at the currently focused element")
args = parser.parse_args()
type(args.text)

View File

@@ -0,0 +1,39 @@
#!/root/python3.11/bin/python3
from __future__ import annotations
import sys
from argparse import ArgumentParser
from pathlib import Path
lib_path = str(Path(__file__).resolve().parent.parent / "lib")
sys.path.insert(0, lib_path)
from web_browser_config import ClientConfig
from web_browser_utils import (
_autosave_screenshot_from_response,
_print_response_with_metadata,
send_request,
)
config = ClientConfig()
def wait(ms):
"""Wait for the specified number of milliseconds."""
response = send_request(
config.port,
"wait",
"POST",
{"ms": ms, "return_screenshot": config.autoscreenshot},
)
if response is None:
return
_print_response_with_metadata(response)
_autosave_screenshot_from_response(response, config.screenshot_mode)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("ms", type=int, help="The number of milliseconds to wait")
args = parser.parse_args()
wait(args.ms)

View File

@@ -0,0 +1,155 @@
tools:
open_site:
signature: "open_site <url>"
docstring: "Open the specified website URL or local file path"
arguments:
- name: url
type: string
description: "The URL to open (can be a web URL or file path)"
required: true
close_site:
signature: "close_site"
docstring: "Close the currently open browser window"
arguments: []
screenshot_site:
signature: "screenshot_site"
docstring: "Take a screenshot of the current page"
arguments: []
click_mouse:
signature: "click_mouse <x> <y> [<button>]"
docstring: "Click at the specified coordinates (shown as a red crosshair) on the current page"
arguments:
- name: x
type: integer
description: "X coordinate"
required: true
- name: y
type: integer
description: "Y coordinate"
required: true
- name: button
type: string
description: "Mouse button to click (left or right, default: left)"
required: false
enum: ["left", "right"]
double_click_mouse:
signature: "double_click_mouse <x> <y>"
docstring: "Double-click at the specified coordinates (shown as a red crosshair) on the current page"
arguments:
- name: x
type: integer
description: "X coordinate"
required: true
- name: y
type: integer
description: "Y coordinate"
required: true
move_mouse:
signature: "move_mouse <x> <y>"
docstring: "Move mouse to the specified coordinates (shown as a red crosshair) on the current page"
arguments:
- name: x
type: integer
description: "X coordinate"
required: true
- name: y
type: integer
description: "Y coordinate"
required: true
drag_mouse:
signature: "drag_mouse <path>"
docstring: "Drag mouse along a path (JSON format: [[x1,y1],[x2,y2],...]) on the current page"
arguments:
- name: path
type: string
description: "JSON array of coordinate pairs for the drag path (e.g., '[[0,0],[100,100]]')"
required: true
type_text:
signature: "type_text <text>"
docstring: "Type the given text at the current focused element on the current page"
arguments:
- name: text
type: string
description: "Text to type"
required: true
scroll_on_page:
signature: "scroll_on_page <scroll_x> <scroll_y>"
docstring: "Scroll by the specified number of pixels on the current page"
arguments:
- name: scroll_x
type: integer
description: "Horizontal scroll amount in pixels"
required: true
- name: scroll_y
type: integer
description: "Vertical scroll amount in pixels"
required: true
execute_script_on_page:
signature: "execute_script_on_page <script>"
docstring: "Execute a custom JavaScript code snippet on the current page"
arguments:
- name: script
type: string
description: "JavaScript code to execute"
required: true
navigate_back:
signature: "navigate_back"
docstring: "Navigate back in the browser history"
arguments: []
navigate_forward:
signature: "navigate_forward"
docstring: "Navigate forward in the browser history"
arguments: []
reload_page:
signature: "reload_page"
docstring: "Reload the current webpage"
arguments: []
wait_time:
signature: "wait_time <ms>"
docstring: "Wait for the specified number of milliseconds"
arguments:
- name: ms
type: integer
description: "Time to wait in milliseconds"
required: true
press_keys_on_page:
signature: "press_keys_on_page <keys>"
docstring: "Press the specified keys (JSON format: [\"key1\", \"key2\"]) on the current page"
arguments:
- name: keys
type: string
description: "JSON array of keys to press (e.g., '[\"ctrl\", \"c\"]')"
required: true
set_browser_window_size:
signature: "set_browser_window_size <width> <height>"
docstring: "Set the browser window size to the specified dimensions"
arguments:
- name: width
type: integer
description: "Window width in pixels"
required: true
- name: height
type: integer
description: "Window height in pixels"
required: true
get_console_output:
signature: "get_console_output"
docstring: "Get console output messages from the browser (logs, errors, warnings, etc.)"
arguments: []

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env bash
/root/python3.11/bin/python3 -m pip install flask requests playwright
/root/python3.11/bin/python3 -m playwright install-deps chromium
if [ -f /usr/bin/google-chrome ]; then
export WEB_BROWSER_CHROMIUM_EXECUTABLE_PATH=/usr/bin/google-chrome
elif [ -f /usr/bin/chromium ]; then
export WEB_BROWSER_CHROMIUM_EXECUTABLE_PATH=/usr/bin/chromium
elif [ -f /usr/bin/google-chrome-stable ]; then
export WEB_BROWSER_CHROMIUM_EXECUTABLE_PATH=/usr/bin/google-chrome-stable
else
/root/python3.11/bin/python3 -m playwright install chromium
fi
export WEB_BROWSER_SCREENSHOT_MODE=print
export WEB_BROWSER_PORT=19321
mkdir -p /root/.web_browser_logs
run_web_browser_server &> /root/.web_browser_logs/web-browser-server.log &

View File

@@ -0,0 +1,326 @@
from __future__ import annotations
import base64
import contextlib
from pathlib import Path
import threading
import time
from typing import Any
from playwright.sync_api import Browser, Page, Playwright, sync_playwright
from web_browser_config import ServerConfig
config = ServerConfig()
SUPPORTED_BROWSERS = {"chromium", "firefox"}
CROSSHAIR_JS = """
([x, y, id]) => {
const size = 20;
const thickness = 3;
const hId = id + '_h';
const vId = id + '_v';
const createLine = (elementId, styles) => {
let line = document.getElementById(elementId);
if (!line) {
line = document.createElement('div');
line.id = elementId;
Object.assign(line.style, {
position: 'fixed',
pointerEvents: 'none',
zIndex: '2147483647',
backgroundColor: 'red',
boxSizing: 'border-box',
margin: '0',
padding: '0',
border: 'none',
outline: 'none',
transform: 'translateZ(0)',
...styles
});
document.body.appendChild(line);
}
return line;
};
const hLine = createLine(hId, {
width: `${size}px`,
height: `${thickness}px`,
left: `${Math.round(x - size / 2)}px`,
top: `${Math.round(y - thickness / 2)}px`
});
const vLine = createLine(vId, {
width: `${thickness}px`,
height: `${size}px`,
left: `${Math.round(x - thickness / 2)}px`,
top: `${Math.round(y - size / 2)}px`
});
}
"""
REMOVE_CROSSHAIR_JS = """
(id) => {
const hEl = document.getElementById(id + '_h');
const vEl = document.getElementById(id + '_v');
if (hEl) hEl.remove();
if (vEl) vEl.remove();
}
"""
KEY_MAP = {
# Function keys
"f1": "F1", "f2": "F2", "f3": "F3", "f4": "F4", "f5": "F5", "f6": "F6",
"f7": "F7", "f8": "F8", "f9": "F9", "f10": "F10", "f11": "F11", "f12": "F12",
# Number keys (top row)
"0": "Digit0", "1": "Digit1", "2": "Digit2", "3": "Digit3", "4": "Digit4",
"5": "Digit5", "6": "Digit6", "7": "Digit7", "8": "Digit8", "9": "Digit9",
# Letter keys
"a": "KeyA", "b": "KeyB", "c": "KeyC", "d": "KeyD", "e": "KeyE", "f": "KeyF",
"g": "KeyG", "h": "KeyH", "i": "KeyI", "j": "KeyJ", "k": "KeyK", "l": "KeyL",
"m": "KeyM", "n": "KeyN", "o": "KeyO", "p": "KeyP", "q": "KeyQ", "r": "KeyR",
"s": "KeyS", "t": "KeyT", "u": "KeyU", "v": "KeyV", "w": "KeyW", "x": "KeyX",
"y": "KeyY", "z": "KeyZ",
# Arrow keys
"up": "ArrowUp", "down": "ArrowDown", "left": "ArrowLeft", "right": "ArrowRight",
"arrow_up": "ArrowUp", "arrow_down": "ArrowDown", "arrow_left": "ArrowLeft", "arrow_right": "ArrowRight",
# Navigation keys
"home": "Home", "end": "End", "page_up": "PageUp", "page_down": "PageDown",
"pageup": "PageUp", "pagedown": "PageDown",
# Editing keys
"backspace": "Backspace", "delete": "Delete", "insert": "Insert",
"enter": "Enter", "return": "Enter", "tab": "Tab", "escape": "Escape", "esc": "Escape",
# Modifier keys
"shift": "Shift", "ctrl": "Control", "control": "Control", "alt": "Alt", "meta": "Meta",
"shift_left": "ShiftLeft", "ctrl_or_meta": "ControlOrMeta", "control_or_meta": "ControlOrMeta",
# Punctuation and symbols
"space": " ", "spacebar": " ",
"backquote": "Backquote", "`": "Backquote", "backtick": "Backquote",
"minus": "Minus", "-": "Minus", "dash": "Minus",
"equal": "Equal", "=": "Equal", "equals": "Equal",
"backslash": "Backslash", "\\": "Backslash",
"bracket_left": "BracketLeft", "[": "BracketLeft",
"bracket_right": "BracketRight", "]": "BracketRight",
"semicolon": "Semicolon", ";": "Semicolon",
"quote": "Quote", "'": "Quote", "apostrophe": "Quote",
"comma": "Comma", ",": "Comma",
"period": "Period", ".": "Period", "dot": "Period",
"slash": "Slash", "/": "Slash",
# Numpad keys
"numpad_0": "Numpad0", "numpad_1": "Numpad1", "numpad_2": "Numpad2", "numpad_3": "Numpad3",
"numpad_4": "Numpad4", "numpad_5": "Numpad5", "numpad_6": "Numpad6", "numpad_7": "Numpad7",
"numpad_8": "Numpad8", "numpad_9": "Numpad9",
"numpad_add": "NumpadAdd", "numpad_subtract": "NumpadSubtract",
"numpad_multiply": "NumpadMultiply", "numpad_divide": "NumpadDivide",
"numpad_decimal": "NumpadDecimal", "numpad_enter": "NumpadEnter",
# Lock keys
"caps_lock": "CapsLock", "capslock": "CapsLock",
"num_lock": "NumLock", "numlock": "NumLock",
"scroll_lock": "ScrollLock", "scrolllock": "ScrollLock",
# Common combinations (case-insensitive aliases)
"ENTER": "Enter", "ESCAPE": "Escape", "BACKSPACE": "Backspace", "DELETE": "Delete",
"TAB": "Tab", "SPACE": " ", "UP": "ArrowUp", "DOWN": "ArrowDown",
"LEFT": "ArrowLeft", "RIGHT": "ArrowRight", "HOME": "Home", "END": "End",
}
class BrowserManager:
"""Manages Playwright browser instance with proper resource cleanup."""
def __init__(self):
self.headless = config.headless
self.browser_type = self._validate_browser_type(config.browser_type)
self.page: Page | None = None
self.screenshot_index = 0
self.mouse_x = 0
self.mouse_y = 0
self._lock = threading.RLock()
self.window_width = config.window_width
self.window_height = config.window_height
self.screenshot_delay = config.screenshot_delay
self.reconnect_timeout = config.reconnect_timeout
self.crosshair_id = config.crosshair_id
self.console_messages: list[dict[str, Any]] = []
self._init_browser()
def _validate_browser_type(self, browser_type: str) -> str:
"""Validate and return the browser type."""
browser_type = browser_type.lower()
if browser_type not in SUPPORTED_BROWSERS:
msg = (
f"Unsupported browser type: {browser_type}. "
f"Supported browsers: {', '.join(sorted(SUPPORTED_BROWSERS))}"
)
raise ValueError(msg)
return browser_type
def _init_browser(self):
self.playwright: Playwright = sync_playwright().start()
browser_launcher = getattr(self.playwright, self.browser_type)
executable_path = None
if self.browser_type == "chromium" and config.chromium_executable_path:
executable_path = config.chromium_executable_path
elif self.browser_type == "firefox" and config.firefox_executable_path:
executable_path = config.firefox_executable_path
launch_options = {"headless": self.headless}
if executable_path and Path(executable_path).exists():
launch_options["executable_path"] = executable_path
elif executable_path:
print(
f"Warning: Executable path '{executable_path}' does not exist, using default browser"
)
self.browser: Browser = browser_launcher.launch(**launch_options)
@property
def browser_name(self) -> str:
"""Get the name of the browser."""
return self.browser.browser_type.name
@contextlib.contextmanager
def _browser_lock(self):
"""Context manager for thread-safe browser operations."""
with self._lock:
yield self._ensure_browser()
def _ensure_browser(self) -> Page:
"""Launch Chromium lazily and move cursor to (0,0) once."""
if self.page is not None:
return self.page
ctx = self.browser.new_context(
viewport={"width": self.window_width, "height": self.window_height}
)
self.page = ctx.new_page()
self._setup_console_listener(self.page)
self.page.mouse.move(0, 0)
self.mouse_x = self.mouse_y = 0
return self.page
def is_website_open(self) -> bool:
"""Check if a website is currently open."""
return self.page is not None and self.page.url not in (None, "about:blank", "")
def cleanup(self):
"""Clean up browser resources safely."""
with self._lock:
for resource, name in [
(self.page, "page"),
(self.browser, "browser"),
(self.playwright, "playwright")
]:
if resource is not None:
try:
if name == "playwright":
resource.stop()
else:
resource.close()
except Exception:
pass # Ignore cleanup errors
self.browser = self.page = self.playwright = None
def _inject_crosshair(self, page: Page, x: int, y: int) -> bool:
"""Inject crosshair at given coordinates. Returns True if successful."""
try:
page.evaluate(CROSSHAIR_JS, [x, y, self.crosshair_id])
return True
except Exception:
return False
def _remove_crosshair(self, page: Page) -> None:
"""Remove crosshair elements from the page."""
try:
page.evaluate(REMOVE_CROSSHAIR_JS, self.crosshair_id)
except Exception:
pass
def take_screenshot(self) -> dict[str, Any]:
"""Capture screenshot with crosshair."""
with self._browser_lock() as page:
# this retry logic is a hack to ensure the page is loaded
# (at least enough to inject the crosshair)
timeout = time.time() + self.reconnect_timeout
while time.time() < timeout:
if self._inject_crosshair(page, self.mouse_x, self.mouse_y):
break
time.sleep(max(0.3, self.screenshot_delay))
time.sleep(self.screenshot_delay)
screenshot_data = page.screenshot(type="png")
self._remove_crosshair(page)
self.screenshot_index += 1
return {
"screenshot": base64.b64encode(screenshot_data).decode(),
"screenshot_index": self.screenshot_index,
}
def validate_coordinates(self, x: int, y: int) -> tuple[bool, bool]:
x_is_valid = 0 <= x <= self.window_width
y_is_valid = 0 <= y <= self.window_height
return (x_is_valid, y_is_valid)
def constrain_mouse_position(self, page: Page) -> bool:
"""Constrain mouse position to window bounds and move if needed.
"""
if self.window_width <= 0 or self.window_height <= 0:
return False
if self.mouse_x >= self.window_width or self.mouse_y >= self.window_height:
self.mouse_x = min(self.mouse_x, self.window_width - 1)
self.mouse_y = min(self.mouse_y, self.window_height - 1)
self.mouse_x = max(0, self.mouse_x)
self.mouse_y = max(0, self.mouse_y)
page.mouse.move(self.mouse_x, self.mouse_y)
return True
return False
def get_key(self, key: str) -> str:
"""Get the key from the key map."""
if key.lower() in KEY_MAP:
return KEY_MAP[key.lower()]
msg = f"Key {key} not found. Supported keys: {', '.join(KEY_MAP.keys())}"
raise ValueError(msg)
def key_down(self, key: str):
"""Press and hold a key."""
playwright_key = self.get_key(key)
self.page.keyboard.down(playwright_key)
def key_press(self, key: str):
"""Press a key."""
playwright_key = self.get_key(key)
self.page.keyboard.press(playwright_key)
def key_up(self, key: str):
"""Release a key."""
playwright_key = self.get_key(key)
self.page.keyboard.up(playwright_key)
def _setup_console_listener(self, page: Page):
"""Set up console message listener for the page."""
def on_console(msg):
self.console_messages.append({
"type": msg.type,
"text": msg.text,
"timestamp": time.time(),
"location": msg.location
})
page.on("console", on_console)
def get_console_output(self) -> list[dict[str, Any]]:
"""Get all console messages and clear the buffer."""
with self._lock:
messages = self.console_messages.copy()
self.console_messages.clear()
return messages

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
import os
from dataclasses import dataclass, field
from web_browser_utils import ScreenshotMode
@dataclass
class ClientConfig:
"""Configuration for the web_browser client"""
port: int = int(os.getenv("WEB_BROWSER_PORT", "8009"))
autoscreenshot: bool = os.getenv("WEB_BROWSER_AUTOSCREENSHOT", "1") == "1"
screenshot_mode: ScreenshotMode = ScreenshotMode(
os.getenv("WEB_BROWSER_SCREENSHOT_MODE", ScreenshotMode.SAVE.value)
)
@dataclass
class ServerConfig:
"""Configuration for the web_browser server"""
port: int = int(os.getenv("WEB_BROWSER_PORT", "8009"))
window_width: int = int(os.getenv("WEB_BROWSER_WINDOW_WIDTH", 1024))
window_height: int = int(os.getenv("WEB_BROWSER_WINDOW_HEIGHT", 768))
headless: bool = os.getenv("WEB_BROWSER_HEADLESS", "1") != "0"
screenshot_delay: float = float(os.getenv("WEB_BROWSER_SCREENSHOT_DELAY", 0.2))
browser_type: str = os.getenv("WEB_BROWSER_BROWSER_TYPE", "chromium")
reconnect_timeout: float = float(os.getenv("WEB_BROWSER_RECONNECT_TIMEOUT", 15))
chromium_executable_path: str | None = os.getenv("WEB_BROWSER_CHROMIUM_EXECUTABLE_PATH")
firefox_executable_path: str | None = os.getenv("WEB_BROWSER_FIREFOX_EXECUTABLE_PATH")
crosshair_id: str = "__web_browser_crosshair__"

View File

@@ -0,0 +1,123 @@
from __future__ import annotations
import base64
import functools
import requests
import sys
from enum import Enum
from pathlib import Path
from flask import jsonify, request
class ScreenshotMode(Enum):
SAVE = "save" # saves screenshot to png file
PRINT = "print" # prints base64 encoded screenshot to stdout
def normalize_url(url: str) -> str:
# if starts with http:// or https://, return as is
# if starts with file://, return as is
# elif local file path exists, return as file://
# else: return as https://
if any(url.startswith(prefix) for prefix in ["http://", "https://", "file://"]):
return url
elif Path(url).exists():
return f"file://{Path(url).resolve()}"
else:
return "https://" + url
def validate_request(*required_keys):
"""Decorator to validate that all required keys are present in request JSON."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not request.is_json:
return jsonify({"status": "error", "message": "Request must be JSON"})
request_data = request.get_json()
if not request_data:
return jsonify({"status": "error", "message": "Request body cannot be empty"})
missing_keys = [key for key in required_keys if key not in request_data]
if missing_keys:
return jsonify({
"status": "error",
"message": f"Missing required fields: {', '.join(missing_keys)}"
})
return func(*args, **kwargs)
return wrapper
return decorator
def catch_error(func):
"""Decorator to catch exceptions and return them as JSON."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
return jsonify({"status": "error", "message": str(e)})
return wrapper
def _print_error(message, error_type="ERROR"):
"""Print error message in a normalized format."""
print(f"{error_type}: {message}", file=sys.stderr)
def _format_metadata_info(response):
"""Format metadata information from API response for display."""
if "metadata" not in response or not response["metadata"]:
return ""
return "\n".join(f"{key}: {value}" for key, value in response["metadata"].items())
def send_request(port, endpoint, method="GET", data=None):
url = f"http://localhost:{port}/{endpoint}"
if method == "GET":
response = requests.get(url)
else:
response = requests.post(url, json=data)
if response.status_code != 200:
_print_error(f"Internal error communicating with backend: {response.text}", "INTERNAL ERROR")
return None
data = response.json()
if data["status"] == "error":
metadata_info = _format_metadata_info(data)
error_message = data['message']
_print_error(f"ACTION ERROR:\n{error_message}")
if metadata_info:
print(f"\nMETADATA:\n{metadata_info}", file=sys.stderr)
return None
return data
def _print_response_with_metadata(response):
"""Print response message with formatted metadata information."""
message = response.get("message", "")
metadata_info = _format_metadata_info(response)
print(f"ACTION RESPONSE:\n{message}")
if metadata_info:
print(f"\nMETADATA:\n{metadata_info}")
def _handle_screenshot(screenshot_data, mode):
"""Handle screenshot data according to the specified mode"""
if mode == ScreenshotMode.SAVE:
path = Path("latest_screenshot.png")
path.write_bytes(base64.b64decode(screenshot_data))
print(f"![Screenshot]({path})")
elif mode == ScreenshotMode.PRINT:
print(f"![Screenshot](data:image/png;base64,{screenshot_data})")
else:
raise ValueError(f"Invalid screenshot mode: {mode}")
def _autosave_screenshot_from_response(response, mode):
"""Handle screenshot from response data according to the specified mode"""
if "screenshot" in response:
_handle_screenshot(response["screenshot"], mode)

View File

@@ -0,0 +1 @@
<script>console.log("Hello from console!"); console.error("Test error"); console.warn("Test warning");</script><h1>Test Page</h1>

View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python3
import json
import os
from pathlib import Path
from registry import registry # type: ignore
def main():
state_path = Path("/root/state.json")
if state_path.exists():
state = json.loads(state_path.read_text())
else:
state = {}
current_file = registry.get("CURRENT_FILE")
open_file = "n/a" if not current_file else str(Path(current_file).resolve())
state["open_file"] = open_file
state["working_dir"] = os.getcwd()
state_path.write_text(json.dumps(state))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,29 @@
#!/usr/bin/env python3
import sys
from pathlib import Path
from windowed_file import WindowedFile # type: ignore
def main():
if len(sys.argv) < 2:
print("Usage: create <filename>")
sys.exit(1)
path = Path(sys.argv[1])
if not path.parent.is_dir():
path.parent.mkdir(parents=True, exist_ok=True)
if path.exists():
print(f"Warning: File '{path}' already exists.")
sys.exit(1)
path.touch()
wfile = WindowedFile(path=path)
wfile.first_line = 0
wfile.print_window()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,37 @@
#!/usr/bin/env python3
import sys
from typing import List
from windowed_file import WindowedFile # type: ignore
def main(args: List[str]) -> int:
if len(args) > 1:
print("goto allows only one line number at a time.")
return 1
if not args:
print("Usage: goto <line>")
return 1
try:
line_number = int(args[0])
except ValueError:
print("Usage: goto <line>")
print("Error: <line> must be a number")
return 1
wf = WindowedFile()
if line_number > wf.n_lines:
print(f"Error: <line> must be less than or equal to {wf.n_lines}")
return 1
# Convert from 1-based line numbers (user input) to 0-based (internal representation)
wf.goto(line_number - 1, mode="top")
wf.print_window()
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import sys
from typing import Optional
from windowed_file import FileNotOpened, WindowedFile # type: ignore
def main(path: Optional[str] = None, line_number: Optional[str] = None) -> None:
if path is None:
try:
WindowedFile(exit_on_exception=False).print_window()
# If this passes, then there was already a file open and we just show it again
sys.exit(0)
except FileNotOpened:
print('Usage: open "<file>"')
sys.exit(1)
assert path is not None
wf = WindowedFile(path=path)
if line_number is not None:
try:
line_num = int(line_number)
except ValueError:
print('Usage: open "<file>" [<line_number>]')
print("Error: <line_number> must be a number")
sys.exit(1)
if line_num > wf.n_lines:
print(f"Warning: <line_number> ({line_num}) is greater than the number of lines in the file ({wf.n_lines})")
print(f"Warning: Setting <line_number> to {wf.n_lines}")
line_num = wf.n_lines
elif line_num < 1:
print(f"Warning: <line_number> ({line_num}) is less than 1")
print("Warning: Setting <line_number> to 1")
line_num = 1
else:
# Default to middle of window if no line number provided
line_num = wf.first_line
wf.goto(line_num - 1, mode="top")
wf.print_window()
if __name__ == "__main__":
args = sys.argv[1:]
file_path = args[0] if args else None
line_number = args[1] if len(args) > 1 else None
main(file_path, line_number)

View File

@@ -0,0 +1,12 @@
#!/usr/bin/env python3
from windowed_file import WindowedFile # type: ignore
def main():
wf = WindowedFile()
wf.scroll(wf.window)
wf.print_window()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,13 @@
#!/usr/bin/env python3
from windowed_file import WindowedFile # type: ignore
def main():
wf = WindowedFile()
wf.scroll(-wf.window)
wf.print_window()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,38 @@
tools:
goto:
signature: "goto <line_number>"
docstring: "moves the window to show <line_number>"
arguments:
- name: line_number
type: integer
description: "the line number to move the window to"
required: true
open:
signature: 'open "<path>" [<line_number>]'
docstring: "opens the file at the given path in the editor. If line_number is provided, the window will be move to include that line"
arguments:
- name: path
type: string
description: "the path to the file to open"
required: true
- name: line_number
type: integer
description: "the line number to move the window to (if not provided, the window will start at the top of the file)"
required: false
create:
signature: "create <filename>"
docstring: "creates and opens a new file with the given name"
arguments:
- name: filename
type: string
description: "the name of the file to create"
required: true
scroll_up:
signature: "scroll_up"
docstring: "moves the window up {WINDOW} lines"
arguments: []
scroll_down:
signature: "scroll_down"
docstring: "moves the window down {WINDOW} lines"
arguments: []
state_command: "_state"

View File

@@ -0,0 +1,15 @@
#!/usr/bin/env bash
# script_dir=$(dirname "$(readlink -f "$0")")
bundle_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
export PYTHONPATH="$bundle_dir/lib":$PYTHONPATH
# Write default environment variables into the environment storage
_write_env "WINDOW" "${WINDOW:-100}"
_write_env "OVERLAP" "${OVERLAP:-2}"
_write_env "FIRST_LINE" "${FIRST_LINE:-0}"
_write_env "CURRENT_FILE" "${CURRENT_FILE:-}"
# install jq
# apt-get update && apt-get install -y jq

View File

View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""This helper command is used to parse and print flake8 output."""
# ruff: noqa: UP007 UP006 UP035
import subprocess
from pathlib import Path
from typing import List, Optional, Tuple
try:
from sweagent import TOOLS_DIR
except ImportError:
pass
else:
import sys
default_lib = TOOLS_DIR / "windowed" / "lib"
assert default_lib.is_dir()
sys.path.append(str(default_lib))
sys.path.append(str(TOOLS_DIR / "registry" / "lib"))
from registry import registry
class Flake8Error:
"""A class to represent a single flake8 error"""
def __init__(self, filename: str, line_number: int, col_number: int, problem: str):
self.filename = filename
self.line_number = line_number
self.col_number = col_number
self.problem = problem
@classmethod
def from_line(cls, line: str):
try:
prefix, _sep, problem = line.partition(": ")
filename, line_number, col_number = prefix.split(":")
except (ValueError, IndexError) as e:
msg = f"Invalid flake8 error line: {line}"
raise ValueError(msg) from e
return cls(filename, int(line_number), int(col_number), problem)
def __eq__(self, other):
if not isinstance(other, Flake8Error):
return NotImplemented
return (
self.filename == other.filename
and self.line_number == other.line_number
and self.col_number == other.col_number
and self.problem == other.problem
)
def __repr__(self):
return f"Flake8Error(filename={self.filename}, line_number={self.line_number}, col_number={self.col_number}, problem={self.problem})"
def _update_previous_errors(
previous_errors: List[Flake8Error], replacement_window: Tuple[int, int], replacement_n_lines: int
) -> List[Flake8Error]:
"""Update the line numbers of the previous errors to what they would be after the edit window.
This is a helper function for `_filter_previous_errors`.
All previous errors that are inside of the edit window should not be ignored,
so they are removed from the previous errors list.
Args:
previous_errors: list of errors with old line numbers
replacement_window: the window of the edit/lines that will be replaced
replacement_n_lines: the number of lines that will be used to replace the text
Returns:
list of errors with updated line numbers
"""
updated = []
lines_added = replacement_n_lines - (replacement_window[1] - replacement_window[0] + 1)
for error in previous_errors:
if error.line_number < replacement_window[0]:
# no need to adjust the line number
updated.append(error)
continue
if replacement_window[0] <= error.line_number <= replacement_window[1]:
# The error is within the edit window, so let's not ignore it
# either way (we wouldn't know how to adjust the line number anyway)
continue
# We're out of the edit window, so we need to adjust the line number
updated.append(Flake8Error(error.filename, error.line_number + lines_added, error.col_number, error.problem))
return updated
def format_flake8_output(
input_string: str,
show_line_numbers: bool = False,
*,
previous_errors_string: str = "",
replacement_window: Optional[Tuple[int, int]] = None,
replacement_n_lines: Optional[int] = None,
) -> str:
"""Filter flake8 output for previous errors and print it for a given file.
Args:
input_string: The flake8 output as a string
show_line_numbers: Whether to show line numbers in the output
previous_errors_string: The previous errors as a string
replacement_window: The window of the edit (lines that will be replaced)
replacement_n_lines: The number of lines used to replace the text
Returns:
The filtered flake8 output as a string
"""
errors = [Flake8Error.from_line(line.strip()) for line in input_string.split("\n") if line.strip()]
# print(f"New errors before filtering: {errors=}")
lines = []
if previous_errors_string:
assert replacement_window is not None
assert replacement_n_lines is not None
previous_errors = [
Flake8Error.from_line(line.strip()) for line in previous_errors_string.split("\n") if line.strip()
]
# print(f"Previous errors before updating: {previous_errors=}")
previous_errors = _update_previous_errors(previous_errors, replacement_window, replacement_n_lines)
# print(f"Previous errors after updating: {previous_errors=}")
errors = [error for error in errors if error not in previous_errors]
# Sometimes new errors appear above the replacement window that were 'shadowed' by the previous errors
# they still clearly aren't caused by the edit.
errors = [error for error in errors if error.line_number >= replacement_window[0]]
# print(f"New errors after filtering: {errors=}")
for error in errors:
if not show_line_numbers:
lines.append(f"- {error.problem}")
else:
lines.append(f"- line {error.line_number} col {error.col_number}: {error.problem}")
return "\n".join(lines)
def flake8(file_path: str) -> str:
"""Run flake8 on a given file and return the output as a string"""
if Path(file_path).suffix != ".py":
return ""
cmd = registry.get("LINT_COMMAND", "flake8 --isolated --select=F821,F822,F831,E111,E112,E113,E999,E902 {file_path}")
# don't use capture_output because it's not compatible with python3.6
out = subprocess.run(cmd.format(file_path=file_path), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return out.stdout.decode()

View File

@@ -0,0 +1,315 @@
import json
import os
from pathlib import Path
from typing import Any, List, Optional, Tuple, Union
try:
from sweagent import TOOLS_DIR
except ImportError:
pass
else:
import sys
sys.path.append(str(TOOLS_DIR / "registry" / "lib"))
from registry import registry
class FileNotOpened(Exception):
"""Raised when no file is opened."""
class TextNotFound(Exception):
"""Raised when the text is not found in the window."""
def _find_all(a_str: str, sub: str):
start = 0
while True:
start = a_str.find(sub, start)
if start == -1:
return
yield start
start += len(sub)
class ReplacementInfo:
def __init__(self, first_replaced_line: int, n_search_lines: int, n_replace_lines: int, n_replacements: int):
self.first_replaced_line = first_replaced_line
self.n_search_lines = n_search_lines
self.n_replace_lines = n_replace_lines
self.n_replacements = n_replacements
def __repr__(self):
return f"ReplacementInfo(first_replaced_line={self.first_replaced_line}, n_search_lines={self.n_search_lines}, n_replace_lines={self.n_replace_lines}, n_replacements={self.n_replacements})"
class InsertInfo:
def __init__(self, first_inserted_line: int, n_lines_added: int):
self.first_inserted_line = first_inserted_line
self.n_lines_added = n_lines_added
class WindowedFile:
def __init__(
self,
path: Optional[Path] = None,
*,
first_line: Optional[int] = None,
window: Optional[int] = None,
exit_on_exception: bool = True,
):
"""
Args:
path: Path to the file to open.
first_line: First line of the display window.
window: Number of lines to display.
exit_on_exception: If False, will raise exception.
If true, will print an error message and exit.
Will create file if not found.
Internal convention/notes:
* All line numbers are 0-indexed.
* Previously, we used "current_line" for the internal state
of the window position, pointing to the middle of the window.
Now, we use `first_line` for this purpose (it's simpler this way).
"""
_path = registry.get_if_none(path, "CURRENT_FILE")
self._exit_on_exception = exit_on_exception
if not _path:
if self._exit_on_exception:
print("No file open. Use the open command first.")
exit(1)
raise FileNotOpened
self.path = Path(_path)
if self.path.is_dir():
msg = f"Error: {self.path} is a directory. You can only open files. Use cd or ls to navigate directories."
if self._exit_on_exception:
print(msg)
exit(1)
raise IsADirectoryError(msg)
if not self.path.exists():
msg = f"Error: File {self.path} not found"
if self._exit_on_exception:
print(msg)
exit(1)
raise FileNotFoundError(msg)
registry["CURRENT_FILE"] = str(self.path.resolve())
self.window = int(registry.get_if_none(window, "WINDOW"))
self.overlap = int(registry.get("OVERLAP", 0))
# Ensure that we get a valid current line by using the setter
self._first_line = 0
self.first_line = int(
registry.get_if_none(
first_line,
"FIRST_LINE",
0,
)
)
self.offset_multiplier = 1 / 6
self._original_text = self.text
self._original_first_line = self.first_line
@property
def first_line(self) -> int:
return self._first_line
@first_line.setter
def first_line(self, value: Union[int, float]):
self._original_first_line = self.first_line
value = int(value)
self._first_line = max(0, min(value, self.n_lines - self.window))
registry["FIRST_LINE"] = self.first_line
@property
def text(self) -> str:
return self.path.read_text()
@text.setter
def text(self, new_text: str):
self._original_text = self.text
self.path.write_text(new_text)
@property
def n_lines(self) -> int:
return len(self.text.splitlines())
@property
def line_range(self) -> Tuple[int, int]:
"""Return first and last line (inclusive) of the display window, such
that exactly `window` many lines are displayed.
This means `line_range[1] - line_range[0] == window-1` as long as there are
at least `window` lines in the file. `first_line` does the handling
of making sure that we don't go out of bounds.
"""
return self.first_line, min(self.first_line + self.window - 1, self.n_lines - 1)
def get_window_text(
self, *, line_numbers: bool = False, status_line: bool = False, pre_post_line: bool = False
) -> str:
"""Get the text in the current display window with optional status/extra information
Args:
line_numbers: include line numbers in the output
status_line: include the status line in the output (file path, total lines)
pre_post_line: include the pre/post line in the output (number of lines above/below)
"""
start_line, end_line = self.line_range
lines = self.text.split("\n")[start_line : end_line + 1]
out_lines = []
if status_line:
out_lines.append(f"[File: {self.path} ({self.n_lines} lines total)]")
if pre_post_line:
if start_line > 0:
out_lines.append(f"({start_line} more lines above)")
if line_numbers:
out_lines.extend(f"{i + start_line + 1}:{line}" for i, line in enumerate(lines))
else:
out_lines.extend(lines)
if pre_post_line:
if end_line < self.n_lines - 1:
out_lines.append(f"({self.n_lines - end_line - 1} more lines below)")
return "\n".join(out_lines)
def set_window_text(self, new_text: str, *, line_range: Optional[Tuple[int, int]] = None) -> None:
"""Replace the text in the current display window with a new string."""
text = self.text.split("\n")
if line_range is not None:
start, stop = line_range
else:
start, stop = self.line_range
# Handle empty replacement text (deletion case)
new_lines = new_text.split("\n") if new_text else []
text[start : stop + 1] = new_lines
self.text = "\n".join(text)
def replace_in_window(
self,
search: str,
replace: str,
*,
reset_first_line: str = "top",
) -> "ReplacementInfo":
"""Search and replace in the window.
Args:
search: The string to search for (can be multi-line).
replace: The string to replace it with (can be multi-line).
reset_first_line: If "keep", we keep the current line. Otherwise, we
`goto` the line where the replacement started with this mode.
"""
window_text = self.get_window_text()
# Update line number
index = window_text.find(search)
if index == -1:
if self._exit_on_exception:
print(f"Error: Text not found: {search}")
exit(1)
raise TextNotFound
window_start_line, _ = self.line_range
replace_start_line = window_start_line + len(window_text[:index].split("\n")) - 1
new_window_text = window_text.replace(search, replace)
self.set_window_text(new_window_text)
if reset_first_line == "keep":
pass
else:
self.goto(replace_start_line, mode=reset_first_line)
return ReplacementInfo(
first_replaced_line=replace_start_line,
n_search_lines=len(search.split("\n")),
n_replace_lines=len(replace.split("\n")),
n_replacements=1,
)
def find_all_occurrences(self, search: str, zero_based: bool = True) -> List[int]:
"""Returns the line numbers of all occurrences of the search string."""
indices = list(_find_all(self.text, search))
line_numbers = []
for index in indices:
line_no = len(self.text[:index].split("\n"))
if zero_based:
line_numbers.append(line_no - 1)
else:
line_numbers.append(line_no)
return line_numbers
def replace(self, search: str, replace: str, *, reset_first_line: str = "top") -> "ReplacementInfo":
indices = list(_find_all(self.text, search))
if not indices:
if self._exit_on_exception:
print(f"Error: Text not found: {search}")
exit(1)
raise TextNotFound
replace_start_line = len(self.text[: indices[0]].split("\n"))
new_text = self.text.replace(search, replace)
self.text = new_text
if reset_first_line == "keep":
pass
else:
self.goto(replace_start_line, mode=reset_first_line)
return ReplacementInfo(
first_replaced_line=replace_start_line,
n_search_lines=len(search.split("\n")),
n_replace_lines=len(replace.split("\n")),
n_replacements=len(indices),
)
def print_window(self, *, line_numbers: bool = True, status_line: bool = True, pre_post_line: bool = True):
print(self.get_window_text(line_numbers=line_numbers, status_line=status_line, pre_post_line=pre_post_line))
def goto(self, line: int, mode: str = "top"):
if mode == "top":
self.first_line = line - self.window * self.offset_multiplier
else:
raise NotImplementedError
def scroll(self, n_lines: int):
if n_lines > 0:
self.first_line += n_lines - self.overlap
elif n_lines < 0:
self.first_line += n_lines + self.overlap
def undo_edit(self):
self.text = self._original_text
self.first_line = self._original_first_line
def insert(self, text: str, line: Optional[int] = None, *, reset_first_line: str = "top") -> "InsertInfo":
# Standardize empty text handling
if not text:
return InsertInfo(first_inserted_line=(self.n_lines if line is None else line), n_lines_added=0)
# Remove single trailing newline if it exists
text = text[:-1] if text.endswith("\n") else text
if line is None:
# Append to end of file
if not self.text:
new_text = text
else:
current_text = self.text[:-1] if self.text.endswith("\n") else self.text
new_text = current_text + "\n" + text
insert_line = self.n_lines
elif line < 0:
# Insert at start of file
if not self.text:
new_text = text
else:
current_text = self.text[1:] if self.text.startswith("\n") else self.text
new_text = text + "\n" + current_text
insert_line = 0
else:
# Insert at specific line
lines = self.text.split("\n")
lines.insert(line, text)
new_text = "\n".join(lines)
insert_line = line
self.text = new_text
if reset_first_line != "keep":
self.goto(insert_line, mode=reset_first_line)
return InsertInfo(first_inserted_line=insert_line, n_lines_added=len(text.split("\n")))

View File

@@ -0,0 +1,128 @@
#!/usr/bin/env python3
import argparse
import sys
from typing import Tuple, Union
try:
from sweagent import TOOLS_DIR
except ImportError:
pass
else:
default_lib = TOOLS_DIR / "windowed" / "lib"
assert default_lib.is_dir()
sys.path.append(str(default_lib))
sys.path.append(str(TOOLS_DIR / "registry" / "lib"))
from windowed_file import FileNotOpened, WindowedFile # type: ignore
from flake8_utils import flake8, format_flake8_output # type: ignore
_USAGE_MSG = """Usage: edit <start_line>:<end_line>
<replacement_text>
end_of_edit"""
_EDIT_SUCCESS_MSG = """File updated. Please review the changes and make sure they are correct
(correct indentation, no duplicate lines, etc). Edit the file again if necessary."""
_LINT_ERROR_TEMPLATE = """Your proposed edit has introduced new syntax error(s). Please read this error message carefully and then retry editing the file.
ERRORS:
{errors}
This is how your edit would have looked if applied
------------------------------------------------
{window_applied}
------------------------------------------------
This is the original code before your edit
------------------------------------------------
{window_original}
------------------------------------------------
Your changes have NOT been applied. Please fix your edit command and try again.
DO NOT re-run the same failed edit command. Running it again will lead to the same error."""
def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("line_range", help="Line range in format start:end")
parser.add_argument("replacement_text", help="Text to insert", nargs="?")
return parser
def parse_line_range(line_range: str) -> Tuple[int, int]:
try:
start, end = map(int, line_range.split(":"))
return start - 1, end - 1
except ValueError:
print(_USAGE_MSG)
exit(1)
def main(line_range: str, replacement_text: Union[str, None] = None):
# Handle file opening
try:
wf = WindowedFile(exit_on_exception=False)
except FileNotOpened:
print("No file opened. Use the `open` command first.")
exit(1)
# Parse line range
start_line, end_line = parse_line_range(line_range)
if replacement_text is None:
# Read replacement text from stdin (e.g., when sent via bash heredoc)
# if not provided as argument
replacement_lines = []
while True:
try:
line = input()
if line == "end_of_edit":
break
replacement_lines.append(line)
except EOFError:
break
replacement_text = "\n".join(replacement_lines)
else:
if replacement_text.endswith("\n"):
replacement_text = replacement_text[:-1]
if replacement_text is None:
print(_USAGE_MSG)
exit(1)
# Get pre-edit linting errors
pre_edit_lint = flake8(wf.path)
# Perform the edit
wf.set_window_text(replacement_text, line_range=(start_line, end_line))
# Check for new linting errors
post_edit_lint = flake8(wf.path)
new_flake8_output = format_flake8_output(
post_edit_lint,
previous_errors_string=pre_edit_lint,
replacement_window=(start_line, end_line),
replacement_n_lines=len(replacement_text.splitlines()),
)
if new_flake8_output:
# Show error and revert changes
with_edits = wf.get_window_text(line_numbers=True, status_line=True, pre_post_line=True)
wf.undo_edit()
without_edits = wf.get_window_text(line_numbers=True, status_line=True, pre_post_line=True)
print(
_LINT_ERROR_TEMPLATE.format(
errors=new_flake8_output, window_applied=with_edits, window_original=without_edits
)
)
exit(1)
# Success - update window position and show result
wf.goto(start_line, mode="top")
print(_EDIT_SUCCESS_MSG)
wf.print_window()
if __name__ == "__main__":
main(**vars(get_parser().parse_args()))

View File

@@ -0,0 +1,31 @@
tools:
edit:
signature: |
edit <start_line>:<end_line>
<replacement_text>
end_of_edit
# Note: Without function calling we should add back:
# The replacement text is terminated by a line with only
# end_of_edit on
docstring: >
Replaces lines <start_line> through <end_line> (inclusive) with the given text
in the open file.
All of the <replacement text> will be entered, so make
sure your indentation is formatted properly.
Please note that THIS COMMAND REQUIRES PROPER INDENTATION.
If you'd like to add the line ' print(x)' you must fully write that out, with all those spaces before the code!
end_name: "end_of_edit"
arguments:
- name: start_line
type: integer
description: "the line number to start the edit at"
required: true
- name: end_line
type: integer
description: "the line number to end the edit at (inclusive)"
required: true
- name: replacement_text
type: string
description: "the text to replace the current selection with"
required: true

View File

@@ -0,0 +1,5 @@
_write_env "CURRENT_FILE" "${CURRENT_FILE:-}"
_write_env "CURRENT_LINE" "${CURRENT_LINE:-0}"
_write_env "WINDOW" "$WINDOW"
pip install flake8

View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
import argparse
try:
from sweagent import TOOLS_DIR
except ImportError:
pass
else:
import sys
default_lib = TOOLS_DIR / "windowed" / "lib"
assert default_lib.is_dir()
sys.path.append(str(default_lib))
sys.path.append(str(TOOLS_DIR / "registry" / "lib"))
from windowed_file import FileNotOpened, TextNotFound, WindowedFile # type: ignore
from flake8_utils import flake8, format_flake8_output # type: ignore
RETRY_WITH_OUTPUT_TOKEN = "###SWE-AGENT-RETRY-WITH-OUTPUT###"
_NOT_FOUND = """Your edit was not applied (file not modified): Text {search!r} not found in displayed lines (or anywhere in the file).
Please modify your search string. Did you forget to properly handle whitespace/indentation?
You can also call `open` again to re-display the file with the correct context.
"""
_NOT_FOUND_IN_WINDOW_MSG = """Your edit was not applied (file not modified): Text {search!r} not found in displayed lines.
However, we found the following occurrences of your search string in the file:
{occurrences}
You can use the `goto` command to navigate to these locations before running the edit command again.
"""
_MULTIPLE_OCCURRENCES_MSG = """Your edit was not applied (file not modified): Found more than one occurrence of {search!r} in the currently displayed lines.
Please make your search string more specific (for example, by including more lines of context).
"""
_NO_CHANGES_MADE_MSG = """Your search and replace strings are the same. No changes were made. Please modify your search or replace strings."""
_SINGLE_EDIT_SUCCESS_MSG = """Text replaced. Please review the changes and make sure they are correct:
1. The edited file is correctly indented
2. The edited file does not contain duplicate lines
3. The edit does not break existing functionality
Edit the file again if necessary."""
_MULTIPLE_EDITS_SUCCESS_MSG = """Replaced {n_replacements} occurrences. Please review the changes and make sure they are correct:
1. The edited file is correctly indented
2. The edited file does not contain duplicate lines
3. The edit does not break existing functionality
Edit the file again if necessary."""
_LINT_ERROR_TEMPLATE = """Your proposed edit has introduced new syntax error(s). Please read this error message carefully and then retry editing the file.
ERRORS:
{errors}
This is how your edit would have looked if applied
------------------------------------------------
{window_applied}
------------------------------------------------
This is the original code before your edit
------------------------------------------------
{window_original}
------------------------------------------------
Your changes have NOT been applied. Please fix your edit command and try again.
DO NOT re-run the same failed edit command. Running it again will lead to the same error.
"""
def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("search", type=str)
parser.add_argument("replace", type=str)
parser.add_argument("replace_all", type=bool, nargs="?", default=False)
return parser
def main(search: str, replace: str, replace_all: bool):
try:
wf = WindowedFile(exit_on_exception=False)
except FileNotOpened:
print("No file opened. Either `open` or `create` a file first.")
exit(1)
# Turn \\n into \n etc., i.e., undo the escaping
# args.replace = args.replace.encode("utf8").decode("unicode_escape")
if search == replace:
print(_NO_CHANGES_MADE_MSG)
print(RETRY_WITH_OUTPUT_TOKEN)
exit(2)
pre_edit_lint = flake8(wf.path)
try:
if not replace_all:
window_text = wf.get_window_text()
if window_text.count(search) > 1:
print(_MULTIPLE_OCCURRENCES_MSG.format(search=search))
print(RETRY_WITH_OUTPUT_TOKEN)
exit(4)
replacement_info = wf.replace_in_window(search, replace)
# todo: Should warn if more than one occurrence was found?
else:
# todo: Give overview of all replaced occurrences/number of replacements
replacement_info = wf.replace(search, replace)
except TextNotFound:
line_no_founds = wf.find_all_occurrences(search, zero_based=False)
if line_no_founds:
print(
_NOT_FOUND_IN_WINDOW_MSG.format(
search=search, occurrences="\n".join([f"- line {line_no}" for line_no in line_no_founds])
)
)
else:
print(_NOT_FOUND.format(search=search))
print(RETRY_WITH_OUTPUT_TOKEN)
exit(3)
post_edit_lint = flake8(wf.path)
if not replace_all:
# Try to filter out pre-existing errors
replacement_window = (
replacement_info.first_replaced_line,
replacement_info.first_replaced_line + replacement_info.n_search_lines - 1,
)
# print(f"{replacement_info=}")
# print(f"{replacement_window=}")
# print(f"{pre_edit_lint=}")
# print(f"{post_edit_lint=}")
new_flake8_output = format_flake8_output(
post_edit_lint,
previous_errors_string=pre_edit_lint,
replacement_window=replacement_window,
replacement_n_lines=replacement_info.n_replace_lines,
)
else:
# Cannot easily compare the error strings, because line number changes are hard to keep track of
# So we show all linter errors.
new_flake8_output = format_flake8_output(post_edit_lint)
if new_flake8_output:
with_edits = wf.get_window_text(line_numbers=True, status_line=True, pre_post_line=True)
wf.undo_edit()
without_edits = wf.get_window_text(line_numbers=True, status_line=True, pre_post_line=True)
print(
_LINT_ERROR_TEMPLATE.format(
errors=new_flake8_output, window_applied=with_edits, window_original=without_edits,
)
)
print(RETRY_WITH_OUTPUT_TOKEN)
exit(4)
if not replace_all:
print(_SINGLE_EDIT_SUCCESS_MSG)
else:
print(_MULTIPLE_EDITS_SUCCESS_MSG.format(n_replacements=replacement_info.n_replacements))
wf.print_window()
if __name__ == "__main__":
main(**vars(get_parser().parse_args()))

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
import argparse
from typing import Union
from windowed_file import FileNotOpened, WindowedFile # type: ignore
from flake8_utils import flake8, format_flake8_output # type: ignore
RETRY_WITH_OUTPUT_TOKEN = "###SWE-AGENT-RETRY-WITH-OUTPUT###"
_LINT_ERROR_TEMPLATE = """Your proposed edit has introduced new syntax error(s).
Please read this error message carefully and then retry editing the file.
ERRORS:
{errors}
This is how your edit would have looked if applied
------------------------------------------------
{window_applied}
------------------------------------------------
This is the original code before your edit
------------------------------------------------
{window_original}
------------------------------------------------
Your changes have NOT been applied. Please fix your edit command and try again.
DO NOT re-run the same failed edit command. Running it again will lead to the same error.
"""
def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("text", type=str)
parser.add_argument("line", type=int, nargs="?", default=None)
return parser
def main(text: str, line: Union[int, None] = None):
try:
wf = WindowedFile(exit_on_exception=False)
except FileNotOpened:
print("No file opened. Use the `create` or `open` command first.")
print(RETRY_WITH_OUTPUT_TOKEN)
exit(1)
pre_edit_lint = flake8(wf.path)
insert_info = wf.insert(text, line=line - 1 if line is not None else None)
post_edit_lint = flake8(wf.path)
# Try to filter out pre-existing errors
replacement_window = (insert_info.first_inserted_line, insert_info.first_inserted_line)
new_flake8_output = format_flake8_output(
post_edit_lint,
previous_errors_string=pre_edit_lint,
replacement_window=replacement_window,
replacement_n_lines=insert_info.n_lines_added,
)
if new_flake8_output:
with_edits = wf.get_window_text(line_numbers=True, status_line=True, pre_post_line=True)
wf.undo_edit()
without_edits = wf.get_window_text(line_numbers=True, status_line=True, pre_post_line=True)
print(
_LINT_ERROR_TEMPLATE.format(
errors=new_flake8_output, window_applied=with_edits, window_original=without_edits
)
)
print(RETRY_WITH_OUTPUT_TOKEN)
exit(4)
wf.print_window()
if __name__ == "__main__":
main(**vars(get_parser().parse_args()))

View File

@@ -0,0 +1,60 @@
tools:
edit:
signature: |
edit <search> <replace> [<replace-all>]
docstring: >
Replace first occurrence of <search> with <replace> in the currently displayed lines.
If replace-all is True , replace all occurrences of <search> with <replace>.
For example, if you are looking at this file:
def fct():
print("Hello world")
and you want to edit the file to read:
def fct():
print("Hello")
print("world")
you can search for `Hello world` and replace with `"Hello"\n print("world")`
(note the extra spaces before the print statement!).
Tips:
1. Always include proper whitespace/indentation
2. When you are adding an if/with/try statement, you need to INDENT the block that follows, so make sure to include it in both your search and replace strings!
3. If you are wrapping code in a try statement, make sure to also add an 'except' or 'finally' block.
Before every edit, please
1. Explain the code you want to edit and why it is causing the problem
2. Explain the edit you want to make and how it fixes the problem
3. Explain how the edit does not break existing functionality
arguments:
- name: search
type: string
description: "the text to search for (make sure to include proper whitespace if needed)"
required: true
- name: replace
type: string
description: "the text to replace the search with (make sure to include proper whitespace if needed)"
required: true
- name: replace-all
type: boolean
description: "replace all occurrences rather than the first occurrence within the displayed lines"
required: false
insert:
signature: |
insert <text> [<line>]
docstring: >
Insert <text> at the end of the currently opened file or after <line> if specified.
arguments:
- name: text
type: string
description: "the text to insert"
required: true
- name: line
type: integer
description: "the line number to insert the text as new lines after"
required: false

View File

@@ -0,0 +1,5 @@
_write_env "CURRENT_FILE" "${CURRENT_FILE:-}"
_write_env "CURRENT_LINE" "${CURRENT_LINE:-0}"
_write_env "WINDOW" "$WINDOW"
pip install flake8

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
import argparse
from windowed_file import FileNotOpened, WindowedFile # type: ignore
from flake8_utils import flake8, format_flake8_output # type: ignore
_LINT_ERROR_TEMPLATE = """
Your proposed edit has introduced new syntax error(s). Please read this error message carefully and then retry editing the file.
ERRORS:
{errors}
This is how your edit would have looked if applied
------------------------------------------------
{window_applied}
------------------------------------------------
This is the original code before your edit
------------------------------------------------
{window_original}
------------------------------------------------
Your changes have NOT been applied. Please fix your edit command and try again.
DO NOT re-run the same failed edit command. Running it again will lead to the same error.
"""
_SUCCESS_MSG = "Edit successful."
def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("replace", type=str)
return parser
def main(replace: str):
try:
wf = WindowedFile(exit_on_exception=False)
except FileNotOpened:
print("No file opened. Either `open` a file first.")
exit(1)
pre_edit_lint = flake8(wf.path)
start_line, end_line = wf.line_range
replace_lines = len(replace.split("\n"))
wf.set_window_text(replace)
post_edit_lint = flake8(wf.path)
replacement_window = (
start_line,
end_line,
)
new_flake8_output = format_flake8_output(
post_edit_lint,
previous_errors_string=pre_edit_lint,
replacement_window=replacement_window,
replacement_n_lines=replace_lines,
)
if new_flake8_output:
with_edits = wf.get_window_text(line_numbers=True, status_line=True, pre_post_line=True)
wf.undo_edit()
without_edits = wf.get_window_text(line_numbers=True, status_line=True, pre_post_line=True)
print(
_LINT_ERROR_TEMPLATE.format(
errors=new_flake8_output, window_applied=with_edits, window_original=without_edits
)
)
exit(4)
print(_SUCCESS_MSG)
if __name__ == "__main__":
main(**vars(get_parser().parse_args()))

View File

@@ -0,0 +1,11 @@
tools:
edit:
signature: |
edit <text>
docstring: >
Replace the currently displayed lines with <text>.
arguments:
- name: text
type: string
description: "the text to replace the currently displayed lines with"
required: true

View File

@@ -0,0 +1,5 @@
_write_env "CURRENT_FILE" "${CURRENT_FILE:-}"
_write_env "CURRENT_LINE" "${CURRENT_LINE:-0}"
_write_env "WINDOW" "$WINDOW"
pip install flake8