@dping28 Sure. I made a very simple workflow below just for the sake of testing:
Below is the code for the pipeline in Open WebUI. You just need to add your own credentials in the âPlaceYourOwnHereâ. Hope it works for you.
â
from typing import List, Union, Generator, Iterator, Callable, Any
from schemas import OpenAIChatMessage
import subprocess
import requests
import json
from urllib.parse import urlparse, urljoin
from pydantic import BaseModel, Field
import asyncio
import os
import uuid
class N8NAPIClient:
def init(self, N8N_API_BASE_URL: str, N8N_API_KEY: str) â None:
if not N8N_API_BASE_URL.startswith((âhttp://â, âhttps://â)):
N8N_API_BASE_URL = âhttps://â + N8N_API_BASE_URL
self.n8n_base_url = N8N_API_BASE_URL
self.n8n_api_key = N8N_API_KEY
def get_workflow(self, workflow_id: str) -> dict[str, Any]:
return self.GET(path=f'/api/v1/workflows/{workflow_id}')
def get_execution(self, workflow_id: str, execution_id: str):
return self.GET(path=f'/api/v1/executions/{execution_id}')
def trigger_webhook(self, webhook_id: str, input: str):
return self.POST(path=f'/webhook-test/{webhook_id}', data=input)
def chat(self, webhook: str, session_id: str, input: str):
return self.POST(path=f'/webhook/{webhook}/chat', data=json.dumps({
"action": "sendMessage",
"sessionId": session_id,
"chatInput": input,
}))
def POST(self, path: str, data: str) -> None:
headers = {
'content-type': 'application/json',
"x-n8n-api-key": self.n8n_api_key,
"cookie": f'n8n-auth={self.n8n_api_key}'
}
url = f'{self.n8n_base_url}{"//" if not path.startswith("/") else ""}{path}'
# Validate URL
parsed_url = urlparse(url)
if not all([parsed_url.scheme, parsed_url.netloc]):
raise ValueError(f"Invalid URL: {url}. Make sure it includes a scheme (http:// or https://) and a domain.")
resp = requests.post(url, headers=headers, data=data)
resp.raise_for_status()
return resp.json()
def GET(self, path: str) -> None:
headers = {
"x-n8n-api-key": self.n8n_api_key,
"cookie": f'n8n-auth={self.n8n_api_key}'
}
url = f'{self.n8n_base_url}{"//" if not path.startswith("/") else ""}{path}'
# Validate URL
parsed_url = urlparse(url)
if not all([parsed_url.scheme, parsed_url.netloc]):
raise ValueError(f"Invalid URL: {url}. Make sure it includes a scheme (http:// or https://) and a domain.")
resp = requests.get(url, headers=headers)
resp.raise_for_status()
return resp.json()
class Pipeline:
class Values(BaseModel):
N8N_API_BASE_URL: str = ââ
N8N_API_KEY: str = ââ
N8N_WORKFLOW_ID: str = ââ
def __init__(self):
self.name = "N8N sample pipeline"
self.values = self.Values(
**{
'N8N_API_BASE_URL': os.getenv("N8N_API_BASE_URL", "PlaceYourOwnHere"),
'N8N_API_KEY': os.getenv("N8N_API_KEY", "PlaceYourOwnHere"),
'N8N_WORKFLOW_ID': os.getenv("N8N_WORKFLOW_ID", "PlaceYourOwnHere"),
}
)
self.n8n = N8NAPIClient(N8N_API_BASE_URL=self.values.N8N_API_BASE_URL,
N8N_API_KEY=self.values.N8N_API_KEY)
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{self.name}")
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{self.name}")
pass
def pipe(
self, user_message: str, model_id: str, messages: List[dict], body: dict
) -> Union[str, Generator, Iterator]:
# better to get the workflow id from a parameter here but just doing a quick and dirty approach here
#session_id = 'a191a4f1-4802-4dcf-8ded-07f3a82711f7'
session_id = str(uuid.uuid4()) # this should probably be unique per chat session to keep state, here we generate a new session id each time we call the chat function
response = self.n8n.chat(webhook='PlaceYourOwnHere', session_id=session_id, input=user_message)
return response['output']