Dispatch the message to user and tools call.
It automatically parses the "to_user" parameter in function calls,
removes it from the function's parameter list, and sends it to the "to_user" generator.
The name of the parameter corresponding to "to_user" can be specified by to_user_flag.
The remaining function call parameters are sent to the 'function_call' generator.
source: The source of the message to be parsed, which must be an asynchronous iterator.
to_user_flag: Specify the name of the parameter that represents the to_user part.
logger: A Logger.
Returns:
Name | Type |
Description |
make_generator |
Callable[[Literal['to_user', 'function_call']], AsyncGenerator]
|
Factory function, you can pass it the name string of the generator
to obtain the corresponding generator.
|
Example
make_generator_for_roles = await async_dispatcher_tools_call_for_openai(an_async_iterator)
to_user_generator = make_generator_for_roles("to_user")
function_call_generator = make_generator_for_roles("function_call")
Source code in src/agere/utils/dispatcher.py
| async def async_dispatcher_tools_call_for_openai(
source: AsyncIterator,
to_user_flag: str = "to_user",
logger: logging.Logger | None = None,
) -> Callable[[Literal["to_user", "tool_call"]], AsyncGenerator]:
"""Dispatch the message to user and tools call.
It automatically parses the "to_user" parameter in function calls,
removes it from the function's parameter list, and sends it to the "to_user" generator.
The name of the parameter corresponding to "to_user" can be specified by to_user_flag.
The remaining function call parameters are sent to the 'function_call' generator.
source: The source of the message to be parsed, which must be an asynchronous iterator.
to_user_flag: Specify the name of the parameter that represents the to_user part.
logger: A Logger.
Returns:
make_generator (Callable[[Literal["to_user", "function_call"]], AsyncGenerator]):
Factory function, you can pass it the name string of the generator
to obtain the corresponding generator.
Example:
>>> make_generator_for_roles = await async_dispatcher_tools_call_for_openai(an_async_iterator)
>>> to_user_generator = make_generator_for_roles("to_user")
>>> function_call_generator = make_generator_for_roles("function_call")
"""
logger = logger or get_null_logger()
to_user_queue = asyncio.Queue()
function_call_queue = asyncio.Queue()
async def splitter():
buffer: str = ''
tool_call_info: str = ''
before_to_user_content: str = ''
after_to_user_content: str = ''
to_user_start_active: bool = False
to_user_end_active: bool = False
to_user_key_start: int = 0
to_user_content_start: int = 0
find_to_user_content_start_position: int = 0
tool_call_index_now: int = 0
async def put_a_function():
await function_call_queue.put(
tool_call_info + before_to_user_content + after_to_user_content + "}"
)
async def do_check_to_user_end():
nonlocal buffer
nonlocal to_user_content_start
nonlocal to_user_content_end
nonlocal to_user_end_active
if to_user_end_active is True:
# to_user content finish
# Put the last to_user content in user queue
if (last_to_user := buffer[to_user_content_start + 1 : to_user_content_end]):
await to_user_queue.put(last_to_user)
# drop the following ", and possible empty characters
buffer = buffer[to_user_content_end + 2 :].lstrip()
else:
while to_user_content_start < len(buffer) - 1:
to_user_content_end = buffer.find('"', to_user_content_start + 1)
if to_user_content_end != -1 and buffer[to_user_content_end - 1] != '\\':
to_user_end_active = True
await do_check_to_user_end()
break
elif to_user_content_end != -1:
# The found double quote does not meet the requirements, continue to search for the next one.
# Put the recent to_user content in user queue.
await to_user_queue.put(buffer[to_user_content_start + 1 : to_user_content_end + 1])
to_user_content_start = to_user_content_end
else:
# There is no end marker throughout the entire buffer.
# Put the recent to_user content in user queue.
await to_user_queue.put(buffer[to_user_content_start + 1 :])
to_user_content_start = len(buffer) - 1
break
async for chunk in source:
chunk_choice = chunk.choices[0]
chunk_tool_calls = chunk_choice.delta.tool_calls
if chunk_choice.finish_reason == "tool_calls":
if to_user_end_active == to_user_start_active:
# Put the last function call.
after_to_user_content = buffer
await put_a_function()
await to_user_queue.put(None)
await function_call_queue.put(None)
continue
if chunk_choice.finish_reason is not None:
await to_user_queue.put(None)
await function_call_queue.put(None)
continue
# Content is not None means no tools call, then put the "content" to user quequ and continue.
# If no tools call, every chunk will be handle here.
# Every chunk when no tools call.
content = chunk_choice.delta.content
if content is not None:
await to_user_queue.put(content)
continue
# This dispatcher only handle tool_calls and content, so other situation will be ignored.
# Fisrt chunk when call tools.
if chunk_tool_calls is None:
# Here includs: 1. the first chunk when call tools; 2. function call; 3. others
continue
chunk_tool_call = chunk_tool_calls[0]
# Get the name of the function called
# Second chunk when call tools.
if chunk_tool_call.type == 'function':
tool_call_index = chunk_tool_call.index
if tool_call_index_now != tool_call_index:
# The second and subsequent function calls.
after_to_user_content = buffer
if to_user_end_active is True: # The content of 'to_user' exists and is complete.
# In cases where there is information for the user, messages from different functions are separated by a newline.
await to_user_queue.put("\n")
if to_user_end_active == to_user_start_active: # Exclude the case where the parameter parsing is incomplete.
await put_a_function()
buffer = ''
to_user_start_active = False
to_user_end_active = False
function_name = chunk_tool_call.function.name
tool_call_info = f'{{"tool_call_index": {chunk_tool_call.index}, "tool_call_id": "{chunk_tool_call.id}", "name": "{function_name}", "arguments": '
tool_call_index_now = tool_call_index
continue
# Split the message to user and the function call arguments
arguments = chunk_tool_call.function.arguments
buffer += arguments
if to_user_end_active:
# After to_user content
continue
if to_user_start_active is True:
# In 'to_user' param:
to_user_content_end = buffer.find('"', to_user_content_start + 1)
if to_user_content_end != -1 and buffer[to_user_content_end - 1] != '\\':
to_user_end_active = True
await do_check_to_user_end()
continue
# Before to_user start flag is found.
to_user_key_start = buffer.find(f'"{to_user_flag}":')
if to_user_key_start == -1:
continue # Do not find the "to_user" key, continue to receive the next chunk.
# In 'to_user' param:
before_to_user_content = buffer[:to_user_key_start]
to_user_content_start = buffer.find(
'"',
find_to_user_content_start_position or to_user_key_start + len(to_user_flag) + 3,
)
if to_user_content_start != -1 and buffer[to_user_content_start - 1] != '\\':
# In the dictionary, the double quotes representing key-value are regular double quotes '"',
# while double quotes inside strings are escaped double quotes '\"'.
# We can determine whether the double quotes are in a string or represent a key-value pari by
# checking if the character before the double quotes is '\'.
# In content of to_user.
to_user_start_active = True
to_user_content_end = buffer.find('"', to_user_content_start + 1)
if to_user_content_end != -1 and buffer[to_user_content_end - 1] != '\\':
# Content of to_user finish.
to_user_end_active = True
await do_check_to_user_end()
elif to_user_content_start != -1: # pragma: no cover
# Under normal circumstances, the code would not execute to this point.
# This branch is used to handle the exceptional case
# where the string content of the 'to_user' is not immediately followed after '"to_user":'
find_to_user_content_start_position = to_user_content_start # pragma: no cover
role_queue_dict = {"to_user": to_user_queue, "tool_call": function_call_queue}
def make_generator(role_name: str):
async def generator():
while True:
value = await role_queue_dict[role_name].get()
if value is None: # End of the queue
break
yield value
return generator()
splitter_task = asyncio.create_task(splitter())
return make_generator
|