Python SDK

v0.11.75 · Python 3.10+

PyPI · GitHub · Changelog

pip install dakera              # core (sync client)
pip install dakera[async]       # include async client
pip show dakera                 # verify installed version

Client setup

Point the client at your Dakera server. Use env vars to keep the URL and key out of source code:

import os
from dakera import DakeraClient

# Recommended: configure via env vars
# export DAKERA_URL=http://<YOUR_SERVER_IP>:3300
# export DAKERA_API_KEY=your-api-key

client = DakeraClient(
    base_url=os.getenv("DAKERA_URL", "http://localhost:3300"),
    api_key=os.getenv("DAKERA_API_KEY")
)

Memory operations

client.memories.store(agent_id="my-agent", content="User is learning Rust", importance=0.8, tags=["learning"])
memories = client.memories.recall(agent_id="my-agent", query="programming languages", top_k=10)
memories = client.memories.batch_recall(agent_id="my-agent", tags=["learning"], min_importance=0.6)
client.memories.forget(memory_id="mem-abc123")
client.memories.set_importance(memory_id="mem-abc123", importance=0.9)
memory = client.memories.get(memory_id="mem-abc123")

Sessions

session = client.sessions.start(agent_id="my-agent", metadata={"task": "review"})
client.memories.store(agent_id="my-agent", content="Found 2 issues", session_id=session.id)
client.sessions.end(session_id=session.id, summary="Review complete")
sessions = client.sessions.list(agent_id="my-agent")

Vector operations

# Upsert a text document (auto-embedded server-side)
client.vectors.upsert_text(namespace="docs", documents=[
    {"id": "doc-001", "text": "Quarterly revenue report Q1 2026", "metadata": {"source": "finance"}}
])

# Query by natural language
results = client.vectors.query_text(namespace="docs", text="revenue trends", top_k=5)

# Hybrid search (vector + BM25)
results = client.vectors.hybrid_search(namespace="docs", text="revenue", top_k=10, vector_weight=0.7)

# Full-text search (BM25 only)
results = client.vectors.fulltext_search(namespace="docs", query="quarterly report", top_k=5)

Knowledge graph

# Build knowledge graph from a seed memory
graph = client.knowledge.graph(agent_id="my-agent", memory_id="mem-abc", depth=2)

# Find path between two memories
path = client.knowledge.path(agent_id="my-agent", from_id="mem-a", to_id="mem-b")

# Cross-agent knowledge network
network = client.knowledge.cross_agent_network(agent_ids=["agent-a", "agent-b"])

# Deduplicate memories (preview first)
dupes = client.knowledge.deduplicate(agent_id="my-agent", threshold=0.93, dry_run=True)

Namespace & API key management

# Create a namespace
client.namespaces.create(name="production", dimension=1024, distance="cosine")

# Create a scoped API key
key = client.namespaces.create_key(namespace="production", name="reader", scope="read", expires_in_days=90)

# Set memory lifecycle policy
client.namespaces.set_memory_policy(namespace="production", episodic_ttl_seconds=604800)

Entity extraction

# Extract entities from text
entities = client.extract_entities(text="Alice met Bob at Google HQ on Tuesday")

# Configure namespace entity extraction
client.namespaces.configure_ner(namespace="chat", extract_entities=True,
    entity_types=["person", "organization", "location"])

Import & export

# Export all memories
data = client.export_memories(agent_id="my-agent", format="jsonl")

# Import from Mem0 format
client.import_memories(agent_id="my-agent", data=mem0_json, format="mem0")

Feedback loop

# Upvote a useful memory
client.memories.feedback(memory_id="mem-abc", signal="upvote")

# Get feedback summary for an agent
summary = client.agents.feedback_summary(agent_id="my-agent")

Batch operations

# Batch forget — remove memories matching filters
resp = client.batch_forget(agent_id="my-agent", tags=["obsolete"], min_importance=0.0, max_importance=0.3)
print(f"Deleted {resp.deleted_count} memories")

# Batch query text — multi-query in one call
results = client.batch_query_text(namespace="docs", queries=[
    {"text": "revenue report", "top_k": 5},
    {"text": "product roadmap", "top_k": 3},
])

Autopilot & decay (admin)

# AutoPilot status
status = client.autopilot_status()

# Update AutoPilot config
client.autopilot_update_config(enabled=True, dedup_threshold=0.93, dedup_interval_hours=6)

# Manually trigger a dedup or consolidation cycle
client.autopilot_trigger(action="dedup")

# Decay engine config
decay = client.decay_config()
client.decay_update_config(half_life_days=30, min_importance=0.01)

Health probes

# Readiness probe (dependencies healthy)
ready = client.health_ready()

# Liveness probe (process alive)
live = client.health_live()

Vector bulk operations

# Bulk update vectors by ID
client.vectors.bulk_update(namespace="docs", updates=[
    {"id": "doc-001", "metadata": {"reviewed": True}},
    {"id": "doc-002", "metadata": {"reviewed": True}},
])

# Bulk delete vectors by ID
client.vectors.bulk_delete(namespace="docs", ids=["doc-003", "doc-004"])

# Count vectors in a namespace
count = client.vectors.count(namespace="docs")

Full-text index management

# Get full-text index stats
stats = client.vectors.fulltext_stats(namespace="docs")
print(f"Documents: {stats.document_count}, Terms: {stats.term_count}")

# Delete a full-text document by ID
client.vectors.fulltext_delete(namespace="docs", document_id="doc-001")

TTL & storage

# TTL engine stats
ttl = client.ttl_stats()
print(f"Expired: {ttl.expired_count}, Pending: {ttl.pending_count}")

# Storage tier overview
tiers = client.storage_tier_overview()

# Memory type stats (episodic, semantic, procedural, working)
type_stats = client.memory_type_stats()

Agent consolidation

# Trigger consolidation for an agent
client.agents.consolidate(agent_id="my-agent")

# Get consolidation log
log = client.agents.consolidation_log(agent_id="my-agent")

# Patch consolidation config
client.agents.patch_consolidation_config(agent_id="my-agent", threshold=0.9, max_batch=100)

Namespace config

# Get entity extraction config for a namespace
config = client.namespaces.get_entity_config(namespace="production")

# Get namespace extractor settings
extractor = client.namespaces.get_extractor(namespace="production")

# Migrate namespace dimensions (e.g. 768 → 1024)
client.namespaces.migrate_dimensions(namespace="production", target_dimension=1024)

Admin & ops

# Cluster status
cluster = client.cluster_status()

# Maintenance mode
client.admin_enable_maintenance(reason="scheduled upgrade")
client.admin_disable_maintenance()

# Quota management
quotas = client.get_quotas()
client.update_quotas({"max_memories_per_agent": 100000})

# Slow query log
slow = client.slow_queries(limit=20)

# Backups
backup = client.create_backup(include_data=True)
backups = client.list_backups()

# Ops diagnostics
diag = client.ops_diagnostics()

# List background jobs
jobs = client.ops_list_jobs()

# Trigger compaction
client.ops_compact(namespace="production")

# Graceful shutdown
client.ops_shutdown()

SSE streaming

# Stream real-time memory events
for event in client.stream_memory_events(agent_id="my-agent"):
    print(f"{event.type}: {event.data}")

# Stream audit events
for event in client.stream_audit_events(agent_id="my-agent"):
    print(f"{event.type}: {event.data}")

Async client

from dakera import AsyncDakeraClient

async with AsyncDakeraClient(base_url="http://localhost:3300", api_key="key") as client:
    await client.memories.store(agent_id="my-agent", content="async memory")
    memories = await client.memories.recall(agent_id="my-agent", query="async")

Error handling

from dakera.exceptions import DakeraError, NotFoundError, RateLimitError, AuthorizationError

try:
    memory = client.memories.get(memory_id="mem-abc")
except NotFoundError:
    print("Memory not found")
except RateLimitError as e:
    print(f"Rate limited — retry after {e.retry_after}s")
except DakeraError as e:
    print(f"Error {e.error_code}: {e.message}")

Error types: DakeraError (base), ConnectionError, NotFoundError, ValidationError, RateLimitError, ServerError, AuthenticationError, AuthorizationError, TimeoutError.

Connection configuration

from dakera import DakeraClient, RetryConfig

client = DakeraClient(
    base_url="http://localhost:3300",
    api_key="your-key",
    timeout=30,                          # seconds
    retry_config=RetryConfig(
        max_retries=3,
        base_delay=0.5,
        max_delay=10.0,
        jitter=True
    )
)