Complete code listing for the same explained in Orchestrating Salesforce with LangGraph + OpenAI
import os
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode
from langgraph.graph import MessagesState
from langchain_core.messages import HumanMessage, SystemMessage
from dotenv import load_dotenv
load_dotenv()
# --- Optional: real Salesforce client if env vars present; else fall back to mock
USE_REAL_SF = all(k in os.environ for k in ["USER_NAME", "PASSWORD", "SECURITY_TOKEN"])
sf_client = None
if USE_REAL_SF:
try:
from simple_salesforce import Salesforce # pip install simple-salesforce
sf_client = Salesforce(
username=os.environ["USER_NAME"],
password=os.environ["PASSWORD"],
security_token=os.environ["SECURITY_TOKEN"],
domain=os.getenv("SF_DOMAIN", "login") # or 'test' for sandboxes
)
print("[SF] Connected to Salesforce org.")
except Exception as e:
print(f"[SF] Could not connect, falling back to mock. Reason: {e}")
sf_client = None
USE_REAL_SF = False
else:
print("[SF] Env vars not set; using mock Salesforce.")
# --- Tool 1: SOQL query
def sf_query_soql(query: str) -> List[Dict[str, Any]]:
\"\"\"
Run a SOQL query. Returns a list of records (dicts).
\"\"\"
if USE_REAL_SF and sf_client:
res = sf_client.query(query)
return res.get("records", [])
# Mock result
if "FROM Account" in query and "Acme" in query:
return [{"Id": "001xx000003AcmeAAA", "Name": "Acme Corp"}]
return []
# --- Tool 2: Create Case
def sf_create_case(account_id: str, subject: str, priority: str = "Medium",
origin: str = "Web") -> Dict[str, Any]:
\"\"\"
Create a Case tied to an Account.
\"\"\"
if USE_REAL_SF and sf_client:
res = sf_client.Case.create({
"AccountId": account_id,
"Subject": subject,
"Priority": priority,
"Origin": origin,
"Status": "New"
})
# Return a consistent shape
return {"success": res.get("success", False), "id": res.get("id")}
# Mock
return {"success": True, "id": "500xx00000MockCase"}
# --- Tool 3: Create Task
def sf_create_task(what_id: str, subject: str, due_date_iso: Optional[str] = None,
status: str = "Not Started", priority: str = "Normal") -> Dict[str, Any]:
\"\"\"
Create a Task (activity). 'what_id' can be Case Id, Opportunity Id, etc.
\"\"\"
if due_date_iso is None:
due_date_iso = (datetime.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d")
if USE_REAL_SF and sf_client:
res = sf_client.Task.create({
"WhatId": what_id,
"Subject": subject,
"ActivityDate": due_date_iso,
"Status": status,
"Priority": priority
})
return {"success": res.get("success", False), "id": res.get("id")}
# Mock
return {"success": True, "id": "00Txx00000MockTask", "due": due_date_iso}
# --- LangChain model with tools
tools = [sf_query_soql, sf_create_case, sf_create_task]
llm = ChatOpenAI(model="gpt-4o")
# Sequential tool calls are safer for multi-step SF workflows
llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)
# --- System message (keeps the assistant focused)
sys_msg = SystemMessage(content=(
"You are a Salesforce assistant. Given natural language tasks, decide when to:\\n"
"- Query Accounts/records via SOQL\\n"
"- Create a Case associated with an Account\\n"
"- Create a Task (follow-up) associated with a Case\\n"
"Ask for missing details if essential. Use tools to get IDs before creating records."
))
# --- Assistant node (unchanged structure)
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
# --- Build the graph
builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges("assistant", tools_condition) # routes to tools if the LLM called one
builder.add_edge("tools", "assistant")
react_graph = builder.compile()
# --- (Optional) Visualize the graph with Mermaid PNG
try:
png_bytes = react_graph.get_graph(xray=True).draw_mermaid_png()
os.makedirs("images", exist_ok=True)
with open("images/sf_graph.png", "wb") as f:
f.write(png_bytes)
print("Graph saved as images/sf_graph.png")
except Exception as e:
print(f"(Skipping graph render) {e}")
# --- Run a demo
user_request = (
"Find the Account named 'Burlington Textiles Corp of America'. Create a High-priority Case titled "
"'Customers cannot log in'. Then create a follow-up Task for tomorrow titled "
"'Call customer with workaround' on that Case."
)
messages = [HumanMessage(content=user_request)]
result_state = react_graph.invoke({"messages": messages})
for m in result_state["messages"]:
m.pretty_print()