LangChain Expression Language
This post is mainly based on
Why LangChain Expression Language?
- Advantage
- Enable building complex chains from basic components (runnables)
- Bridge prototyping and production, with no code changes
- Support
- Streaming: for short time-to-first-token
- Async calls: any chain can be called with both the sync API (e.g., in Jupyter) and the async API (e.g., in LangServe)
- Parallel execution: for fetching from multiple retrievers or majority voting
- Retries and fallbacks: for reliability and scalability
- Access intermediate results
- Data schemas: input and output Pydantic schemas inferred from the structure of the chain
- Ecosystem
- LangSmith tracing integration
- LangServe deployment integration
Runnables
- What is a runnable
- An interface that defines a common set of invocation methods (invoke, batch, stream, ainvoke)
- Every LCEL object (model, prompt, output parser, etc.) implements the Runnable interface
- Runnables Composition
- Runnables can be composed into chains
- Every chain of runnables is itself a runnable
- Therefore, chains of runnables automatically support all the invocation methods
- Advantage: easy to compose chains, parallelize components, and add fallbacks via runnables (see the sketch below)
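A minimal sketch of this point: two RunnableLambdas (introduced in detail later) composed with | form a chain that is itself a runnable, so invoke, batch, and stream all work without extra code.
from langchain_core.runnables import RunnableLambda

chain = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)

print(chain.invoke(1))         # 4
print(chain.batch([1, 2, 3]))  # [4, 6, 8]
for chunk in chain.stream(1):  # yields the final value as a single chunk
    print(chunk)               # 4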
Composition of Runnables
Define Chain
The | operator is LCEL's runnable composition operator:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
output_parser = StrOutputParser()
model = ChatOpenAI(model="gpt-3.5-turbo")
chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | model
    | output_parser
)
Inspect Chain
chain.get_graph()
chain.get_graph().print_ascii()
chain.get_prompts()
Invocation Methods
Invoke
chain.invoke("ice cream")
Stream
for chunk in chain.stream("ice cream"):
    print(chunk, end="", flush=True)
Batch
chain.batch(["ice cream", "spaghetti", "dumplings"])
Async
await chain.ainvoke("ice cream")
Each runnable can be invoked independently:
from langchain_core.messages import AIMessage, HumanMessage

RunnablePassthrough().invoke("ice cream")
prompt.invoke({"topic": "ice cream"})
model.invoke([HumanMessage(content="tell me a short joke about ice cream")])
StrOutputParser().invoke(AIMessage(content="output to question"))
Runnable Protocol
Runnables expose schema information about their input, output, and config:
- input_schema property
- output_schema property
- config_schema method
Example:
prompt.input_schema.schema()
model.output_schema.schema()
chain.input_schema.schema()
chain.output_schema.schema()
All Runnables expose additional methods that can be used to modify their behavior, for example:
- Add a retry policy
- Add lifecycle listeners
- Make Runnables configurable
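As a hedged sketch of the first two of these (with_retry and with_listeners are langchain_core Runnable methods; the listener callbacks receive Run objects from the tracing layer; making runnables configurable is covered later in this post):
from langchain_core.runnables import RunnableLambda

base = RunnableLambda(lambda x: x + 1)

# Retry policy: retry the runnable up to 3 times on failure
with_retries = base.with_retry(stop_after_attempt=3)

# Lifecycle listeners: called when a run starts and ends
with_logging = base.with_listeners(
    on_start=lambda run: print("started"),
    on_end=lambda run: print("finished"),
)

print(with_retries.invoke(1))  # 2
print(with_logging.invoke(1))  # prints "started", "finished", then 2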
Runnables Input/Output Schema
| Component    | Input Type                            | Output Type       |
|--------------|---------------------------------------|-------------------|
| Prompt       | dict                                  | PromptValue       |
| ChatModel    | string, [ChatMessage], or PromptValue | ChatMessage       |
| OutputParser | the output of an LLM or ChatModel     | defined by parser |
| Retriever    | string                                | [Document]        |
| Tool         | string or dict, defined by tool       | defined by tool   |
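To illustrate the first row of the table, a prompt takes a dict and returns a PromptValue, which can be rendered either as a string or as chat messages (a small sketch using the prompt defined earlier):
pv = prompt.invoke({"topic": "ice cream"})
print(pv.to_string())    # Human: Tell me a short joke about ice cream
print(pv.to_messages())  # [HumanMessage(content='Tell me a short joke about ice cream')]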
RunnablePassthrough()
- Pass inputs unchanged or add extra keys
- Typically used in conjunction with RunnableParallel to assign data to a new key in the map
Example:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

runnable = RunnableParallel(
    passed=RunnablePassthrough(),
    extra=RunnablePassthrough.assign(mult=lambda x: x["num"] * 3),
    modified=lambda x: x["num"] + 1,
)
runnable.invoke({"num": 1})
Output:
{
'passed': {'num': 1},
'extra': {'num': 1, 'mult': 3},
'modified': 2
}
RunnableLambda()
- Convert arbitrary functions into runnables
- Accepts a SINGLE argument; use a dict input, or write a wrapper for functions that take multiple arguments (see the sketch below)
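A sketch of the wrapper approach, using a hypothetical two-argument multiply function:
from langchain_core.runnables import RunnableLambda

def multiply(a: int, b: int) -> int:
    return a * b

# The wrapper takes the single dict argument and unpacks it into two parameters
multiply_runnable = RunnableLambda(lambda d: multiply(d["a"], d["b"]))
print(multiply_runnable.invoke({"a": 3, "b": 4}))  # 12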
Example with a retry policy
import random

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

# Buggy code that will fail about 50% of the time
def buggy_double(y: int) -> int:
    if random.random() > 0.5:
        print("This code failed, and will probably be retried!")
        raise ValueError("Error: buggy_double error")
    return y * 2

chain = (
    RunnableLambda(add_one)
    | RunnableLambda(buggy_double).with_retry(stop_after_attempt=10, wait_exponential_jitter=False)
)
print(chain.invoke(2))
Output
This code failed, and will probably be retried!
This code failed, and will probably be retried!
This code failed, and will probably be retried!
6
Chain Composition Operator
RunnableSequence
- Invokes runnables sequentially, with one runnable's output fed as the next runnable's input
- Construct using the | operator or by passing a list of runnables to RunnableSequence()
RunnableParallel
- Invokes runnables concurrently, providing the same input to each
RunnableBranch
- Dynamically selects a chain depending on the previous step's output
RunnableParallel()
- Construct using a dict literal within a sequence or by passing a dict to RunnableParallel
Parallel Chain Example
model = ChatOpenAI()
joke_chain = (ChatPromptTemplate.from_template("tell me a joke about {topic}") | model)
poem_chain = (ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model)
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
map_chain.invoke({"topic": "bear"})
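The same map can also be written as a dict literal inside a sequence, which LCEL coerces into a RunnableParallel automatically; a sketch reusing the two chains above:
from langchain_core.runnables import RunnableLambda

combined = (
    {"joke": joke_chain, "poem": poem_chain}
    | RunnableLambda(lambda d: d["joke"].content + "\n---\n" + d["poem"].content)
)
combined.invoke({"topic": "bear"})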
RunnableBranch()
- Initialized with a list of (condition, runnable) pairs, plus a default runnable as the last element
- Runs the runnable paired with the first condition that evaluates to True; if no condition matches, runs the default
RunnableBranch Example
Input chain:
prompt_str = """Given the user question below,
classify it as either being about: `LangChain`, `Anthropic`, or `Other`.
Do not respond with more than one word.
<question>{question}</question>
Classification:"""
from langchain_community.chat_models import ChatAnthropic
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate

chain = (
    PromptTemplate.from_template(prompt_str)
    | ChatAnthropic()
    | StrOutputParser()
)
Decision chain:
from langchain_core.runnables import RunnableBranch

# anthropic_chain, langchain_chain, and general_chain are assumed to be
# ordinary LCEL chains (prompt | model) defined elsewhere
branch = RunnableBranch(
    (lambda x: "anthropic" in x["topic"].lower(), anthropic_chain),
    (lambda x: "langchain" in x["topic"].lower(), langchain_chain),
    general_chain,
)
Combine:
full_chain = {"topic": chain, "question": lambda x: x["question"]} | branch
full_chain.invoke({"question": "how do I use Anthropic?"})
full_chain.invoke({"question": "how do I use LangChain?"})
full_chain.invoke({"question": "whats 2 + 2"})
Alternative Example: using RunnableLambda to route between different outputs
def route(info):
    if "anthropic" in info["topic"].lower():
        return anthropic_chain
    elif "langchain" in info["topic"].lower():
        return langchain_chain
    else:
        return general_chain

full_chain = {"topic": chain, "question": lambda x: x["question"]} | RunnableLambda(route)
More Examples
Structured RAG Example
template = """Answer the question based only on the following context:
{context}
Question: {question}
Answer in the following language: {language}
"""
from operator import itemgetter

from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

prompt = ChatPromptTemplate.from_template(template)
vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "bears like to eat honey"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()
model = ChatOpenAI()
output_parser = StrOutputParser()
chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
        "language": itemgetter("language"),
    }
    | prompt
    | model
    | output_parser
)
chain.invoke({"question": "where did harrison work", "language": "italian"})
Here:
- itemgetter("question") extracts the value of the "question" key from the invocation input dict
- "context": itemgetter("question") | retriever is itself an individual chain (sketched below)
- The first runnable in the chain maps the {question, language} input to a {context, question, language} dict
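That sub-chain can be invoked on its own, since the | operator coerces the itemgetter callable into a runnable; a small sketch using the retriever defined above:
sub_chain = itemgetter("question") | retriever
sub_chain.invoke({"question": "where did harrison work", "language": "italian"})
# [Document(page_content='harrison worked at kensho'), ...]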
Formatting Output with JsonOutputParser()
- JsonOutputParser operates on the input stream and attempts to "auto-complete" partial JSON into a valid state
- It also allows users to specify an arbitrary JSON schema and query LLMs for outputs that conform to that schema
Example (no schema)
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser

sop = StrOutputParser()
jop = JsonOutputParser()
partial_json = '{"setup": "Why don\'t scientists trust atoms?", "punchline": "Becau'
print(sop.invoke(partial_json))
print(jop.invoke(partial_json))
Output:
{'setup': "Why don't scientists trust atoms?", 'punchline': 'Becau
{'setup': "Why don't scientists trust atoms?", 'punchline': 'Becau'}
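The stream-oriented behavior can be seen end to end by putting the parser behind a model; a hedged sketch, assuming model is the ChatOpenAI instance from earlier and cooperates by emitting JSON:
json_chain = model | JsonOutputParser()
for partial in json_chain.stream(
    "Reply with a JSON object with keys setup and punchline: tell me a joke"
):
    print(partial)
# {}
# {'setup': ''}
# {'setup': 'Why'}
# ... progressively more complete dicts until the full JSON arrives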
Example (with schema)
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field

class Box(BaseModel):
    height: str = Field(description="Box height")
    width: str = Field(description="Box width")

parser = JsonOutputParser(pydantic_object=Box)
prompt = PromptTemplate(
    template="Answer the user query.\n{format_instructions}\n{query}\n",
    input_variables=["query"],
    partial_variables={"format_instructions": parser.get_format_instructions()},
)
chain = prompt | model
chain.invoke({"query": "A Rectangle with dimension 10x5"})
Output:
AIMessage(content='{"height": "10", "width": "5"}')
In the example above, parser.get_format_instructions() converts the Box schema into JSON-formatting instructions that are injected into the prompt.
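Appending the parser itself to the chain converts the AIMessage into a plain dict; a small sketch continuing the example:
chain = prompt | model | parser
chain.invoke({"query": "A Rectangle with dimension 10x5"})
# {'height': '10', 'width': '5'}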
Configure Chain Internals at Runtime
Runtime Configurability
- Using runtime configuration, we can create a uniform interface for different:
- Model parameters (e.g., temperature)
- Chat models
- Prompts
- Example
- One configurable_model can be configured to call different chat APIs at runtime
- ConfigurableField lets you configure particular fields of a runnable
- configurable_alternatives lists alternatives for a particular runnable that can be selected at runtime
Example
from langchain_core.runnables import ConfigurableField

configurable_model = ChatAnthropic().configurable_alternatives(
    ConfigurableField(id="model"),
    default_key="anthropic",
    openai=ChatOpenAI(),
    gpt4=ChatOpenAI(model="gpt-4"),
)
configurable_chain = ({"topic": RunnablePassthrough()} | prompt | configurable_model)
Invoke with default or alternative model:
configurable_chain.invoke("ice cream")
configurable_chain.invoke(
    "ice cream",
    config={"configurable": {"model": "openai"}},
)
stream = configurable_chain.stream(
    "ice cream",
    config={"configurable": {"model": "gpt4"}},
)
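ConfigurableField can also expose a single model parameter rather than whole-model alternatives; a hedged sketch following the same pattern, using temperature:
temp_model = ChatOpenAI(temperature=0).configurable_fields(
    temperature=ConfigurableField(
        id="llm_temperature",
        name="LLM temperature",
        description="Sampling temperature for the model",
    )
)
temp_model.invoke("pick a random number")  # uses temperature 0
temp_model.with_config(configurable={"llm_temperature": 0.9}).invoke("pick a random number")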
Fallbacks
In case one API is down, the chain can fall back to an alternative API:
# openai_chain and anthropic_chain are assumed to be ordinary LCEL chains
chain_with_fallback = openai_chain.with_fallbacks([anthropic_chain])
chain_with_fallback.invoke("ice cream")
@chain Decorator
- Convert an arbitrary function into a chain with the @chain decorator
- Equivalent to wrapping the function in a RunnableLambda
- Advantage
- Improved observability in LangSmith tracing
- Any calls to runnables inside the function will be traced as nested children
Example
from langchain_core.runnables import chain

prompt_story = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt_topic = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")

@chain
def custom_chain(text):
    prompt_story_val = prompt_story.invoke({"topic": text})
    output_story = ChatOpenAI().invoke(prompt_story_val)
    parsed_story = StrOutputParser().invoke(output_story)
    chain_topic = prompt_topic | ChatOpenAI() | StrOutputParser()
    return chain_topic.invoke({"joke": parsed_story})
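The decorated function is itself a runnable, so it can be invoked like any other chain:
custom_chain.invoke("ice cream")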
Add Chat History with MessagesPlaceholder()
Example
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You're an assistant who's good at {ability}"),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{question}"),
    ]
)
chain = prompt | ChatOpenAI()

chat_history = ChatMessageHistory()
chat_history.add_message(HumanMessage(content="What does cosine mean?"))
chat_history.add_ai_message("Cosine is a mathematical function commonly used in trigonometry. It calculates the ratio of the length of the side adjacent to an angle in a right triangle, to the length of the hypotenuse.")
chain.invoke({
    "ability": "math",
    "question": "What's its inverse",
    "history": chat_history.messages,
})
Output:
AIMessage(content='The inverse of the cosine function is called the arccosine or inverse cosine. It is denoted as acos(x) or cos^(-1)(x). It helps determine the angle whose cosine is a given value.')