from openai import OpenAI
client = OpenAI(base_url="http://127.0.0.1:18181/v1", api_key="nexa")
completion = client.chat.completions.create(
model="NexaAI/Qwen3-VL-4B-Instruct-GGUF",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Give me a meal plan for today."}
],
temperature=0.7,
)
print(completion.choices[0].message.content)
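Since Qwen3-VL is a vision-language model, the same OpenAI-compatible endpoint can also take images. A minimal sketch, assuming the Nexa server accepts the standard image_url content parts (the file name is a placeholder):
import base64
from openai import OpenAI

client = OpenAI(base_url="http://127.0.0.1:18181/v1", api_key="nexa")
# Encode a local image as a data URL (placeholder file name)
with open("meal.jpg", "rb") as f:
    b64 = base64.b64encode(f.read()).decode()
completion = client.chat.completions.create(
    model="NexaAI/Qwen3-VL-4B-Instruct-GGUF",
    messages=[{
        "role": "user",
        "content": [
            {"type": "text", "text": "What ingredients do you see in this photo?"},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
        ],
    }],
)
print(completion.choices[0].message.content)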
Streaming Response
from openai import OpenAI
client = OpenAI(base_url="http://127.0.0.1:18181/v1", api_key="nexa")
response = client.chat.completions.create(
model="NexaAI/Qwen3-VL-4B-Instruct-GGUF",
messages=[
{"role": "system", "content": "You are helpful assistant."},
{"role": "user", "content": "Give me a meal plan for today."},
],
temperature=0.7,
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="")
UI with Chainlit
import chainlit as cl
from openai import AsyncOpenAI
# Configure the async OpenAI client
client = AsyncOpenAI(api_key="nexa", base_url="http://127.0.0.1:18181/v1")
settings = {
"model": "NexaAI/Qwen3-VL-4B-Instruct-GGUF",
"temperature": 0.7,
"max_tokens": 500,
"top_p": 1,
"frequency_penalty": 0,
"presence_penalty": 0
}
@cl.on_chat_start
def start_chat():
# Initialize message history
cl.user_session.set("message_history", [{"role": "system", "content": "You are a helpful chatbot."}])
@cl.on_message
async def main(message: cl.Message):
# Retrieve the message history from the session
message_history = cl.user_session.get("message_history")
message_history.append({"role": "user", "content": message.content})
# Create an initial empty message to send back to the user
msg = cl.Message(content="")
await msg.send()
# Use streaming to handle partial responses
stream = await client.chat.completions.create(messages=message_history, stream=True, **settings)
async for part in stream:
if token := part.choices[0].delta.content or "":
await msg.stream_token(token)
# Append the assistant's last response to the history
message_history.append({"role": "assistant", "content": msg.content})
cl.user_session.set("message_history", message_history)
# Update the message after streaming completion
await msg.update()
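To try it, save the script as app.py and launch it with Chainlit's CLI (the -w flag reloads on file changes):
chainlit run app.py -w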
RAG (Retrieval Augmented Generation)
import chainlit as cl
from openai import AsyncOpenAI
import chromadb
from chromadb.config import Settings
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
client = AsyncOpenAI(api_key="nexa", base_url="http://127.0.0.1:18181/v1")
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
settings = {
"model": "NexaAI/Qwen3-VL-4B-Instruct-GGUF",
"temperature": 0.7,
"max_tokens": 500,
"top_p": 1,
"frequency_penalty": 0,
"presence_penalty": 0
}
chroma_client = chromadb.Client(Settings(anonymized_telemetry=False))
def get_embedding(text):
return embedding_model.encode(text).tolist()
def chunk_text(text, chunk_size=500):
words = text.split()
chunks = []
for i in range(0, len(words), chunk_size):
chunk = " ".join(words[i:i + chunk_size])
chunks.append(chunk)
return chunks
@cl.on_chat_start
async def start_chat():
cl.user_session.set("message_history", [{"role": "system", "content": "You are a helpful assistant. Answer questions based on the provided context."}])
files = None
while files is None:
files = await cl.AskFileMessage(
content="Please upload a text file to begin!",
accept=["text/plain", "application/pdf", "text/markdown"],
max_size_mb=20,
timeout=180,
).send()
file = files[0]
msg = cl.Message(content=f"Processing `{file.name}`...")
await msg.send()
if file.name.endswith('.pdf'):
reader = PdfReader(file.path)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
else:
with open(file.path, "r", encoding="utf-8") as f:
text = f.read()
chunks = chunk_text(text)
try:
collection = chroma_client.get_collection(name="documents")
chroma_client.delete_collection(name="documents")
except Exception:
pass
collection = chroma_client.create_collection(name="documents")
for i, chunk in enumerate(chunks):
embedding = get_embedding(chunk)
collection.add(
embeddings=[embedding],
documents=[chunk],
ids=[f"chunk_{i}"]
)
cl.user_session.set("collection", collection)
msg.content = f"Processing `{file.name}` done. Indexed {len(chunks)} chunks. You can now ask questions!"
await msg.update()
@cl.on_message
async def main(message: cl.Message):
collection = cl.user_session.get("collection")
message_history = cl.user_session.get("message_history")
query_embedding = get_embedding(message.content)
results = collection.query(
query_embeddings=[query_embedding],
n_results=3
)
context = "\n\n".join(results["documents"][0])
temp_history = message_history.copy()
enhanced_message = f"Context:\n{context}\n\nQuestion: {message.content}"
temp_history.append({"role": "user", "content": enhanced_message})
msg = cl.Message(content="")
await msg.send()
stream = await client.chat.completions.create(messages=temp_history, stream=True, **settings)
async for part in stream:
if token := part.choices[0].delta.content or "":
await msg.stream_token(token)
message_history.append({"role": "user", "content": message.content})
message_history.append({"role": "assistant", "content": msg.content})
cl.user_session.set("message_history", message_history)
await msg.update()
<?php
/**
* Plugin Name: Your Plugin Name
* Plugin URI: https://wordpress.org/plugins/your-plugin/
* Description: A clear, concise description of what your plugin does.
* Version: 1.0.0
* Author: Your Name or Company
* Author URI: https://your-website.com/
* License: GPL-2.0-or-later
* License URI: https://www.gnu.org/licenses/gpl-2.0.html
* Text Domain: your-plugin
* Domain Path: /languages
*/

// Prevent direct access
if (!defined('ABSPATH')) {
exit;
}
readme.txt Structure
The readme.txt file determines how your plugin appears on WordPress.org:
=== Plugin Name ===
Contributors: yourusername
Donate link: https://your-website.com/
Tags: tag1, tag2, tag3, tag4, tag5
Requires at least: 5.0
Tested up to: 6.8
Stable tag: 1.0.0
License: GPLv2 or later
License URI: https://www.gnu.org/licenses/gpl-2.0.html
Short description (max 150 characters)
== Description ==
Detailed description of your plugin...
== Installation ==
1. Upload the plugin files...
2. Activate the plugin...
3. Configure settings...
== Frequently Asked Questions ==
= Question 1 =
Answer 1
== Screenshots ==
1. Screenshot description
2. Another screenshot description
== Changelog ==
= 1.0.0 =
* Initial release
Important:
Maximum 5 tags
Keep “Tested up to” current (within 3 major versions)
4. Permission Checks
// Check user permissions
if (!current_user_can('manage_options')) {
wp_die('Unauthorized access');
}
5. SQL Queries
global $wpdb;
// ❌ NEVER do this (SQL injection risk)
$results = $wpdb->get_results("SELECT * FROM table WHERE id = {$_GET['id']}");
// ✅ ALWAYS use prepared statements
$results = $wpdb->get_results($wpdb->prepare(
"SELECT * FROM {$wpdb->prefix}table WHERE id = %d",
absint($_GET['id'])
));
WordPress.org Submission Requirements
1. External Services Documentation
If your plugin connects to external APIs, you MUST document it in readme.txt:
== External Services ==
This plugin connects to the [Service Name] API to provide [functionality].
**Service Used:** Service Name (https://api.example.com/)
**Purpose:** The plugin sends [what data] to [service] to [why].
**Data Sent:** When a user [action], the following data is transmitted:
- User's [data type 1]
- [data type 2]
**When Data is Sent:** Data is sent to [service] only when:
- A user actively [action 1]
- [condition 2]
**Privacy & Terms:**
- Privacy Policy: https://example.com/privacy
- Terms of Use: https://example.com/terms
- API Data Usage: https://example.com/api-usage
This is critical! Failure to document external services will result in rejection.
2. Ownership Verification
If your plugin name or author URI references a domain that doesn’t match your WordPress.org email, you must:
Use an email from that domain, OR
Add a public declaration on the website
Example footer text:
"[Plugin Name] is owned and operated by [Your Name]"
Add this to the footer or a dedicated page on both domains mentioned in your plugin.
3. No Inline Scripts or Styles
All JavaScript and CSS must be in separate files and properly enqueued. No exceptions.
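For example, instead of echoing a <script> tag, register the files on the proper hook. A minimal sketch; the handles and file paths are placeholders for your plugin's own assets:
add_action('wp_enqueue_scripts', function () {
    // Handles and paths below are placeholders
    wp_enqueue_style('your-plugin-style', plugins_url('css/style.css', __FILE__), [], '1.0.0');
    wp_enqueue_script('your-plugin-script', plugins_url('js/script.js', __FILE__), [], '1.0.0', true);
});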
SVN Setup
Create an SVN password (different from your login password)
Checkout Repository
svn co https://plugins.svn.wordpress.org/your-plugin/ your-plugin-svn
Publishing Your Plugin
# 1. Copy files to trunk
cp -r /path/to/your-plugin/* your-plugin-svn/trunk/
# 2. Add files to SVN
cd your-plugin-svn
svn add trunk/* --force
# 3. Commit to trunk
svn ci -m "Initial release v1.0.0" --username yourusername
# 4. Create a tag
svn cp trunk tags/1.0.0
# 5. Commit the tag
svn ci -m "Tagging version 1.0.0" --username yourusername
Updating Your Plugin
# 1. Update trunk
cp -r /path/to/updated-files/* your-plugin-svn/trunk/
# 2. Commit changes
svn ci -m "Update to version 1.0.1" --username yourusername
# 3. Create new tag
svn cp trunk tags/1.0.1
# 4. Commit the tag
svn ci -m "Tagging version 1.0.1" --username yourusername
Adding Assets
# Add banner and icon to assets folder
cp banner-1544x500.png your-plugin-svn/assets/
cp icon-256x256.png your-plugin-svn/assets/
cd your-plugin-svn/assets
svn add *.png
svn ci -m "Add plugin assets" --username yourusername
Creating a WordPress plugin that gets approved requires:
Clean, secure code following WordPress standards
Proper documentation of all features and external services
Thorough testing before submission
Quick response to review feedback
Ongoing maintenance after approval
The review process exists to protect WordPress users and maintain quality standards. By following these guidelines, you’ll save time and increase your chances of quick approval.
Remember: The WordPress.org Plugin Directory serves millions of users. Your plugin represents not just your work, but the quality of the entire ecosystem.
Good luck with your plugin development! 🚀
Quick Reference Commands
SVN Commands
# Checkout
svn co https://plugins.svn.wordpress.org/your-plugin/
# Add files
svn add file.php
# Commit
svn ci -m "Commit message" --username yourusername
# Create tag
svn cp trunk tags/1.0.0
# Update
svn up
# Status
svn status
# Diff
svn diff
Fixing Windsurf Terminal Hangs on macOS
If your CLI commands in Windsurf stop executing or hang on macOS, the issue is usually caused by zsh conflicts inside Cascade. The quick fix is to make Windsurf use bash instead of zsh for its internal terminal.
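One way to do this, assuming Windsurf honours the VS Code-style terminal settings it inherits, is to make bash the default terminal profile in settings.json:
{
  "terminal.integrated.defaultProfile.osx": "bash"
}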
MCP (Model Context Protocol) is a way to give AI assistants access to your application’s data and functionality. Think of it as creating “tools” that an AI can use to help users.
Simple Analogy:
Your app is like a toolbox 🧰
MCP tools are like individual tools (hammer, screwdriver, etc.)
The AI is like a smart assistant that knows which tool to use
🎯 What We’ll Build
A simple “Hello World” MCP server that:
Has a basic API endpoint
Provides data to an AI assistant
Lets the AI answer questions about your app
Time to complete: 15 minutes ⏱️
📋 Prerequisites
You need:
✅ Node.js 18+ installed
✅ Basic knowledge of JavaScript/TypeScript
✅ A text editor (VS Code recommended)
✅ Terminal/Command line access
🏗️ Step-by-Step Tutorial
Step 1: Create a New Next.js Project
# Create a new Next.js app
npx create-next-app@latest my-mcp-app
# When prompted, choose:
# ✅ TypeScript: Yes
# ✅ ESLint: Yes
# ✅ Tailwind CSS: Yes
# ✅ src/ directory: No
# ✅ App Router: Yes (IMPORTANT!)
# ✅ Turbopack: No
# ✅ Import alias: Yes (@/*)

# Navigate to the project
cd my-mcp-app
What just happened?
Created a new Next.js project with App Router
App Router is needed for MCP (uses /app directory)
Step 2: Create the API Directory
# Create the API directory structure
mkdir -p app/api/hello
Step 3: Create the Endpoint
Create app/api/hello/route.ts:
// app/api/hello/route.ts
import { NextResponse } from 'next/server';
export async function GET() {
// This is your data that the AI can access
const data = {
message: "Hello from MCP!",
timestamp: new Date().toISOString(),
tips: [
"MCP lets AI access your app data",
"You can create multiple endpoints",
"AI can call these endpoints automatically"
]
};
return NextResponse.json(data);
}
What does this do?
Creates an API endpoint at /api/hello
Returns JSON data that includes a message and tips
This data will be available to the AI
Step 4: Test Your Endpoint
Start the development server:
npm run dev
Open your browser and visit:
http://localhost:3000/api/hello
You should see:
{
"message": "Hello from MCP!",
"timestamp": "2025-10-25T00:00:00.000Z",
"tips": [
"MCP lets AI access your app data",
"You can create multiple endpoints",
"AI can call these endpoints automatically"
]
}
✅ Success! Your first MCP endpoint is working!
Step 5: Install AI SDK
Now let’s add AI capabilities:
# Install Vercel AI SDK and OpenAI
npm install ai @ai-sdk/openai zod
What are these packages?
ai – Vercel AI SDK for building AI apps
@ai-sdk/openai – OpenAI integration
zod – Schema validation (for tool parameters)
Step 6: Create MCP Tools
Create a file to define your MCP tools:
# Create lib directory
mkdir -p lib
Create lib/mcp-tools.ts:
// lib/mcp-tools.ts
import { tool } from 'ai';
import { z } from 'zod';
// Define your MCP tools
export const mcpTools = {
// Tool 1: Get hello message
get_hello_message: tool({
description: 'Get a hello message from the server',
parameters: z.object({}), // No parameters needed
execute: async () => {
// Fetch data from your API
const response = await fetch('http://localhost:3000/api/hello');
const data = await response.json();
return data;
},
}),
// Tool 2: Get current time
get_current_time: tool({
description: 'Get the current server time',
parameters: z.object({}),
execute: async () => {
return {
time: new Date().toLocaleTimeString(),
date: new Date().toLocaleDateString(),
timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
};
},
}),
};
What does this do?
Defines 2 tools that the AI can use
get_hello_message – Fetches data from your API
get_current_time – Returns current time
Each tool has a description (tells AI what it does)
Each tool has parameters (inputs the AI can provide)
Each tool has an execute function (what it actually does)
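For comparison, a hypothetical tool that does take parameters might look like this; get_greeting and city are illustrative names, not part of the tutorial's code:
// Hypothetical tool with a parameter the AI fills in
get_greeting: tool({
  description: 'Greet a user living in a given city',
  parameters: z.object({
    city: z.string().describe('The city the user lives in'),
  }),
  execute: async ({ city }) => {
    return { message: `Hello from ${city}!` };
  },
}),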
Step 7: Create AI Chat Endpoint
Create app/api/chat/route.ts:
// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';
import { mcpTools } from '@/lib/mcp-tools';
export async function POST(request: Request) {
// Get the user's message
const { messages } = await request.json();
// Call OpenAI with MCP tools
const result = await streamText({
model: openai('gpt-4o-mini'),
messages,
tools: mcpTools, // Give AI access to your tools
maxSteps: 5, // Allow AI to use multiple tools
});
// Stream the response back to the user
return result.toDataStreamResponse();
}
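The chat UI itself isn't shown here; a minimal page using the AI SDK's useChat hook (which POSTs to /api/chat by default) might look like this sketch:
// app/page.tsx (a minimal sketch)
'use client';
import { useChat } from 'ai/react';

export default function Home() {
  const { messages, input, handleInputChange, handleSubmit } = useChat();
  return (
    <main>
      {messages.map((m) => (
        <div key={m.id}><b>{m.role}:</b> {m.content}</div>
      ))}
      <form onSubmit={handleSubmit}>
        <input value={input} onChange={handleInputChange} placeholder="Ask something..." />
      </form>
    </main>
  );
}
Run npm run dev again, open http://localhost:3000, and try asking: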
Question 1:
What's the hello message?
AI Response:
The hello message is "Hello from MCP!"
Here are some tips:
- MCP lets AI access your app data
- You can create multiple endpoints
- AI can call these endpoints automatically
🔧 Used tools: get_hello_message
Question 2:
What time is it?
AI Response:
The current time is 12:34:56 PM
Date: October 25, 2025
Timezone: America/New_York
🔧 Used tools: get_current_time
🎉 Congratulations! Your MCP server is working!
🎨 Visual Flow
(Sequence diagram) User asks "What's the hello message?" in the Chat UI → POST /api/chat → the chat endpoint sends the message plus tool definitions to OpenAI → OpenAI decides to use get_hello_message → the tool calls GET /api/hello → {message, tips} comes back → OpenAI generates a response → it streams back as Server-Sent Events → the Chat UI shows the formatted answer.
User: "Who are the users and how old are they?"
AI: (uses get_users tool automatically)
→ Calls /api/users
→ Gets data
→ Processes it
→ Responds naturally
AI: "There are 2 users:
- Alice is 30 years old
- Bob is 25 years old"
Key Differences Table
Aspect       | API Endpoint   | MCP Tool
What it is   | HTTP route     | Function wrapper
Who calls it | Anyone (HTTP)  | Only AI
How to call  | fetch(), curl  | AI decides automatically
Response     | Raw JSON       | Processed by AI
Purpose      | Serve data     | Give AI capabilities
Reusable     | Yes, by anyone | Only by AI
Why Do You Need BOTH?
API Endpoint:
✅ The actual data source
✅ Can be used by other parts of your app
✅ Can be tested independently
✅ Follows REST conventions
✅ Can be cached and secured
MCP Tool:
✅ Tells AI what the endpoint does
✅ Provides context (description)
✅ Defines parameters AI can use
✅ Allows AI to use it intelligently
✅ Makes your app AI-powered
Restaurant Analogy
API Endpoint = Kitchen
Has the actual food (data)
Anyone can order from it
Returns raw dishes
MCP Tool = Waiter
Knows what’s available (description)
Takes your order (parameters)
Gets food from kitchen (calls API)
Serves it nicely (formats response)
AI = Smart Waiter
Understands what you want
Knows which dishes to recommend
Can combine multiple dishes
Explains the menu in your language
The Complete Flow
(Sequence diagram) User asks "What's the hello message?" → OpenAI reads the tool descriptions and decides to use get_hello_message → the MCP tool runs execute() → it sends GET /api/hello to the API endpoint → the endpoint fetches your data and returns it as JSON → the structured data goes back to OpenAI → the AI processes and formats it → the user reads "The hello message is...".
Can You Skip the API Endpoint?
Short answer: Yes, but not recommended!
// ❌ MCP Tool without API (works but not ideal)
get_data: tool({
execute: async () => {
// Directly query database
const data = await db.query('SELECT * FROM users');
return data;
},
})
// ✅ Better: MCP Tool + API (recommended)
get_data: tool({
execute: async () => {
// Call your API
const res = await fetch('http://localhost:3000/api/users');
return await res.json();
},
})
Why separate API is better:
✅ Reusable by other parts of your app
✅ Can be tested independently
✅ Can be called by non-AI clients
✅ Easier to maintain and debug
✅ Can add authentication/caching
When to Use What
Use Just API Endpoint When:
Building a regular web app
Need data for your frontend
Other services need to call it
No AI involved
Use API + MCP Tool When:
Want AI to access the data
Building AI chat features
Need intelligent data retrieval
Want natural language interface
Summary: The Relationship
API Endpoint (data source)
↓
MCP Tool (AI access layer)
↓
OpenAI (intelligence)
↓
User (natural language)
Think of it this way:
API Endpoint = What you have (data)
MCP Tool = How AI accesses it (wrapper)
Together = AI-powered application 🚀
📚 Key Takeaways
MCP = Tools for AI
You create tools (functions)
AI decides when to use them
AI can use multiple tools to answer questions
Three Main Parts:
API endpoints (your data)
MCP tools (how AI accesses data)
Chat endpoint (connects AI to tools)
Simple Pattern:
User asks question → AI reads question → AI calls your MCP tools → Tools fetch data → AI generates answer → User sees response
You’ve built your first MCP server! You now understand:
✅ What MCP is and why it's useful
✅ How to create API endpoints
✅ How to define MCP tools
✅ How to connect AI to your tools
✅ How to build a chat interface
Next Challenge: Try building a tool that:
Reads data from a database
Calls an external API
Performs calculations
Analyzes files
💡 Pro Tips
Start Simple
Begin with one tool
Add more as you learn
Test each tool individually
Good Tool Descriptions
// ❌ Bad
description: 'Gets data'
// ✅ Good
description: 'Get user profile information including name, email, and registration date'
Use Parameters
// Let AI provide inputs
parameters: z.object({
  userId: z.number().describe('The ID of the user to fetch'),
  includeOrders: z.boolean().describe('Whether to include order history'),
})
Fixing PHP-FPM Pod Crashes on Kubernetes
NAME CPU(cores) MEMORY(bytes)
app-xxxxxxxxxx-xxxxx 1500m 587Mi ← CPU at limit!
app-xxxxxxxxxx-xxxxx 1m 524Mi ← Crashed pod
app-xxxxxxxxxx-xxxxx 1m 526Mi ← Crashed pod
Problem: PHP pods crashing due to worker exhaustion and short health check timeouts.
Solution: Increased PHP-FPM workers from 5 to 20 and health check timeout from 10s to 30s.
Result: All 22 pods stable, no crashes, 4x capacity increase.
Key Learning: Always calculate worker count based on available memory, not arbitrary numbers. The formula (RAM - Base) / 40MB ensures you stay within limits while maximizing capacity.
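As a quick sanity check of that formula (the 1Gi limit comes from this setup; the base figure is an assumption for illustration):
# Sizing sanity check for pm.max_children
limit_mb = 1024        # container memory limit (1Gi)
base_mb = 220          # non-worker overhead per pod (assumed for illustration)
per_worker_mb = 40     # upper-bound memory per PHP-FPM worker
print((limit_mb - base_mb) // per_worker_mb)   # -> 20 workers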
Author’s Note: This solution was implemented on a production Kubernetes cluster running a PHP application. The fix eliminated all CrashLoopBackOff issues and improved stability from 45% to 100% pod availability.
📊 Current Production Metrics Analysis
Actual Memory Usage (Right Now):
Average Memory per Pod: 665Mi
Memory Range: 544Mi - 616Mi
Memory Limit: 1024Mi (1Gi)
Utilization: 65% average (53-60% range)
Current Configuration:
Pods: 22 total
Workers per pod: 20
CPU Limit: 1500m
Average CPU Usage: 935m (62% utilization)
Restarts: 0-4 (minimal, stable)
🔍 Is 20 Workers Optimal? Let’s Calculate:
Method 1: Reverse Engineering from Memory Usage
Current Memory Usage: 665Mi average
Memory Limit: 1024Mi
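Working backwards from those numbers (the ~300Mi non-worker base is an assumed split, for illustration only):
avg_pod_mb = 665      # average pod memory from the metrics above
base_mb = 300         # assumed non-worker base (FPM master, opcache, overhead)
workers = 20
print((avg_pod_mb - base_mb) / workers)   # ~18MB per worker, matching the observed 18-20MB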
⚠️ 30 Workers (Too Aggressive):
- Memory: 83% (risky, little headroom)
- CPU: Would need more (1500m not enough)
- Risk: Traffic spikes could OOMKill
- Not recommended
❌ 15 Workers (Too Conservative):
- Memory: 54% (underutilized)
- Would crash under current 263+ concurrent requests
- Wasted resources
🎯 Final Recommendation: KEEP 20 WORKERS
Reasons:
Memory Utilization: 65% – Perfect balance (not too high, not too low)
CPU Utilization: 62% – Efficient, room to grow
35% Headroom – Enough buffer for traffic spikes
Stable: 0-4 restarts – Proves it’s working well
Actual per-worker memory: 18-20MB – More efficient than estimated
Could We Optimize Further?
Option A: Increase to 25 workers (Moderate)
pm.max_children = 25
Memory would be: ~750Mi (73%)
Headroom: ~274Mi (27%)
Risk: Medium (less buffer for spikes)
Benefit: 25% more capacity
Verdict: Only if you see consistent high load
Option B: Keep 20 workers (Recommended)
pm.max_children = 20 # Current
Memory: 665Mi (65%) ✅
Headroom: 358Mi (35%) ✅
Risk: Low ✅
Stability: Proven ✅
Verdict: OPTIMAL – Don’t change!
📈 When to Reconsider:
Monitor these metrics and adjust if:
# Check if workers are maxed out
kubectl exec <pod> -c php-php -- curl -s http://localhost:9000/fpm-status
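The fields worth watching in that output look roughly like this (illustrative numbers; the field names are standard PHP-FPM status fields):
active processes:     12
total processes:      20
max children reached: 0 ← if this counter climbs, workers are maxed out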
Gemini 2.5 Computer Use: Browser Agent
from google import genai
from google.genai import types
from google.genai.types import Content, Part
from playwright.sync_api import sync_playwright
import time
# Initialize the Gemini client
client = genai.Client()
# Screen dimensions
SCREEN_WIDTH = 1440
SCREEN_HEIGHT = 900
def denormalize_x(x: int, screen_width: int) -> int:
"""Convert normalized x coordinate (0-1000) to actual pixel coordinate."""
return int(x / 1000 * screen_width)
def denormalize_y(y: int, screen_height: int) -> int:
"""Convert normalized y coordinate (0-1000) to actual pixel coordinate."""
return int(y / 1000 * screen_height)
def execute_function_calls(candidate, page, screen_width, screen_height):
"""Execute the actions suggested by the model."""
results = []
function_calls = []
for part in candidate.content.parts:
if part.function_call:
function_calls.append(part.function_call)
for function_call in function_calls:
action_result = {}
fname = function_call.name
args = function_call.args
print(f" -> Executing: {fname}")
try:
if fname == "open_web_browser":
pass # Already open
elif fname == "click_at":
actual_x = denormalize_x(args["x"], screen_width)
actual_y = denormalize_y(args["y"], screen_height)
page.mouse.click(actual_x, actual_y)
elif fname == "type_text_at":
actual_x = denormalize_x(args["x"], screen_width)
actual_y = denormalize_y(args["y"], screen_height)
text = args["text"]
press_enter = args.get("press_enter", False)
page.mouse.click(actual_x, actual_y)
page.keyboard.press("Meta+A")
page.keyboard.press("Backspace")
page.keyboard.type(text)
if press_enter:
page.keyboard.press("Enter")
page.wait_for_load_state(timeout=5000)
time.sleep(1)
except Exception as e:
print(f"Error executing {fname}: {e}")
action_result = {"error": str(e)}
results.append((fname, action_result))
return results
def get_function_responses(page, results):
"""Capture screenshot and URL after actions."""
screenshot_bytes = page.screenshot(type="png")
current_url = page.url
function_responses = []
for name, result in results:
response_data = {"url": current_url}
response_data.update(result)
function_responses.append(
types.FunctionResponse(
name=name,
response=response_data,
parts=[types.FunctionResponsePart(
inline_data=types.FunctionResponseBlob(
mime_type="image/png",
data=screenshot_bytes))
]
)
)
return function_responses
# Main program
print("Initialising browser...")
playwright = sync_playwright().start()
browser = playwright.chromium.launch(headless=False)
context = browser.new_context(viewport={"width": SCREEN_WIDTH, "height": SCREEN_HEIGHT})
page = context.new_page()
try:
# Go to initial page
page.goto("https://tinyurl.com/pet-care-signup")
# Configure the model with Computer Use tool
config = types.GenerateContentConfig(
tools=[types.Tool(computer_use=types.ComputerUse(
environment=types.Environment.ENVIRONMENT_BROWSER
))],
)
# Take initial screenshot
initial_screenshot = page.screenshot(type="png")
USER_PROMPT = """
From https://tinyurl.com/pet-care-signup,
get all details for any pet with a California residency.
Output all the information you find in a clear, readable format.
"""
print(f"Goal: {USER_PROMPT}")
contents = [
Content(role="user", parts=[
Part(text=USER_PROMPT),
Part.from_bytes(data=initial_screenshot, mime_type='image/png')
])
]
# Agent loop - maximum 5 turns
for i in range(5):
print(f"\n--- Turn {i+1} ---")
print("Thinking...")
response = client.models.generate_content(
model='gemini-2.5-computer-use-preview-10-2025',
contents=contents,
config=config,
)
candidate = response.candidates[0]
contents.append(candidate.content)
# Check if there are function calls to execute
has_function_calls = any(part.function_call for part in candidate.content.parts)
if not has_function_calls:
text_response = " ".join([part.text for part in candidate.content.parts if part.text])
print("Agent finished:", text_response)
break
print("Executing actions...")
results = execute_function_calls(candidate, page, SCREEN_WIDTH, SCREEN_HEIGHT)
print("Capturing state...")
function_responses = get_function_responses(page, results)
contents.append(
Content(role="user", parts=[Part(function_response=fr) for fr in function_responses])
)
finally:
print("\nClosing browser...")
browser.close()
playwright.stop()
print("Done!")
Gemini 2.5 Computer Use: Drag and Drop
from google import genai
from google.genai import types
from google.genai.types import Content, Part
from playwright.sync_api import sync_playwright
import time
# Initialize the Gemini client
client = genai.Client()
# Screen dimensions
SCREEN_WIDTH = 1440
SCREEN_HEIGHT = 900
def denormalize_x(x: int, screen_width: int) -> int:
"""Convert normalized x coordinate (0-1000) to actual pixel coordinate."""
return int(x / 1000 * screen_width)
def denormalize_y(y: int, screen_height: int) -> int:
"""Convert normalized y coordinate (0-1000) to actual pixel coordinate."""
return int(y / 1000 * screen_height)
def execute_function_calls(candidate, page, screen_width, screen_height):
"""Execute the actions suggested by the model."""
results = []
function_calls = []
for part in candidate.content.parts:
if part.function_call:
function_calls.append(part.function_call)
for function_call in function_calls:
action_result = {}
fname = function_call.name
args = function_call.args
print(f" -> Executing: {fname}")
try:
if fname == "open_web_browser":
pass # Already open
elif fname == "click_at":
actual_x = denormalize_x(args["x"], screen_width)
actual_y = denormalize_y(args["y"], screen_height)
page.mouse.click(actual_x, actual_y)
elif fname == "type_text_at":
actual_x = denormalize_x(args["x"], screen_width)
actual_y = denormalize_y(args["y"], screen_height)
text = args["text"]
press_enter = args.get("press_enter", False)
page.mouse.click(actual_x, actual_y)
page.keyboard.press("Meta+A")
page.keyboard.press("Backspace")
page.keyboard.type(text)
if press_enter:
page.keyboard.press("Enter")
elif fname == "drag_and_drop":
start_x = denormalize_x(args["x"], screen_width)
start_y = denormalize_y(args["y"], screen_height)
dest_x = denormalize_x(args["destination_x"], screen_width)
dest_y = denormalize_y(args["destination_y"], screen_height)
# Perform drag and drop
page.mouse.move(start_x, start_y)
page.mouse.down()
page.mouse.move(dest_x, dest_y)
page.mouse.up()
page.wait_for_load_state(timeout=5000)
time.sleep(1)
except Exception as e:
print(f"Error executing {fname}: {e}")
action_result = {"error": str(e)}
results.append((fname, action_result))
return results
def get_function_responses(page, results):
"""Capture screenshot and URL after actions."""
screenshot_bytes = page.screenshot(type="png")
current_url = page.url
function_responses = []
for name, result in results:
response_data = {"url": current_url}
response_data.update(result)
function_responses.append(
types.FunctionResponse(
name=name,
response=response_data,
parts=[types.FunctionResponsePart(
inline_data=types.FunctionResponseBlob(
mime_type="image/png",
data=screenshot_bytes))
]
)
)
return function_responses
# Main program
print("Initialising browser...")
playwright = sync_playwright().start()
browser = playwright.chromium.launch(headless=False)
context = browser.new_context(viewport={"width": SCREEN_WIDTH, "height": SCREEN_HEIGHT})
page = context.new_page()
try:
# Go to initial page
page.goto("https://sticky-note-jam.web.app")
# Configure the model with Computer Use tool
config = types.GenerateContentConfig(
tools=[types.Tool(computer_use=types.ComputerUse(
environment=types.Environment.ENVIRONMENT_BROWSER
))],
)
# Take initial screenshot
initial_screenshot = page.screenshot(type="png")
USER_PROMPT = """
My art club brainstormed tasks ahead of our fair.
The board is chaotic and I need your help organising the tasks into some categories I created.
Go to sticky-note-jam.web.app and
ensure notes are clearly in the right sections.
Drag them there if not.
In your output, describe what the initial stage looked like and
what the final stage looks like after organisation.
"""
print(f"Goal: {USER_PROMPT}")
contents = [
Content(role="user", parts=[
Part(text=USER_PROMPT),
Part.from_bytes(data=initial_screenshot, mime_type='image/png')
])
]
# Agent loop - maximum 10 turns (more turns for drag operations)
for i in range(10):
print(f"\n--- Turn {i+1} ---")
print("Thinking...")
response = client.models.generate_content(
model='gemini-2.5-computer-use-preview-10-2025',
contents=contents,
config=config,
)
candidate = response.candidates[0]
contents.append(candidate.content)
# Check if there are function calls to execute
has_function_calls = any(part.function_call for part in candidate.content.parts)
if not has_function_calls:
text_response = " ".join([part.text for part in candidate.content.parts if part.text])
print("Agent finished:", text_response)
break
print("Executing actions...")
results = execute_function_calls(candidate, page, SCREEN_WIDTH, SCREEN_HEIGHT)
print("Capturing state...")
function_responses = get_function_responses(page, results)
contents.append(
Content(role="user", parts=[Part(function_response=fr) for fr in function_responses])
)
finally:
print("\nClosing browser...")
browser.close()
playwright.stop()
print("Done!")
Claude Agent SDK
# Install Claude Code
npm install -g @anthropic-ai/claude-code
# Install Claude Agents SDK
pip install claude-agent-sdk
# Set API Key
export ANTHROPIC_API_KEY=your_api_key_here
Basic
import asyncio
from claude_agent_sdk import query
async def main():
async for message in query(prompt="Hello, how are you?"):
print(message)
asyncio.run(main())
Built-in Tools
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions
from rich import print
async def main():
options = ClaudeAgentOptions(
allowed_tools=["Read", "Write"],
permission_mode="acceptEdits"
)
async for msg in query(
prompt="Create a file called greeting.txt with 'Hello Mervin Praison!'",
options=options
):
print(msg)
asyncio.run(main())
Custom Tools
import asyncio
from typing import Any
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, tool, create_sdk_mcp_server
from rich import print
@tool("greet", "Greet a user", {"name": str})
async def greet(args: dict[str, Any]) -> dict[str, Any]:
return {
"content": [{
"type": "text",
"text": f"Hello, {args['name']}!"
}]
}
server = create_sdk_mcp_server(
name="my-tools",
version="1.0.0",
tools=[greet]
)
async def main():
options = ClaudeAgentOptions(
mcp_servers={"tools": server},
allowed_tools=["mcp__tools__greet"]
)
async with ClaudeSDKClient(options=options) as client:
await client.query("Greet Mervin Praison")
async for msg in client.receive_response():
print(msg)
asyncio.run(main())
Claude Agent Options
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions
from rich import print
async def main():
options = ClaudeAgentOptions(
system_prompt="You are an expert Python developer",
permission_mode='acceptEdits',
cwd="/Users/praison/cc"
)
async for message in query(
prompt="Create a Python web server in my current directory",
options=options
):
print(message)
asyncio.run(main())
Image Editing with Gemini
from google import genai
from PIL import Image
from io import BytesIO
client = genai.Client()
prompt = "Add a Cap to the person's head"
image = Image.open('mervinpraison.jpeg')
response = client.models.generate_content(
model="gemini-2.5-flash-image-preview",
contents=[prompt, image],
)
for part in response.candidates[0].content.parts:
if part.text is not None:
print(part.text)
elif part.inline_data is not None:
image = Image.open(BytesIO(part.inline_data.data))
image.save("generated_image.png")
print("Generated image saved as 'generated_image.png'")
UI.py
import gradio as gr
from google import genai
from PIL import Image
from io import BytesIO
client = genai.Client()
def edit_image(image, prompt):
response = client.models.generate_content(
model="gemini-2.5-flash-image-preview",
contents=[prompt, image],
)
if response.candidates[0].finish_reason.name == 'PROHIBITED_CONTENT':
return None, "Content blocked by safety filters"
elif response.candidates[0].content is None:
return None, f"No content generated: {response.candidates[0].finish_reason.name}"
for part in response.candidates[0].content.parts:
if part.inline_data is not None:
return Image.open(BytesIO(part.inline_data.data)), "Image generated successfully"
return None, "No image found in response"
iface = gr.Interface(
fn=edit_image,
inputs=[
gr.Image(type="pil", label="Upload Image"),
gr.Textbox(label="Edit Prompt", value="Add a Cap to the person's head")
],
outputs=[
gr.Image(label="Edited Image"),
gr.Textbox(label="Status")
],
title="Image Editor"
)
iface.launch()
POML with Ollama
from poml import poml
import requests, json
from rich import print
# 1) Load and render POML file
messages = poml("financial_analysis.poml", chat=True)
# 2) Combine messages into a single prompt
full_prompt = "\n".join(
["\n".join(str(c).strip() for c in m["content"]) if isinstance(m.get("content"), list) else str(m["content"]).strip()
for m in messages if m.get("content")]
)
print("\n--- Full Prompt ---\n")
print(full_prompt)
# 3) Call Ollama Model
resp = requests.post(
"http://localhost:11434/api/generate",
json={"model": "qwen2.5vl:latest", "prompt": full_prompt, "stream": False},
)
data = resp.json()
print("\n--- Model Response ---\n")
print(data.get("response") or json.dumps(data, indent=2))