-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathloop.py
308 lines (276 loc) · 11.7 KB
/
loop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""Agentic sampling loop that calls the Anthropic API and local implementation of anthropic-defined computer use tools."""
import sys
from collections.abc import Callable
from datetime import datetime
from typing import cast
import httpx
from anthropic import (
Anthropic,
AnthropicBedrock,
AnthropicVertex,
APIError,
APIResponseValidationError,
APIStatusError,
)
from playwright.sync_api import Page
from anthropic.types.beta import (
BetaCacheControlEphemeralParam,
BetaMessage,
BetaMessageParam,
BetaTextBlock,
BetaTextBlockParam,
BetaToolResultBlockParam,
BetaToolUseBlockParam,
)
from playwright_computer_use.async_api import PlaywrightToolbox, ToolResult
COMPUTER_USE_BETA_FLAG = {
"20241022": "computer-use-2024-10-22",
"20250124": "computer-use-2025-01-24",
}
PROMPT_CACHING_BETA_FLAG = "prompt-caching-2024-07-31"
# This system prompt is optimized for the Docker environment in this repository and
# specific tool combinations enabled.
# We encourage modifying this system prompt to ensure the model has context for the
# environment it is running in, and to provide any additional information that may be
# helpful for the task at hand.
SYSTEM_PROMPT = f"""<SYSTEM_CAPABILITY>
* You are utilising an firefox browser with internet access. The entirity of the task you are given can be solved by navigating from this web page.
* You can only use one page, and you can't open new tabs.
* When viewing a page it can be helpful to zoom out so that you can see everything on the page. Either that, or make sure you scroll down to see everything before deciding something isn't available.
* When using your computer function calls, they take a while to run and send back to you. Where possible/feasible, try to chain multiple of these calls all into one function calls request. At the end always ask for a screenshot, to make sure the state of the page is as you expect.
* The current date is {datetime.today().strftime("%A, %B %-d, %Y")}.
</SYSTEM_CAPABILITY>
"""
async def sampling_loop(
*,
model: str,
anthropic_client: Anthropic,
system_prompt: str = SYSTEM_PROMPT,
messages: list[BetaMessageParam],
page: Page,
tools: PlaywrightToolbox,
only_n_most_recent_images: int | None = None,
max_tokens: int = 4096,
enable_prompt_caching: bool = True,
verbose: bool = False,
):
"""Agentic sampling loop for the assistant/tool interaction of computer use."""
assert page is not None, "playwright page must be provided"
system = BetaTextBlockParam(
type="text",
text=system_prompt,
)
if verbose:
for message in messages:
if message["role"] == "user":
print(f"user > {message['content']}")
if message["role"] == "assistant":
for content_block in message["content"]:
if content_block["type"] == "text":
print(f"assistant > {content_block['text']}")
if message["role"] == "tool":
print(
f"tool call > {content_block['name']} {content_block['input']}"
)
while True:
enable_prompt_caching = False
betas = [COMPUTER_USE_BETA_FLAG[tools.beta_version]]
image_truncation_threshold = only_n_most_recent_images or 0
if enable_prompt_caching:
betas.append(PROMPT_CACHING_BETA_FLAG)
_inject_prompt_caching(messages)
# Because cached reads are 10% of the price, we don't think it's
# ever sensible to break the cache by truncating images
only_n_most_recent_images = 0
system["cache_control"] = {"type": "ephemeral"}
if only_n_most_recent_images:
_maybe_filter_to_n_most_recent_images(
messages,
only_n_most_recent_images,
min_removal_threshold=image_truncation_threshold,
)
# Call the API
# we use raw_response to provide debug information to streamlit. Your
# implementation may be able call the SDK directly with:
# `response = client.messages.create(...)` instead.
try:
if verbose:
sys.stdout.write("Calling Model")
sys.stdout.flush()
response = anthropic_client.beta.messages.create(
max_tokens=max_tokens,
messages=messages,
model=model,
system=[system],
tools=tools.to_params(),
betas=betas,
)
if verbose:
sys.stdout.write(
"\r\033[K"
) # Move to the beginning of the line and clear it
sys.stdout.flush()
except (APIStatusError, APIResponseValidationError) as e:
raise e
except APIError as e:
return [{"role": "system", "content": system_prompt}] + messages
response_params = _response_to_params(response)
messages.append(
{
"role": "assistant",
"content": response_params,
}
)
tool_result_content: list[BetaToolResultBlockParam] = []
for content_block in response_params:
if content_block["type"] == "tool_use":
if verbose:
print(
f"tool call > {content_block['name']} {content_block['input']}"
)
result = await tools.run_tool(
name=content_block["name"],
input=content_block["input"],
tool_use_id=content_block["id"],
)
tool_result_content.append(result)
if verbose and content_block["type"] == "text":
print(f"assistant > {content_block['text']}")
if not tool_result_content:
return [{"role": "system", "content": system_prompt}] + messages
messages.append({"content": tool_result_content, "role": "user"})
def anthropic_to_invariant(
messages: list[dict], keep_empty_tool_response: bool = False
) -> list[dict]:
"""Converts a list of messages from the Anthropic API to the Invariant API format."""
output = []
for message in messages:
if message["role"] == "system":
output.append({"role": "system", "content": message["content"]})
if message["role"] == "user":
if isinstance(message["content"], list):
for sub_message in message["content"]:
assert sub_message["type"] == "tool_result"
if sub_message["content"]:
assert len(sub_message["content"]) == 1
assert sub_message["content"][0]["type"] == "image"
output.append(
{
"role": "tool",
"content": "local_base64_img: "
+ sub_message["content"][0]["source"]["data"],
"tool_id": sub_message["tool_use_id"],
}
)
else:
if keep_empty_tool_response and any(
[sub_message[k] for k in sub_message]
):
output.append(
{
"role": "tool",
"content": {"is_error": True}
if sub_message["is_error"]
else {},
"tool_id": sub_message["tool_use_id"],
}
)
else:
output.append({"role": "user", "content": message["content"]})
if message["role"] == "assistant":
for sub_message in message["content"]:
if sub_message["type"] == "text":
output.append(
{"role": "assistant", "content": sub_message.get("text")}
)
if sub_message["type"] == "tool_use":
output.append(
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"tool_id": sub_message.get("id"),
"type": "function",
"function": {
"name": sub_message.get("name"),
"arguments": sub_message.get("input"),
},
}
],
}
)
return output
def _maybe_filter_to_n_most_recent_images(
messages: list[BetaMessageParam],
images_to_keep: int,
min_removal_threshold: int,
):
"""Only keep latest images.
With the assumption that images are screenshots that are of diminishing value as
the conversation progresses, remove all but the final `images_to_keep` tool_result
images in place, with a chunk of min_removal_threshold to reduce the amount we
break the implicit prompt cache.
"""
if images_to_keep is None:
return messages
tool_result_blocks = cast(
list[BetaToolResultBlockParam],
[
item
for message in messages
for item in (
message["content"] if isinstance(message["content"], list) else []
)
if isinstance(item, dict) and item.get("type") == "tool_result"
],
)
total_images = sum(
1
for tool_result in tool_result_blocks
for content in tool_result.get("content", [])
if isinstance(content, dict) and content.get("type") == "image"
)
images_to_remove = total_images - images_to_keep
# for better cache behavior, we want to remove in chunks
images_to_remove -= images_to_remove % min_removal_threshold
for tool_result in tool_result_blocks:
if isinstance(tool_result.get("content"), list):
new_content = []
for content in tool_result.get("content", []):
if isinstance(content, dict) and content.get("type") == "image":
if images_to_remove > 0:
images_to_remove -= 1
continue
new_content.append(content)
tool_result["content"] = new_content
def _response_to_params(
response: BetaMessage,
) -> list[BetaTextBlockParam | BetaToolUseBlockParam]:
res: list[BetaTextBlockParam | BetaToolUseBlockParam] = []
for block in response.content:
if isinstance(block, BetaTextBlock):
res.append({"type": "text", "text": block.text})
else:
res.append(cast(BetaToolUseBlockParam, block.model_dump()))
return res
def _inject_prompt_caching(
messages: list[BetaMessageParam],
):
"""Set cache breakpoints for the 3 most recent turns.
One cache breakpoint is left for tools/system prompt, to be shared across sessions
"""
breakpoints_remaining = 3
for message in reversed(messages):
if message["role"] == "user" and isinstance(
content := message["content"], list
):
if breakpoints_remaining:
breakpoints_remaining -= 1
content[-1]["cache_control"] = BetaCacheControlEphemeralParam(
{"type": "ephemeral"}
)
else:
content[-1].pop("cache_control", None)
# we'll only every have one extra turn per loop
break