Python SDK
v0.11.75 · Python 3.10+
pip install dakera # core (sync client)
pip install dakera[async] # include async client
pip show dakera # verify installed version
Client setup
Point the client at your Dakera server. Use environment variables to keep the server URL and API key out of source code:
import os
from dakera import DakeraClient
# Recommended: configure via env vars
# export DAKERA_URL=http://<YOUR_SERVER_IP>:3300
# export DAKERA_API_KEY=your-api-key
client = DakeraClient(
base_url=os.getenv("DAKERA_URL", "http://localhost:3300"),
api_key=os.getenv("DAKERA_API_KEY")
)
Memory operations
client.memories.store(agent_id="my-agent", content="User is learning Rust", importance=0.8, tags=["learning"])
memories = client.memories.recall(agent_id="my-agent", query="programming languages", top_k=10)
memories = client.memories.batch_recall(agent_id="my-agent", tags=["learning"], min_importance=0.6)
client.memories.forget(memory_id="mem-abc123")
client.memories.set_importance(memory_id="mem-abc123", importance=0.9)
memory = client.memories.get(memory_id="mem-abc123")
Sessions
session = client.sessions.start(agent_id="my-agent", metadata={"task": "review"})
client.memories.store(agent_id="my-agent", content="Found 2 issues", session_id=session.id)
client.sessions.end(session_id=session.id, summary="Review complete")
sessions = client.sessions.list(agent_id="my-agent")
Vector operations
# Upsert a text document (auto-embedded server-side)
client.vectors.upsert_text(namespace="docs", documents=[
{"id": "doc-001", "text": "Quarterly revenue report Q1 2026", "metadata": {"source": "finance"}}
])
# Query by natural language
results = client.vectors.query_text(namespace="docs", text="revenue trends", top_k=5)
# Hybrid search (vector + BM25)
results = client.vectors.hybrid_search(namespace="docs", text="revenue", top_k=10, vector_weight=0.7)
# Full-text search (BM25 only)
results = client.vectors.fulltext_search(namespace="docs", query="quarterly report", top_k=5)
Knowledge graph
# Build knowledge graph from a seed memory
graph = client.knowledge.graph(agent_id="my-agent", memory_id="mem-abc", depth=2)
# Find path between two memories
path = client.knowledge.path(agent_id="my-agent", from_id="mem-a", to_id="mem-b")
# Cross-agent knowledge network
network = client.knowledge.cross_agent_network(agent_ids=["agent-a", "agent-b"])
# Deduplicate memories (preview first)
dupes = client.knowledge.deduplicate(agent_id="my-agent", threshold=0.93, dry_run=True)
Namespace & API key management
# Create a namespace
client.namespaces.create(name="production", dimension=1024, distance="cosine")
# Create a scoped API key
key = client.namespaces.create_key(namespace="production", name="reader", scope="read", expires_in_days=90)
# Set memory lifecycle policy
client.namespaces.set_memory_policy(namespace="production", episodic_ttl_seconds=604800)
Entity extraction
# Extract entities from text
entities = client.extract_entities(text="Alice met Bob at Google HQ on Tuesday")
# Configure namespace entity extraction
client.namespaces.configure_ner(namespace="chat", extract_entities=True,
entity_types=["person", "organization", "location"])
Import & export
# Export all memories
data = client.export_memories(agent_id="my-agent", format="jsonl")
# Import from Mem0 format
client.import_memories(agent_id="my-agent", data=mem0_json, format="mem0")
Feedback loop
# Upvote a useful memory
client.memories.feedback(memory_id="mem-abc", signal="upvote")
# Get feedback summary for an agent
summary = client.agents.feedback_summary(agent_id="my-agent")
Batch operations
# Batch forget — remove memories matching filters
resp = client.batch_forget(agent_id="my-agent", tags=["obsolete"], min_importance=0.0, max_importance=0.3)
print(f"Deleted {resp.deleted_count} memories")
# Batch query text — multi-query in one call
results = client.batch_query_text(namespace="docs", queries=[
{"text": "revenue report", "top_k": 5},
{"text": "product roadmap", "top_k": 3},
])
Autopilot & decay (admin)
# AutoPilot status
status = client.autopilot_status()
# Update AutoPilot config
client.autopilot_update_config(enabled=True, dedup_threshold=0.93, dedup_interval_hours=6)
# Manually trigger a dedup or consolidation cycle
client.autopilot_trigger(action="dedup")
# Decay engine config
decay = client.decay_config()
client.decay_update_config(half_life_days=30, min_importance=0.01)
Health probes
# Readiness probe (dependencies healthy)
ready = client.health_ready()
# Liveness probe (process alive)
live = client.health_live()
Vector bulk operations
# Bulk update vectors by ID
client.vectors.bulk_update(namespace="docs", updates=[
{"id": "doc-001", "metadata": {"reviewed": True}},
{"id": "doc-002", "metadata": {"reviewed": True}},
])
# Bulk delete vectors by ID
client.vectors.bulk_delete(namespace="docs", ids=["doc-003", "doc-004"])
# Count vectors in a namespace
count = client.vectors.count(namespace="docs")
Full-text index management
# Get full-text index stats
stats = client.vectors.fulltext_stats(namespace="docs")
print(f"Documents: {stats.document_count}, Terms: {stats.term_count}")
# Delete a full-text document by ID
client.vectors.fulltext_delete(namespace="docs", document_id="doc-001")
TTL & storage
# TTL engine stats
ttl = client.ttl_stats()
print(f"Expired: {ttl.expired_count}, Pending: {ttl.pending_count}")
# Storage tier overview
tiers = client.storage_tier_overview()
# Memory type stats (episodic, semantic, procedural, working)
type_stats = client.memory_type_stats()
Agent consolidation
# Trigger consolidation for an agent
client.agents.consolidate(agent_id="my-agent")
# Get consolidation log
log = client.agents.consolidation_log(agent_id="my-agent")
# Patch consolidation config
client.agents.patch_consolidation_config(agent_id="my-agent", threshold=0.9, max_batch=100)
Namespace config
# Get entity extraction config for a namespace
config = client.namespaces.get_entity_config(namespace="production")
# Get namespace extractor settings
extractor = client.namespaces.get_extractor(namespace="production")
# Migrate namespace dimensions (e.g. 768 → 1024)
client.namespaces.migrate_dimensions(namespace="production", target_dimension=1024)
Admin & ops
# Cluster status
cluster = client.cluster_status()
# Maintenance mode
client.admin_enable_maintenance(reason="scheduled upgrade")
client.admin_disable_maintenance()
# Quota management
quotas = client.get_quotas()
client.update_quotas({"max_memories_per_agent": 100000})
# Slow query log
slow = client.slow_queries(limit=20)
# Backups
backup = client.create_backup(include_data=True)
backups = client.list_backups()
# Ops diagnostics
diag = client.ops_diagnostics()
# List background jobs
jobs = client.ops_list_jobs()
# Trigger compaction
client.ops_compact(namespace="production")
# Graceful shutdown
client.ops_shutdown()
SSE streaming
# Stream real-time memory events
for event in client.stream_memory_events(agent_id="my-agent"):
    print(f"{event.type}: {event.data}")
# Stream audit events
for event in client.stream_audit_events(agent_id="my-agent"):
    print(f"{event.type}: {event.data}")
Async client
from dakera import AsyncDakeraClient
# Note: `async with` must run inside a coroutine (e.g. one started with asyncio.run)
async with AsyncDakeraClient(base_url="http://localhost:3300", api_key="key") as client:
    await client.memories.store(agent_id="my-agent", content="async memory")
    memories = await client.memories.recall(agent_id="my-agent", query="async")
Error handling
from dakera.exceptions import DakeraError, NotFoundError, RateLimitError, AuthorizationError
try:
    memory = client.memories.get(memory_id="mem-abc")
except NotFoundError:
    print("Memory not found")
except RateLimitError as e:
    print(f"Rate limited — retry after {e.retry_after}s")
except DakeraError as e:
    print(f"Error {e.error_code}: {e.message}")
Error types: DakeraError (base), ConnectionError, NotFoundError, ValidationError, RateLimitError, ServerError, AuthenticationError, AuthorizationError, TimeoutError.
Connection configuration
from dakera import DakeraClient, RetryConfig
client = DakeraClient(
base_url="http://localhost:3300",
api_key="your-key",
timeout=30, # seconds
retry_config=RetryConfig(
max_retries=3,
base_delay=0.5,
max_delay=10.0,
jitter=True
)
)