
How do I stream tool calls that have nested agent streaming responses? #1673

Open
@asher-aqi

Description


Question

Hi,

The current high-level setup:
I have an agent with access to a tool called generate_curriculum, and inside that tool there are 2 agents that stream structured outputs.

The top-level agent that calls the generate_curriculum tool also pipes all the chunks through a data stream protocol helper that converts the agent's graph nodes into that format.

Here is the code for the top level agent.

async def chat_stream(
    self,
    prompt: str,
    message_history: list,
    **kwargs: Any,
) -> AsyncGenerator[bytes, Any]:
    """Stream chat interaction with the agent asynchronously."""

    async def stream_messages():
        # Build the input message list
        input_messages = [prompt]

        deps = kwargs.get("deps", None)

        chunk = b""  # fallback so the final yield is defined even if nothing streamed

        async with self.iter(
            input_messages, message_history=message_history, deps=deps
        ) as agent_run:
            # Handle intermediate stream responses
            async for node in agent_run:
                async for chunk in to_data_stream_protocol(node, agent_run):
                    yield chunk

            # After the stream completes, get the final messages
            final_messages = agent_run.result.new_messages_json()

            # Send the final message along with the new messages for history
            yield chunk, final_messages

    return stream_messages()
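For context, chat_stream is a coroutine that *returns* an async generator rather than being one itself, which is why the traceback below shows `async for chunk in await stream`. A minimal stand-in illustrating the calling convention (chat_stream_stub and consume are illustrative names, not the actual service code):

```python
import asyncio
from typing import Any, AsyncGenerator

async def chat_stream_stub() -> AsyncGenerator[bytes, Any]:
    # Mirrors chat_stream: an async function that returns an inner async generator
    async def stream_messages():
        for part in (b"hello ", b"world"):
            yield part
    return stream_messages()

async def consume() -> bytes:
    chunks = []
    # The coroutine must be awaited first, then iterated
    async for chunk in await chat_stream_stub():
        chunks.append(chunk)
    return b"".join(chunks)
```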

Here is the generate_curriculum tool:

async def generate_curriculum(
    ctx: RunContext[LearningSession], complete_learning_goal_summary: str
) -> AsyncGenerator[bytes, Any]:
    """
    This tool generates a learning roadmap/curriculum for the user based on a comprehensive,
    in-depth learning goal discovered through an interview process.

    Under the hood, this tool uses the TheoryManagerAgent to generate a title and description
    for the session that reflect the refined learning goal, and it also uses the
    TheoryManagerAgent to agentically research and generate a comprehensive roadmap.
    """
    import logging
    logger = logging.getLogger("app")

    # Update session title and description now that we have a refined learning goal
    theory_manager_agent = TheoryManagerAgent(
        model_name="openai:gpt-4.1-mini"
    )

    title_and_description = await theory_manager_agent.generate_title_and_description(
        session_data=StudySessionCreate(
            preferences=StudyPreferences(
                study_goal=complete_learning_goal_summary,
            )
        )
    )

    learning_session = ctx.deps.session

    learning_session.title = title_and_description.title
    learning_session.description = title_and_description.description
    learning_session.preferences = StudyPreferences(
        study_goal=complete_learning_goal_summary,
    )

    await learning_session.save()

    state = CurriculumState(learning_goal=complete_learning_goal_summary)
    state.initialize_agents()

    async def stream_curriculum(state):
        async for chunk in generate_topics(state):
            logger.info(chunk)
            if not chunk[0] and chunk[1] != []:
                yield chunk[1]
            else:
                state.topics = chunk[1]
                # Distinct name so the inner loop doesn't shadow the outer `chunk`
                async for concept_chunk in parallel_concept_generation(state.topics, state):
                    if concept_chunk[0] != "state":
                        logger.info(concept_chunk)
                        yield concept_chunk
                    else:
                        state = concept_chunk[1]

        roadmap = roadmap_assembly(state)

        mini_curriculum = MiniCurriculum(
            study_goal=complete_learning_goal_summary,
            roadmap=roadmap
        )

        yield mini_curriculum

        learning_session.mini_curriculum = mini_curriculum
        await learning_session.save()

    return stream_curriculum(state)

To keep this question short, I've omitted the actual generation functions, but know that the agents inside them are streamed like this:

async with agent.run_stream(user_prompt=prompt, output_type=List[TopicNodeBase], deps=SearchData()) as result:
    async for message, last in result.stream_structured(debounce_by=0.01):
        try:
            chunk = await result.validate_structured_output(
                message,
                allow_partial=not last,
            )
        except ValidationError:
            continue
        yield last, chunk

The problem and current results:
The data is streaming fine, in the format I want. I'm essentially forcing every chunk into this format:
a:{{toolCallId:id, result:{chunk.json()}}}
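For clarity, here's a minimal sketch of how one of those parts gets built (`to_data_stream_part` is an illustrative name, not my actual helper; the field names follow the format above):

```python
import json

def to_data_stream_part(tool_call_id: str, chunk: dict) -> str:
    """Format one chunk as an `a:` (tool result) part of the data stream protocol."""
    payload = {"toolCallId": tool_call_id, "result": chunk}
    return f"a:{json.dumps(payload)}\n"
```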

All the chunks serialize fine and show up correctly in my network tab, but once the stream ends I get this error.

pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'async_generator'>

It seems to me that the issue is that PydanticAI tries to handle the final tool result as a single object, and it doesn't naturally handle AsyncGenerators. Frankly, I can't think of a way to yield the intermediate chunks while also satisfying PydanticAI's need for a fully serializable object that it can parse/validate.
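One direction I've been sketching (purely hypothetical; `run_tool_with_side_channel` and the queue argument are names I made up, not PydanticAI API): have the tool drain the nested generator itself, forward intermediate chunks through a side channel such as an asyncio.Queue held on deps, and return only the final serializable object as the tool result:

```python
import asyncio
from typing import Any, AsyncGenerator

async def run_tool_with_side_channel(
    stream: AsyncGenerator[Any, None],
    side_channel: "asyncio.Queue[Any]",
) -> Any:
    """Consume `stream`, forwarding intermediate chunks to `side_channel`;
    return only the last yielded value as the serializable tool result."""
    final = None
    async for chunk in stream:
        await side_channel.put(chunk)
        final = chunk
    await side_channel.put(None)  # sentinel: the nested stream is finished
    return final
```

That way PydanticAI would only ever see the final object, while the outer chat_stream could read intermediate chunks off the queue as they arrive.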

The overall goal is to stream the output of each of the agents so I can display the generation process in my frontend components. Previously, this functionality was a synchronous workflow built in a Pydantic Graph, but I realized I can't stream with a graph.

I had also tried taking the logic out of each graph node's run function, manually stepping through the graph nodes at a high level, and using streaming versions of that logic in between. That didn't work either, because the inner tool calls made by the agents in that graph didn't share the same run context as the higher-level agent that called the tool running the graph. (The graph agents weren't updating the overall context's message history, so the OpenAI client failed on subsequent requests.)

That's why I'm trying a flatter version that lives in the tool call itself, without it having to be in a graph (which wasn't strictly necessary in the first place, but it kept things organized and made it easy to add quality-check logic in between).

Full Traceback for the Async Generator serialization exception:

ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
  |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_utils.py", line 76, in collapse_excgroups
  |     yield
  |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/responses.py", line 263, in __call__
  |     async with anyio.create_task_group() as task_group:
  |                ^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
    |     result = await app(  # type: ignore[func-returns-value]
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    |     return await self.app(scope, receive, send)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/fastapi/applications.py", line 1054, in __call__
    |     await super().__call__(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/applications.py", line 112, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 187, in __call__
    |     raise exc
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 165, in __call__
    |     await self.app(scope, receive, _send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 183, in __call__
    |     raise app_exc
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 141, in coro
    |     await self.app(scope, receive_or_disconnect, send_no_error)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/cors.py", line 93, in __call__
    |     await self.simple_response(scope, receive, send, request_headers=headers)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/cors.py", line 144, in simple_response
    |     await self.app(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/opentelemetry/instrumentation/asgi/__init__.py", line 743, in __call__
    |     await self.app(scope, otel_receive, otel_send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     raise exc
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 714, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 734, in app
    |     await route.handle(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 288, in handle
    |     await self.app(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 76, in app
    |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     raise exc
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 74, in app
    |     await response(scope, receive, send)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/responses.py", line 262, in __call__
    |     with collapse_excgroups():
    |          ^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 158, in __exit__
    |     self.gen.throw(value)
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_utils.py", line 82, in collapse_excgroups
    |     raise exc
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/responses.py", line 266, in wrap
    |     await func()
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/responses.py", line 246, in stream_response
    |     async for chunk in self.body_iterator:
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/app/services/study_session_service.py", line 684, in interview_chat_stream
    |     async for chunk in await stream:
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/app/services/learning/base_agent.py", line 113, in stream_messages
    |     async for chunk in to_data_stream_protocol(node, agent_run):
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/app/services/learning/agent_utils.py", line 104, in to_data_stream_protocol
    |     async with node.stream(run.ctx) as request_stream:
    |                ^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    |     return await anext(self.gen)
    |            ^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/_agent_graph.py", line 278, in stream
    |     async with self._stream(ctx) as streamed_response:
    |                ^^^^^^^^^^^^^^^^^
    |   File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    |     return await anext(self.gen)
    |            ^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/_agent_graph.py", line 301, in _stream
    |     async with ctx.deps.model.request_stream(
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    |     return await anext(self.gen)
    |            ^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/instrumented.py", line 141, in request_stream
    |     async with super().request_stream(
    |                ^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    |     return await anext(self.gen)
    |            ^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/wrapper.py", line 37, in request_stream
    |     async with self.wrapped.request_stream(messages, model_settings, model_request_parameters) as response_stream:
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    |     return await anext(self.gen)
    |            ^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/openai.py", line 210, in request_stream
    |     response = await self._completions_create(
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/openai.py", line 264, in _completions_create
    |     openai_messages = await self._map_messages(messages)
    |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/openai.py", line 331, in _map_messages
    |     async for item in self._map_user_message(message):
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/openai.py", line 394, in _map_user_message
    |     content=part.model_response_str(),
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/messages.py", line 372, in model_response_str
    |     return tool_return_ta.dump_json(self.content).decode()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic/type_adapter.py", line 631, in dump_json
    |     return self.serializer.to_json(
    |            ^^^^^^^^^^^^^^^^^^^^^^^^
    | pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'async_generator'>
    +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 187, in __call__
    raise exc
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 165, in __call__
    await self.app(scope, receive, _send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 183, in __call__
    raise app_exc
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 141, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/cors.py", line 93, in __call__
    await self.simple_response(scope, receive, send, request_headers=headers)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/cors.py", line 144, in simple_response
    await self.app(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/opentelemetry/instrumentation/asgi/__init__.py", line 743, in __call__
    await self.app(scope, otel_receive, otel_send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 714, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 734, in app
    await route.handle(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 288, in handle
    await self.app(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 76, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/routing.py", line 74, in app
    await response(scope, receive, send)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/responses.py", line 262, in __call__
    with collapse_excgroups():
         ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/_utils.py", line 82, in collapse_excgroups
    raise exc
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/responses.py", line 266, in wrap
    await func()
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/starlette/responses.py", line 246, in stream_response
    async for chunk in self.body_iterator:
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/app/services/study_session_service.py", line 684, in interview_chat_stream
    async for chunk in await stream:
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/app/services/learning/base_agent.py", line 113, in stream_messages
    async for chunk in to_data_stream_protocol(node, agent_run):
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/app/services/learning/agent_utils.py", line 104, in to_data_stream_protocol
    async with node.stream(run.ctx) as request_stream:
               ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/_agent_graph.py", line 278, in stream
    async with self._stream(ctx) as streamed_response:
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/_agent_graph.py", line 301, in _stream
    async with ctx.deps.model.request_stream(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/instrumented.py", line 141, in request_stream
    async with super().request_stream(
               ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/wrapper.py", line 37, in request_stream
    async with self.wrapped.request_stream(messages, model_settings, model_request_parameters) as response_stream:
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/openai.py", line 210, in request_stream
    response = await self._completions_create(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/openai.py", line 264, in _completions_create
    openai_messages = await self._map_messages(messages)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/openai.py", line 331, in _map_messages
    async for item in self._map_user_message(message):
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/models/openai.py", line 394, in _map_user_message
    content=part.model_response_str(),
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic_ai/messages.py", line 372, in model_response_str
    return tool_return_ta.dump_json(self.content).decode()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/asher/Desktop/AcquiredIntelligence/Zettel/backend/study-session-service/.venv/lib/python3.12/site-packages/pydantic/type_adapter.py", line 631, in dump_json
    return self.serializer.to_json(
           ^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'async_generator'>

Additional Context

PydanticAI Version = 0.1.6
Python Version = 3.12.9
I don't think there is any more relevant information.

Labels: question