Source code for agntcy_acp.langgraph.io_mapper

# Copyright AGNTCY Contributors (https://github.com/agntcy)
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Union

from agntcy_iomapper import IOMappingAgent, IOMappingAgentMetadata
from langchain_core.language_models import BaseChatModel
from langgraph.graph import StateGraph

from agntcy_acp.langgraph import acp_node

logger = logging.getLogger(__name__)


[docs] def add_io_mapped_edge( g: StateGraph, start: Union[str, acp_node.ACPNode], end: Union[str, acp_node.ACPNode], iomapper_config: IOMappingAgentMetadata, llm: BaseChatModel, ) -> IOMappingAgent: """ Adds an I/O-mapped edge to a LangGraph StateGraph. Parameters: g: The LangGraph StateGraph to which the edge will be added. start: The starting node of the edge, which can be specified either as a string identifier or as an instance of an ACPNode. end: The ending node of the edge, which can be specified either as a string identifier or as an instance of an ACPNode. iomapper_config: A dictionary containing all the metadata necessary for the IO mapper agent to perform data translation. Defaults to an empty dictionary. llm: An instance of llm model Returns: None: This function modifies the graph in place by adding the specified edge. """ if isinstance(start, str): start_key = start else: start_key = start.get_name() node: acp_node.ACPNode = start if "input_fields" not in iomapper_config: iomapper_config["input_fields"] = [node.outputPath] if isinstance(end, str): end_key = end else: end_key = end.get_name() node: acp_node.ACPNode = end if "output_fields" not in iomapper_config: iomapper_config["output_fields"] = [node.inputPath] mapping_agent = IOMappingAgent(metadata=iomapper_config, llm=llm) iom_name = f"{start_key}_{end_key}" g.add_node(iom_name, mapping_agent.langgraph_node) g.add_edge(start_key, iom_name) g.add_edge(iom_name, end_key) return mapping_agent
[docs] def add_io_mapped_conditional_edge( g: StateGraph, start: Union[str, acp_node.ACPNode], path, iomapper_config_map: dict, llm: BaseChatModel, ) -> dict: """ Adds a conditional I/O-mapped edge to a LangGraph StateGraph. Parameters: g: The LangGraph StateGraph to which the conditional edge will be added. start: The starting node of the edge, which can be specified either as a string identifier or as an instance of an ACPNode. path: The conditional path that determines the conditions under which the edge will be traversed. The type and structure of 'path' should be specified based on its use case. iomapper_config_map: A dictionary containing metadata that the IO mapper agent requires for data translation. This map is used to configure the agent based on different conditions. llm: An instance of llm model Returns: None: This function modifies the graph in place by adding the specified conditional edge. """ start_node = None if isinstance(start, str): start_key = start else: start_key = start.get_name() start_node: acp_node.ACPNode = start condition_map = {} iom_map = {} for map_key, v in iomapper_config_map.items(): end_node = None if isinstance(v["end"], str): end_key = v["end"] else: end_key = v["end"].get_name() end_node = v["end"] if v["metadata"] is None: # No IO Mapper is needed condition_map[map_key] = end_key else: if start_node and "input_fields" not in v["metadata"]: v["metadata"]["input_fields"] = [start_node.outputPath] if end_node and "output_fields" not in v["metadata"]: v["metadata"]["output_fields"] = [end_node.inputPath] mapping_agent = IOMappingAgent(metadata=v["metadata"], llm=llm) iom_name = f"{start_key}_{end_key}" g.add_node(iom_name, mapping_agent.langgraph_node) g.add_edge(iom_name, end_key) iom_map[end_key] = mapping_agent condition_map[map_key] = iom_name g.add_conditional_edges(start_key, path, condition_map) return iom_map