This post is mainly based on

Why LangChain Expression Language?

  • Advantages
    • Enable building complex chains from basic components (runnables)
    • Bridge prototyping and production with no code changes
  • Support
    • Streaming: for short time-to-first-token
    • Async calls: any chain can be called with both the sync API (e.g., in Jupyter) and the async API (e.g., from LangServe)
    • Parallel execution: for fetching from multiple retrievers or majority voting
    • Retries and fallbacks: for reliability and scalability
    • Access intermediate results
    • Data schemas: input and output Pydantic schemas inferred from the structure of the chain
  • Ecosystem
    • LangSmith tracing integration
    • LangServe deployment integration

Runnables

  • What is a runnable
    • An interface that defines a common set of invocation methods (invoke, batch, stream, ainvoke)
    • Every LCEL object (model, prompt, I/O) implements the Runnable interface
  • Runnables Composition
    • Runnables can be composed into chains
    • Every chain of runnables is itself a runnable
    • Therefore, chains of runnables automatically support all invocation methods
  • Advantage: easy to compose chains, parallelize components, add fallbacks via runnables

Composition of Runnables

Define Chain

| is LCEL’s runnable composition operator

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
output_parser = StrOutputParser()
model = ChatOpenAI(model="gpt-3.5-turbo")

chain = (
    {"topic": RunnablePassthrough()} 
    | prompt
    | model
    | output_parser
)

Inspect Chain

chain.get_graph()
chain.get_graph().print_ascii()
chain.get_prompts()

Invocation Methods

Invoke

chain.invoke("ice cream")

Stream

for chunk in chain.stream("ice cream"):
	print(chunk, end="", flush=True)

Batch

chain.batch(["ice cream", "spaghetti", "dumplings"])

Async

await chain.ainvoke("ice cream")  # ainvoke returns a coroutine; top-level await works in a notebook
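
Outside an async context (e.g., in a plain script), the coroutine can be driven with asyncio; a minimal sketch:

import asyncio

async def main():
    result = await chain.ainvoke("ice cream")
    print(result)

asyncio.run(main())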

Each runnable can be invoked independently

RunnablePassthrough().invoke('ice cream')
prompt.invoke({"topic": "ice cream"})
model.invoke([HumanMessage(content='tell me a short joke about ice cream')])
StrOutputParser().invoke(AIMessage(content="output to question"))

Runnable Protocol

Runnables expose schema information about their input, output, and config

  • input_schema property
  • output_schema property
  • config_schema method

Example:

prompt.input_schema.schema()
model.output_schema.schema()
chain.input_schema.schema()
chain.output_schema.schema()
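
The config schema is exposed through the config_schema() method (note it is a method, not a property):

chain.config_schema().schema()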

All Runnables expose additional methods that can be used to modify their behavior, for example:

  • Add a retry policy
  • Add lifecycle listeners (sketched below)
  • Make Runnables configurable
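
For instance, lifecycle listeners can be attached with with_listeners(); a minimal sketch, assuming the listener callbacks receive the tracing Run object:

from langchain_core.runnables import RunnableLambda

def log_start(run):
    print(f"started: {run.name}")

def log_end(run):
    print(f"finished: {run.name}")

listened = RunnableLambda(lambda x: x + 1).with_listeners(on_start=log_start, on_end=log_end)
listened.invoke(1)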

Runnables Input/Output Schema

Component      Input Type                               Output Type
Prompt         dict                                     PromptValue
ChatModel      string, [ChatMessage], or PromptValue    ChatMessage
OutputParser   the output of an LLM or ChatModel        defined by the parser
Retriever      string                                   [Document]
Tool           string or dict, defined by the tool      defined by the tool

RunnablePassthrough()

  • Pass inputs unchanged or add extra keys
  • Typically used in conjunction with RunnableParallel to assign data to a new key in the map

Example:

runnable = RunnableParallel(
	passed=RunnablePassthrough(),
	extra=RunnablePassthrough.assign(mult=lambda x: x["num"] * 3),
	modified=lambda x: x["num"] + 1,
)

runnable.invoke({"num": 1})

Output:

{
	'passed': {'num': 1}, 
	'extra': {'num': 1, 'mult': 3}, 
	'modified': 2
}

RunnableLambda()

  • Convert arbitrary functions into runnables
  • Accepts a SINGLE argument; for a function that takes multiple arguments, pass a dict and unpack it in a wrapper (see the sketch below)
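
A minimal sketch of the wrapper pattern for a multi-argument function (multiply is a stand-in for any such function):

from langchain_core.runnables import RunnableLambda

def multiply(a: int, b: int) -> int:
    return a * b

# RunnableLambda passes a single input, so unpack a dict inside the wrapper
multiply_runnable = RunnableLambda(lambda d: multiply(d["a"], d["b"]))

multiply_runnable.invoke({"a": 3, "b": 4})  # -> 12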

Example with a retry policy

import random

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
	return x + 1

# Buggy code that will fail 50% of the time
def buggy_double(y: int) -> int:
	if random.random() > 0.5:
		print('This code failed, and will probably be retried!')
		raise ValueError('Error: buggy_double error')
	return y * 2

chain = (
	RunnableLambda(add_one) |
	RunnableLambda(buggy_double).with_retry(stop_after_attempt=10, wait_exponential_jitter=False)
)

print(chain.invoke(2))

Output

This code failed, and will probably be retried!
This code failed, and will probably be retried!
This code failed, and will probably be retried!
6

Chain Composition Operator

  • RunnableSequence
    • Invokes runnables sequentially, with one runnable’s output fed as the next runnable’s input
    • Construct using the | operator or by passing runnables to RunnableSequence() (see the sketch below)
  • RunnableParallel
    • Invokes runnables concurrently, providing the same input to each
  • RunnableBranch
    • Dynamically select chain, depending on previous chain’s output
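
A minimal sketch of the two ways to build a sequence (assuming a recent langchain_core where RunnableSequence accepts positional steps):

from langchain_core.runnables import RunnableLambda, RunnableSequence

add_one = RunnableLambda(lambda x: x + 1)
double = RunnableLambda(lambda x: x * 2)

# Both forms build the same two-step sequence
seq_from_operator = add_one | double
seq_from_class = RunnableSequence(add_one, double)

seq_from_operator.invoke(3)  # -> 8
seq_from_class.invoke(3)     # -> 8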

RunnableParallel()

  • Construct using a dict literal within a sequence or by passing a dict to RunnableParallel() (both forms are shown below)

Parallel Chain Example

model = ChatOpenAI()
joke_chain = (ChatPromptTemplate.from_template("tell me a joke about {topic}") | model)
poem_chain = (ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model)

map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

map_chain.invoke({"topic": "bear"})
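
The same map can also be built by passing a dict to RunnableParallel(), or with a bare dict literal inside a sequence (it is coerced to a RunnableParallel automatically); a sketch reusing the chains above:

map_chain_from_dict = RunnableParallel({"joke": joke_chain, "poem": poem_chain})

# A dict literal is coerced to RunnableParallel when composed with another runnable
count_outputs = {"joke": joke_chain, "poem": poem_chain} | RunnableLambda(lambda d: len(d))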

RunnableBranch()

  • Initialized with a list of (condition, runnable) pairs plus a default runnable
  • The runnable paired with the first condition that evaluates to True is run; if no condition matches, the default runnable runs

RunnableBranch Example

Input chain:

prompt_str = """Given the user question below, 
classify it as either being about: `LangChain`, `Anthropic`, or `Other`. 

Do not respond with more than one word.

<question>{question}</question>

Classification:"""

chain = (
    PromptTemplate.from_template(prompt_str)
    | ChatAnthropic()
    | StrOutputParser()
)
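
The decision chain below routes to anthropic_chain, langchain_chain, and general_chain, which are not defined in this post; a minimal, hypothetical sketch of what they could look like:

anthropic_chain = (
    PromptTemplate.from_template("You are an expert in Anthropic. Answer the question: {question}")
    | ChatAnthropic()
    | StrOutputParser()
)
langchain_chain = (
    PromptTemplate.from_template("You are an expert in LangChain. Answer the question: {question}")
    | ChatAnthropic()
    | StrOutputParser()
)
general_chain = (
    PromptTemplate.from_template("Answer the question: {question}")
    | ChatAnthropic()
    | StrOutputParser()
)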

Decision chain:

branch = RunnableBranch(
    (lambda x: "anthropic" in x["topic"].lower(), anthropic_chain),
    (lambda x: "langchain" in x["topic"].lower(), langchain_chain),
    general_chain,
)

Combine:

full_chain = {"topic": chain, "question": lambda x: x["question"]} | branch

full_chain.invoke({"question": "how do I use Anthropic?"})
full_chain.invoke({"question": "how do I use LangChain?"})
full_chain.invoke({"question": "whats 2 + 2"})

Alternative Example: using RunnableLambda to route between different outputs

def route(info):
    if "anthropic" in info["topic"].lower():
        return anthropic_chain
    elif "langchain" in info["topic"].lower():
        return langchain_chain
    else:
        return general_chain

full_chain = {"topic": chain, "question": lambda x: x["question"]} | RunnableLambda(route)

More Examples

Structured RAG Example

template = """Answer the question based only on the following context:
{context}

Question: {question}

Answer in the following language: {language}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
	["harrison worked at kensho", "bears like to eat honey"],
	embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

model = ChatOpenAI()
output_parser = StrOutputParser()

chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
        "language": itemgetter("language"),
    }
    | prompt
    | model
    | StrOutputParser()
)

chain.invoke({"question": "where did harrison work", "language": "italian"})

Here

  • itemgetter() extracts the value of a key from the invocation dict (illustrated below)
  • "context": itemgetter("question") | retriever is itself a small chain
  • The first runnable in the chain maps the {question, language} input to a {context, question, language} dict
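
A quick illustration of what itemgetter() does with the invocation dict:

from operator import itemgetter

# itemgetter("question") is a callable that pulls the "question" value out of the input dict
itemgetter("question")({"question": "where did harrison work", "language": "italian"})
# -> 'where did harrison work'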

Formatting Output with JsonOutputParser()

  • JsonOutputParser operates on the input stream and attempts to “auto-complete” the partial JSON into a valid state
  • Allows users to specify an arbitrary JSON schema and query LLMs for outputs that conform to that schema

Example (no schema)

sop = StrOutputParser()
jop = JsonOutputParser()

partial_json = '{"setup": "Why don\'t scientists trust atoms?", "punchline": "Becau'
print(sop.invoke(partial_json))
print(jop.invoke(partial_json))

Output:

{"setup": "Why don't scientists trust atoms?", "punchline": "Becau
{'setup': "Why don't scientists trust atoms?", 'punchline': 'Becau'}

Example (with schema)

from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field

class Box(BaseModel):
    height: str = Field(description="Box height")
    width: str = Field(description="Box width")

parser = JsonOutputParser(pydantic_object=Box)

prompt = PromptTemplate(
    template="Answer the user query.\n{format_instructions}\n{query}\n",
    input_variables=["query"],
    partial_variables={"format_instructions": parser.get_format_instructions()},
)

chain = prompt | model

chain.invoke({"query": "A Rectangle with dimension 10x5"})

Output:

AIMessage(content='{"height": "10", "width": "5"}')

In the example above, parser.get_format_instructions() turns the Box schema into JSON formatting instructions that are injected into the prompt. Note that the chain here is only prompt | model, so the raw AIMessage is returned; appending the parser (prompt | model | parser) would return the parsed dict instead.

Configure Chain Internals at Runtime

Runtime Configurability

  • Using runtime configuration, we can create a uniform interface for different
    • Model parameters (e.g., temperature)
    • Chat models
    • Prompts
  • Example
    • One configurable_model can be configured to call different chat APIs at runtime
    • ConfigurableField lets you configure particular fields of a runnable (see the configurable_fields sketch after the examples below)
    • configurable_alternatives lists alternatives for a particular runnable that can be selected at runtime

Example

configurable_model = ChatAnthropic().configurable_alternatives(
    ConfigurableField(id="model"), 
    default_key="anthropic", 
    openai=ChatOpenAI(),
    gpt4=ChatOpenAI(model="gpt-4"),
)

configurable_chain = ({"topic": RunnablePassthrough()} | prompt | configurable_model)

Invoke with default or alternative model:

configurable_chain.invoke("ice cream")

configurable_chain.invoke(
    "ice cream",
    config={"configurable": {"model": "openai"}}
)

stream = configurable_chain.stream(
    "ice cream",
    config={"configurable": {"model": "gpt4"}}
)
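
For completeness, a minimal configurable_fields sketch (the example above only shows configurable_alternatives); it exposes the model temperature as a runtime-configurable field:

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

field_configurable_model = ChatOpenAI(temperature=0).configurable_fields(
    temperature=ConfigurableField(
        id="llm_temperature",
        name="LLM Temperature",
        description="Sampling temperature of the LLM",
    )
)

field_configurable_model.invoke("pick a random number")
field_configurable_model.invoke(
    "pick a random number",
    config={"configurable": {"llm_temperature": 0.9}},
)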

Fallbacks

If one API is down, the chain can fall back to an alternative API.
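
openai_chain and anthropic_chain are not defined above; a minimal, hypothetical sketch of two equivalent chains backed by different providers (reusing the prompt defined earlier):

openai_chain = prompt | ChatOpenAI() | StrOutputParser()
anthropic_chain = prompt | ChatAnthropic() | StrOutputParser()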

chain_with_fallback = openai_chain.with_fallbacks([anthropic_chain])
chain_with_fallback.invoke("ice cream")

@chain Decorator

  • Convert an arbitrary function into a chain with the @chain decorator
  • Equivalent to wrapping the function in a RunnableLambda
  • Advantage
    • Improved observability in LangSmith tracing
    • Any calls to runnables inside this function will be traced as nested children

Example

from langchain_core.runnables import chain

prompt_story = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt_topic = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")

@chain
def custom_chain(text):
    prompt_story_val = prompt_story.invoke({"topic": text})
    output_story = ChatOpenAI().invoke(prompt_story_val)
    parsed_story = StrOutputParser().invoke(output_story)

    chain_topic = prompt_topic | ChatOpenAI() | StrOutputParser()
    return chain_topic.invoke({"joke": parsed_story})

Add Chat History with MessagesPlaceholder()

Example

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You're an assistant who's good at {ability}"),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{question}"),
    ]
)

chain = prompt | ChatOpenAI()

chat_history = ChatMessageHistory()
chat_history.add_message(HumanMessage(content='What does cosine mean?'))
chat_history.add_ai_message('Cosine is a mathematical function commonly used in trigonometry. It calculates the ratio of the length of the side adjacent to an angle in a right triangle, to the length of the hypotenuse.')

chain.invoke({
	"ability": "math", 
	"question": "What's its inverse", 
	"history": chat_history.messages
})

Output:

AIMessage(content='The inverse of the cosine function is called the arccosine or inverse cosine. It is denoted as acos(x) or cos^(-1)(x). It helps determine the angle whose cosine is a given value.')