How to Deploy Azure OpenAI Assistant on Databricks Agent Framework
Azure OpenAI Assistant offers an easy to build agent to retrieve documents and perform various Q&A tasks and summarisation, as needed.
But once we need to make this application available to our target audience that might not have access to our development environment, we have to start setting up the larger picture, such as:
- An UI/UX for your end users to access your agent
- Governance and permissions access of the agent
- Memory and Session management
- Monitoring your endpoint
- Collecting Feedbacks from your end users
The great thing is that Databricks actually have the capability to cover all of these points easily using Mosaic AI Agent Framework. Let’s look at how to achieve this technically.
Mosaic AI Agent Framework makes it easy for developers to take feedback about the GenAI application and rapidly iterate on changes to test every hypothesis. It basically deploys an UI accessible to your end users, and sets up evaluation and feedback collection pipelines automatically so you only have to worry about making your app perform better rather than the developing technical functionalities around it.
Let’s deep dive into how to integrate these two systems together seamlessly!
Steps
We’ll be following the steps illustrated below.
- First, authenticate and use the OpenAI Assistant in our Databricks notebook.
- Then, we’ll wrap the code in a custom MLFlow class called ChatModel which provides a standardized way to create production-ready conversational AI models.
- We will then log and register that custom model on MLFlow and store it in our Unity Catalog for added governance.
- And finally deploy it using the Mosaic Agent Framework.
Step 1. Authenticate and Use the Azure OpenAI Assistant
First step is to first authenticate and load our assistant on our Databricks notebook. We can do that with the OpenAI package (using openai==1.35.3). We’ll be storing our credentials in a separate environment file.
import os
from openai import AzureOpenAI
# Initialize the Azure OpenAI client
azure_client = AzureOpenAI(
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_key=os.environ["AZURE_OPENAI_API_KEY"],
api_version="2024-05-01-preview",
)
Because we’ll be using Langchain (version 0.3.0) to wrap the model, we can enable streaming by extending our langchain class and re-defining the stream function.
from langchain.agents.openai_assistant import OpenAIAssistantRunnable
class StreamingOpenAIAssistant(OpenAIAssistantRunnable):
def stream(self, input, azure_client, config = None, **kwargs):
client = azure_client
with client.beta.threads.runs.stream(
assistant_id=os.environ["AZURE_OPENAI_ASSISTANT_ID"], # Replace with your assistant ID
) as stream:
for text in stream.text_deltas:
yield text
Then have our streaming agent and test it for both normal invocation and streaming.
# Instantiate the agent
stream_agent = StreamingOpenAIAssistant(assistant_id=os.environ["AZURE_OPENAI_ASSISTANT_ID"], client=azure_client, as_agent=True, streaming=True)
input_example = {"content": "What is the best way to parallelize a model inference?"}
# Normal invocation
output = stream_agent.invoke(input_example)
print (output)
# Streaming invocation
output = stream_agent.stream(input_example)
for chunk in output:
print(chunk)
When running, the output from the streaming should trickle chunks by chunks. Seems all good here!
Step 2. Wrap the Agent in MLFlow ChatModel
Second step is to wrap our code into MLFlow ChatModel so that we can deploy it seamlessly on the Agent Framework. We essentially need to add our Azure OpenAI client authentication methods in the init function, write the predict and predict_stream functions.
from typing import Optional, Dict, List, Generator
from mlflow.pyfunc import ChatModel
from mlflow.types.llm import (
ChatCompletionRequest,
ChatCompletionResponse,
ChatCompletionChunk,
ChatMessage,
ChatChoice,
ChatParams,
ChatChoiceDelta,
ChatChunkChoice,
)
class MyAgent(ChatModel):
class StreamingOpenAIAssistant(OpenAIAssistantRunnable):
def stream(self, input, config = None, **kwargs):
# Initialize the Azure OpenAI client
azure_client = AzureOpenAI(
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_key=os.environ["AZURE_OPENAI_API_KEY"],
api_version="2024-05-01-preview"
)
client = azure_client # Or your Azure OpenAI client
with client.beta.threads.runs.stream(
assistant_id=os.environ["AZURE_OPENAI_ASSISTANT_ID"],
for text in stream.text_deltas:
yield text
def __init__(self):
# Initialize the Azure OpenAI client
self.azure_client = AzureOpenAI(
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_key=os.environ["AZURE_OPENAI_API_KEY"],
api_version="2024-05-01-preview",
)
self.stream_agent = self.StreamingOpenAIAssistant(assistant_id=os.environ["AZURE_OPENAI_ASSISTANT_ID"],
client=self.azure_client,
as_agent=True,
streaming=True)
"""
Defines a custom agent that processes ChatCompletionRequests
and returns ChatCompletionResponses.
"""
def predict(self, context, messages: list[ChatMessage], params: ChatParams) -> ChatCompletionResponse:
last_user_question_text = messages[-1].content
response_message = self.stream_agent.invoke({"content": last_user_question_text}).return_values["output"]
response_message = ChatMessage(
role="assistant",
content=(
response_message
)
)
return ChatCompletionResponse(
choices=[ChatChoice(message=response_message)]
)
"""
Helper for constructing a ChatCompletionChunk instance for wrapping streaming agent output
"""
def _create_chat_completion_chunk(self, content) -> ChatCompletionChunk:
return ChatCompletionChunk(
choices=[ChatChunkChoice(
delta=ChatChoiceDelta(
role="assistant",
content=content
)
)]
)
"""
Function to support stream prediction
"""
def predict_stream(
self, context, messages: List[ChatMessage], params: ChatParams
) -> Generator[ChatCompletionChunk, None, None]:
last_user_question_text = messages[-1].content
for chunk in self.stream_agent.stream({"content": last_user_question_text}):
yield self._create_chat_completion_chunk(chunk)
We can test our newly wrapped class by instantiating it and doing inference with it.
# Instantiating our MLFlow Agent
chatmodel_agent = MyAgent()
# Using our defined predict_stream function
msg_input_example = [ChatMessage(role="user", content="What is Databricks?")]
response = chatmodel_agent.predict_stream(context=None, messages=msg_input_example, params=None)
for msg in response:
print(msg)
The streaming response should come in the format of ChatCompletionChunk, which is the supported OpenAI format for streaming chunks.
Step 3. Log and Register the Custom ChatModel
For better reproducibility and model governance, we can now log and register our model. In order to do that, it is recommended to move all of our agent’s code into a single notebook or python script that will be used for deployment. Basically all the code in Step 2 would be moved in a separate script. Then, the following command would be ran on a driver notebook.
You can see the example in this Git Repos.
Then, we can start logging our model to MLFlow Experiments with mlflow.pyfunc.log_model and by pointing to the path where our agent notebook is located, here custom_agent_model. We also add the dependencies needed here, such as forcing the httpx package to the 0.27.2 version to fix some library dependencies issues known with langchain.
import os
import mlflow
with mlflow.start_run():
logged_agent_info=mlflow.pyfunc.log_model(
python_model=os.path.join(os.getcwd(), "custom_agent_model"),
artifact_path="agent",
extra_pip_requirements=["ydata-profiling==4.12.2",
"langchain-community==0.3.0",
"langchain==0.3.0",
"databricks-agents==0.15.0",
"mlflow>=2.20.0",
"httpx==0.27.2",
"protobuf==4.22"]
)
Then registering the model in Unity Catalog:
# Set Model Registry to Unity Catalog
mlflow.set_registry_uri("databricks-uc")
uc_model_name = f"my_catalog.my_schema.chatmodel_agent"
# Register to UC
uc_registered_model_info = mlflow.register_model(model_uri=logged_agent_info.model_uri, name=uc_model_name)
Step 4. Deploy the Model to Mosaic Agent Framework
Finally, we can deploy our model to Mosaic Agent Framework to deploy our Chat UI, feedback and monitoring capabilities!
from databricks import agents
# Deploy to enable the Review APP and create an API endpoint
# Note: scaling down to zero will provide unexpected behavior for the chat app. Set it to false for a prod-ready application.
deployment_info = agents.deploy(uc_model_name, model_version=1, scale_to_zero=True) # model_version=uc_registered_model_info.version
instructions_to_reviewer = f"""## Instructions for Testing the Assistant
Your inputs are invaluable for the development team. By providing detailed feedback and corrections, you help us fix issues and improve the overall quality of the application. We rely on your expertise to identify any gaps or areas needing enhancement."""
# Add the user-facing instructions to the Review App
agents.set_review_instructions(uc_model_name, instructions_to_reviewer)
After some deployment time for the endpoint to be available, we can now access and enjoy our agent on the UI !
Hope that was helpful, the git repository containing all the code will be available soon. Stay tuned!