Intermediate Operations

Batch Ingestion

~35 min to implement 📦 Requires: Dakera v0.11+

A new agent starts with an empty memory store — it knows nothing. Day-one usefulness requires bootstrapping from existing knowledge: CRM records, support ticket histories, technical documentation, product catalogs. Migrating 50,000 documents one at a time through a for loop is unreliable and slow. This pattern implements a production-grade batch ingestion pipeline with chunking, rate control, resume capability, and progress monitoring.

Prerequisites
  • Dakera instance running (quickstart)
  • SDK installed: pip install dakera / npm i @dakera-ai/dakera
  • Source data in JSON, JSONL, CSV, or database export format
  • Estimated document count and average document size to plan rate limits and total runtime

The Problem

Knowledge base migration is consistently underestimated. Four real failure modes teams hit when writing a naive ingestion loop:

  • Rate limit crashes: A loop sending 100 req/s hits Dakera's rate limiter at 20 req/s, generating 503s. No retry logic means thousands of documents are silently skipped. The memory store appears complete but has 60% gap coverage.
  • No resume capability: A 50,000-document import runs for 2 hours, hits a network timeout at record 38,000, and the developer must restart from record 0. Without checkpoint logic, re-ingesting everything creates duplicates and wastes hours.
  • Monolithic chunk size: 10-page PDFs are ingested as single memories. A recall query for a specific section returns the entire document (10,000+ characters), consuming the full context window budget. Chunking to 800 characters per memory dramatically improves recall precision.
  • No deduplication: Running the ingestion script twice (common during testing) creates 100,000 nearly-identical memories from 50,000 documents. Recall quality degrades as duplicates compete for the same context window space.
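
The duplicate-on-rerun failure mode can be blunted on the client side by remembering which chunks were already stored. The following is a minimal sketch, assuming each chunk is uniquely identified by its doc_id and chunk_index. `IngestLedger` and its ledger file are hypothetical helpers for illustration, not part of the Dakera SDK.

```python
from pathlib import Path


class IngestLedger:
    """Append-only record of chunk keys that were stored successfully."""

    def __init__(self, path: str):
        self.path = Path(path)
        # Reload keys written by earlier runs so a re-run can skip them
        self.seen = set(self.path.read_text().splitlines()) if self.path.exists() else set()

    @staticmethod
    def key(doc_id: str, chunk_index: int) -> str:
        return f"{doc_id}:{chunk_index}"

    def already_ingested(self, doc_id: str, chunk_index: int) -> bool:
        return self.key(doc_id, chunk_index) in self.seen

    def record(self, doc_id: str, chunk_index: int) -> None:
        k = self.key(doc_id, chunk_index)
        if k not in self.seen:
            self.seen.add(k)
            with self.path.open("a") as f:
                f.write(k + "\n")
```

Check `already_ingested()` before submitting a chunk and call `record()` after a successful store. The class is not thread-safe, so call it from the thread that collects results rather than from the workers.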

Architecture

This pattern implements a five-stage pipeline: Load → Chunk → Tag → Ingest (with rate limiting and retry) → Verify. A checkpoint file tracks the last successfully ingested record ID, enabling resume from any point. Progress is logged in real-time. A post-ingestion deduplication pass removes near-duplicate chunks.

Batch ingestion pipeline (diagram):
  • Load: JSONL files, CSV exports, DB dumps, PDF/Markdown
  • Chunk: 800 characters max, sentence-aware, 100-character overlap, 1 doc → N chunks
  • Tag: source (filename), doc_id (UUID), chunk_index (N), category (type)
  • Ingest: 20 concurrent workers, 20 req/s rate limit, exponential-backoff retry, checkpoint every 500, real-time progress log, resume on failure
  • Verify: count check, sample recall, dedup sweep, ready to serve

Five-stage batch ingestion pipeline: Load source data, Chunk into memory-sized segments, Tag with provenance metadata, Ingest with rate limiting and retry, then Verify completeness and deduplication.

[Chart: throughput vs. latency tradeoff for 50k docs. X-axis: 5, 10, 20, 40, and 80 workers. Series: throughput (docs/min, axis ticks at 200, 600, and 1,200) and relative error rate. Annotations: sweet spot at 20 workers; rate limit zone (diminishing returns).]

Throughput vs. latency tradeoff: 20 concurrent workers achieve the best balance, at ~1,200 docs/min with a minimal error rate. Beyond 20 workers, rate limiting kicks in and errors increase faster than throughput gains.
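
For planning, a throughput figure converts directly into a runtime estimate. The helper below is a back-of-the-envelope sketch. `estimate_runtime_minutes` is a hypothetical name, and the rate you pass in should come from your own dry run rather than from the chart.

```python
def estimate_runtime_minutes(total_items: int, items_per_min: float) -> float:
    """Rough wall-clock estimate that ignores retries and backoff pauses."""
    if items_per_min <= 0:
        raise ValueError("items_per_min must be positive")
    return total_items / items_per_min


# 50,000 items at the ~1,200/min sweet spot quoted above
print(f"{estimate_runtime_minutes(50_000, 1_200):.1f} min")  # prints "41.7 min"
```

Treat the result as a lower bound: retries, backoff waits, and chunking (one document usually becomes several requests) all push the real runtime up.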

Implementation Steps

  • Audit and prepare your source data
    Before writing code, sample 100 records from your source and answer: What is the average document length? Are there duplicates? What metadata is available (author, date, category, source system)? Which documents are highest value and should get higher importance scores? Documents over 800 characters need chunking; those under 200 characters may need merging with adjacent records to create meaningful memory units.
  • Implement sentence-aware chunking with overlap
    Split long documents into chunks of up to 800 characters (the CHUNK_SIZE used in the code below), breaking only at sentence boundaries (never mid-sentence). Add a 100-character overlap between adjacent chunks so that contextual information at chunk boundaries isn't lost during recall. Assign each chunk a chunk_index and total_chunks in metadata so you can reconstruct document order if needed.
  • Tag every chunk with complete provenance metadata
    Each memory must carry enough metadata to trace it back to its source: source (filename or system name), doc_id (unique document ID from the source system), chunk_index, ingestion_batch (timestamp of this batch run), and category. This metadata enables surgical deletion, batch-specific rollback, and selective re-ingestion when source documents are updated.
  • Run ingestion with worker pool, rate limiting, and retry
    Use 20 concurrent workers (the empirical sweet spot for self-hosted Dakera). Implement a rate limiter capped at 20 req/s; a token bucket is a common choice. For failed requests, use exponential backoff, doubling the wait after each failed attempt (1s, 2s, 4s, 8s) before declaring a permanent failure. With MAX_RETRIES = 3, as in the code below, only the 1s and 2s waits occur. Log permanent failures to a separate file for manual review; never silently skip.
  • Write checkpoint files for resume capability
    Every 500 successfully ingested records, write the current record index and ingestion timestamp to a checkpoint file. On startup, check for an existing checkpoint and resume from that position. This reduces the blast radius of any network failure to a maximum of 500 records re-ingested (creating duplicates) — which the post-ingestion deduplication pass will remove.
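
Step 4 calls for a token bucket. The sketch below is one minimal, thread-safe way to write one, with no claim that it matches Dakera's server-side limiter. `TokenBucket` and its `clock`/`sleep` parameters are illustrative names; they are injectable only so the behavior can be tested without real waiting.

```python
import threading
import time


class TokenBucket:
    """Allow bursts of up to `capacity` calls, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float,
                 clock=time.monotonic, sleep=time.sleep):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock
        self.sleep = sleep
        self.updated = clock()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, capped at capacity
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            with self.lock:
                self._refill()
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            # Sleep outside the lock so other threads are not blocked
            self.sleep(wait)
```

Each worker calls `acquire()` immediately before sending a request. With `rate=20` and a small `capacity`, the pool stays near 20 req/s while still absorbing short bursts.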

Tip: Run a dry run on a 1% sample first

Before ingesting 50,000 documents, run the full pipeline on a 500-document sample. Verify that chunking produces the expected number of chunks, metadata is correctly attached, recall queries return relevant results, and estimated completion time is acceptable. A 5-minute dry run prevents a 4-hour wasted ingestion run due to a configuration error.
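
Taking the first 500 lines of an export tends to over-represent whatever was written first, so a random sample is usually a better dry-run input. The sketch below uses reservoir sampling (Algorithm R) to draw a uniform sample from a stream without loading it all into memory. `reservoir_sample` is a hypothetical helper name.

```python
import random
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def reservoir_sample(items: Iterable[T], k: int, seed: Optional[int] = None) -> List[T]:
    """Pick k items uniformly at random from a stream of unknown length."""
    rng = random.Random(seed)
    sample: List[T] = []
    for n, item in enumerate(items):
        if n < k:
            # Fill the reservoir with the first k items
            sample.append(item)
        else:
            # Replace a random slot with probability k / (n + 1)
            j = rng.randint(0, n)  # inclusive on both ends
            if j < k:
                sample[j] = item
    return sample
```

Pass it any iterator of documents, write the result to a small JSONL file, and run the pipeline on that file. If the stream has fewer than `k` items, all of them are returned.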

Implementation

# Single document store (use parallel requests for batch)
curl -X POST http://localhost:3300/v1/memory/store \
  -H "Authorization: Bearer dk-..." \
  -H "Content-Type: application/json" \
  -d '{
    "agent_id": "knowledge-base",
    "content": "Dakera provides persistent memory for AI agents. It stores, indexes, and retrieves memories using semantic search. Designed for production AI systems requiring cross-session context.",
    "importance": 0.80,
    "memory_type": "semantic",
    "tags": ["source:docs", "category:product", "batch:2024-08-01"],
    "metadata": {
      "source": "product-documentation.md",
      "doc_id": "doc_001",
      "chunk_index": 0,
      "total_chunks": 3,
      "ingestion_batch": "2024-08-01T10:00:00Z",
      "category": "product_docs"
    }
  }'

# Verify ingestion: sample recall after batch completes
curl "http://localhost:3300/v1/memory/recall?agent_id=knowledge-base&query=persistent+memory+AI+agents&top_k=5" \
  -H "Authorization: Bearer dk-..."

# Check ingestion stats (count memories in namespace)
curl "http://localhost:3300/v1/memory/search?agent_id=knowledge-base&query=*&top_k=1" \
  -H "Authorization: Bearer dk-..."

# Python: batch ingestion pipeline (chunking, rate limiting, retry, checkpoints)
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
import concurrent.futures

from dakera import DakeraClient

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
logger = logging.getLogger(__name__)

client = DakeraClient(base_url="http://localhost:3300", api_key="dk-...")

# Configuration
AGENT_ID = "knowledge-base"
MAX_WORKERS = 20
RATE_LIMIT_RPS = 20        # requests per second
CHUNK_SIZE = 800            # max characters per chunk
CHUNK_OVERLAP = 100         # overlap between consecutive chunks
CHECKPOINT_INTERVAL = 500  # save progress every N records
MAX_RETRIES = 3
BATCH_ID = datetime.now(timezone.utc).isoformat()

@dataclass
class Chunk:
    content: str
    doc_id: str
    source: str
    chunk_index: int
    total_chunks: int
    category: str
    importance: float = 0.80

def chunk_document(text: str, doc_id: str, source: str, category: str) -> list[Chunk]:
    """Split a document into sentence-aware overlapping chunks."""
    if len(text) <= CHUNK_SIZE:
        return [Chunk(text, doc_id, source, 0, 1, category)]

    # Split on sentence-ending punctuation followed by a space. A NUL character
    # is the temporary separator because, unlike a visible character such as '|',
    # it is very unlikely to appear in document text.
    SEP = "\x00"
    sentences = (
        text.replace(". ", "." + SEP)
            .replace("! ", "!" + SEP)
            .replace("? ", "?" + SEP)
            .split(SEP)
    )

    chunks = []
    current_chunk = ""

    for sentence in sentences:
        if len(current_chunk) + len(sentence) + 1 > CHUNK_SIZE and current_chunk:
            chunks.append(current_chunk.strip())
            # Overlap: carry the last CHUNK_OVERLAP characters into the next chunk
            overlap_start = max(0, len(current_chunk) - CHUNK_OVERLAP)
            current_chunk = current_chunk[overlap_start:] + " " + sentence
        else:
            current_chunk += (" " if current_chunk else "") + sentence

    if current_chunk.strip():
        chunks.append(current_chunk.strip())

    total = len(chunks)
    return [
        Chunk(content=c, doc_id=doc_id, source=source, chunk_index=i,
              total_chunks=total, category=category)
        for i, c in enumerate(chunks)
    ]

def load_checkpoint(checkpoint_file: str) -> int:
    """Load last successfully ingested index from checkpoint file."""
    p = Path(checkpoint_file)
    if p.exists():
        data = json.loads(p.read_text())
        logger.info(f"Resuming from checkpoint: record {data['last_index']}")
        return data["last_index"]
    return 0

def save_checkpoint(checkpoint_file: str, index: int, ingested: int, failed: int) -> None:
    """Write checkpoint for resume capability."""
    Path(checkpoint_file).write_text(json.dumps({
        "last_index": index,
        "ingested": ingested,
        "failed": failed,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }, indent=2))

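import threading  # used by RateLimiter to stay correct across worker threads

class RateLimiter:
    """Thread-safe fixed-interval rate limiter: spaces calls at least 1/rps apart."""
    def __init__(self, rps: float):
        self.min_interval = 1.0 / rps
        self.next_slot = 0.0
        self.lock = threading.Lock()

    def wait(self) -> None:
        # Reserve the next free time slot under the lock, then sleep outside it
        # so that concurrent workers are spaced out instead of firing together.
        with self.lock:
            now = time.monotonic()
            slot = max(now, self.next_slot)
            self.next_slot = slot + self.min_interval
        if slot > now:
            time.sleep(slot - now)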

rate_limiter = RateLimiter(RATE_LIMIT_RPS)

def ingest_chunk(chunk: Chunk) -> bool:
    """Ingest a single chunk with retry logic. Returns True on success."""
    for attempt in range(MAX_RETRIES):
        try:
            rate_limiter.wait()
            client.store_memory(
                agent_id=AGENT_ID,
                content=chunk.content,
                importance=chunk.importance,
                memory_type="semantic",
                tags=[
                    f"source:{chunk.source}",
                    f"category:{chunk.category}",
                    f"batch:{BATCH_ID[:10]}"
                ],
                metadata={
                    "source": chunk.source,
                    "doc_id": chunk.doc_id,
                    "chunk_index": chunk.chunk_index,
                    "total_chunks": chunk.total_chunks,
                    "category": chunk.category,
                    "ingestion_batch": BATCH_ID
                }
            )
            return True
        except Exception as e:
            if attempt < MAX_RETRIES - 1:
                backoff = 2 ** attempt
                logger.warning(f"Retry {attempt + 1}/{MAX_RETRIES} for {chunk.doc_id}[{chunk.chunk_index}]: {e}. Waiting {backoff}s")
                time.sleep(backoff)
            else:
                logger.error(f"PERMANENT FAILURE: {chunk.doc_id}[{chunk.chunk_index}]: {e}")
                return False
    return False

def load_jsonl(file_path: str) -> Iterator[dict]:
    """Stream documents from a JSONL file."""
    with open(file_path) as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

def batch_ingest(
    source_file: str,
    checkpoint_file: str = ".ingest_checkpoint.json",
    dry_run: bool = False
) -> dict:
    """Main batch ingestion function with progress tracking and resume support."""

    # Resume from checkpoint if available
    start_index = load_checkpoint(checkpoint_file)
    ingested, failed = 0, 0
    start_time = time.monotonic()

    # Chunk every record not yet processed, remembering which record each chunk came from
    pending: list[tuple[int, list[Chunk]]] = []
    for i, doc in enumerate(load_jsonl(source_file)):
        if i < start_index:
            continue  # Skip records completed in a previous run
        pending.append((i, chunk_document(
            text=doc["content"],
            doc_id=doc.get("id", f"doc_{i:06d}"),
            source=doc.get("source", source_file),
            category=doc.get("category", "general")
        )))

    total = sum(len(chunks) for _, chunks in pending)
    logger.info(f"Ingesting {total} chunks from {source_file} (starting at record {start_index})")

    if dry_run:
        logger.info(f"DRY RUN: would ingest {total} chunks. Exiting.")
        return {"dry_run": True, "chunks": total}

    failed_chunks: list[Chunk] = []
    done = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Work through the records in windows of CHECKPOINT_INTERVAL. The checkpoint
        # stores a record index (not a chunk count) and is written only after every
        # chunk in the window has finished, so resuming never skips unprocessed records.
        for w in range(0, len(pending), CHECKPOINT_INTERVAL):
            window = pending[w:w + CHECKPOINT_INTERVAL]
            futures = {
                executor.submit(ingest_chunk, chunk): chunk
                for _, chunks in window
                for chunk in chunks
            }
            for future in concurrent.futures.as_completed(futures):
                done += 1
                if future.result():
                    ingested += 1
                else:
                    failed += 1
                    failed_chunks.append(futures[future])

            # Progress logging and checkpoint
            elapsed = time.monotonic() - start_time
            rate = done / elapsed if elapsed > 0 else 0
            eta_min = (total - done) / (rate * 60) if rate > 0 else 0.0
            logger.info(
                f"Progress: {done}/{total} chunks | "
                f"{ingested} ok, {failed} failed | "
                f"{rate:.1f} chunks/s | ETA: {eta_min:.1f} min"
            )
            save_checkpoint(checkpoint_file, window[-1][0] + 1, ingested, failed)

    # Write failed records for manual review
    if failed_chunks:
        failed_file = f"failed_chunks_{BATCH_ID[:10]}.jsonl"
        with open(failed_file, 'w') as f:
            for c in failed_chunks:
                f.write(json.dumps({"doc_id": c.doc_id, "chunk_index": c.chunk_index, "content": c.content[:200]}) + '\n')
        logger.warning(f"Wrote {len(failed_chunks)} failed chunks to {failed_file}")

    elapsed = time.monotonic() - start_time
    result = {
        "total_chunks": total,
        "ingested": ingested,
        "failed": failed,
        "elapsed_seconds": round(elapsed, 1),
        "throughput_per_min": round(ingested / (elapsed / 60), 0),
        "success_rate": round(ingested / max(total, 1), 4)
    }
    logger.info(f"Batch complete: {result}")
    return result

# Run the ingestion
result = batch_ingest("knowledge-base.jsonl", dry_run=False)
print(f"Ingested {result['ingested']:,} chunks in {result['elapsed_seconds']}s ({result['throughput_per_min']:.0f}/min)")

// TypeScript
import { DakeraClient } from '@dakera-ai/dakera';
import { createReadStream, existsSync, readFileSync, writeFileSync } from 'fs';
import { createInterface } from 'readline';

const client = new DakeraClient({ baseUrl: 'http://localhost:3300', apiKey: 'dk-...' });

const CONFIG = {
  agentId: 'knowledge-base',
  maxWorkers: 20,
  rateLimitRps: 20,
  chunkSize: 800,
  chunkOverlap: 100,
  checkpointInterval: 500,
  maxRetries: 3,
  batchId: new Date().toISOString(),
};

interface Chunk {
  content: string;
  docId: string;
  source: string;
  chunkIndex: number;
  totalChunks: number;
  category: string;
  importance?: number;
}

function chunkDocument(text: string, docId: string, source: string, category: string): Chunk[] {
  if (text.length <= CONFIG.chunkSize) {
    return [{ content: text, docId, source, chunkIndex: 0, totalChunks: 1, category }];
  }

  // Split after sentence-ending punctuation that is followed by a space
  const sentences = text.split(/(?<=[.!?]) /);
  const rawChunks: string[] = [];
  let current = '';

  for (const sentence of sentences) {
    if (current.length + sentence.length + 1 > CONFIG.chunkSize && current) {
      rawChunks.push(current.trim());
      const overlapStart = Math.max(0, current.length - CONFIG.chunkOverlap);
      current = current.slice(overlapStart) + ' ' + sentence;
    } else {
      current += (current ? ' ' : '') + sentence;
    }
  }
  if (current.trim()) rawChunks.push(current.trim());

  return rawChunks.map((content, i) => ({
    content, docId, source, chunkIndex: i, totalChunks: rawChunks.length, category,
  }));
}

class RateLimiter {
  private nextSlot = 0;
  private readonly minInterval: number;
  constructor(rps: number) { this.minInterval = 1000 / rps; }

  async wait(): Promise<void> {
    // Reserve the next free slot synchronously, before awaiting, so that
    // concurrent callers are spaced out instead of all firing together.
    const now = Date.now();
    const slot = Math.max(now, this.nextSlot);
    this.nextSlot = slot + this.minInterval;
    if (slot > now) {
      await new Promise(r => setTimeout(r, slot - now));
    }
  }
}

const rateLimiter = new RateLimiter(CONFIG.rateLimitRps);

async function ingestChunk(chunk: Chunk): Promise<boolean> {
  for (let attempt = 0; attempt < CONFIG.maxRetries; attempt++) {
    try {
      await rateLimiter.wait();
      await client.storeMemory(CONFIG.agentId, {
        content: chunk.content,
        importance: chunk.importance ?? 0.80,
        memoryType: 'semantic',
        tags: [`source:${chunk.source}`, `category:${chunk.category}`, `batch:${CONFIG.batchId.slice(0, 10)}`],
      });
      return true;
    } catch (err) {
      if (attempt < CONFIG.maxRetries - 1) {
        const backoff = Math.pow(2, attempt) * 1000;
        console.warn(`Retry ${attempt + 1} for ${chunk.docId}[${chunk.chunkIndex}]: waiting ${backoff}ms`);
        await new Promise(r => setTimeout(r, backoff));
      } else {
        console.error(`PERMANENT FAILURE: ${chunk.docId}[${chunk.chunkIndex}]: ${err}`);
        return false;
      }
    }
  }
  return false;
}

async function batchIngest(sourceFile: string, checkpointFile = '.ingest_checkpoint.json'): Promise<void> {
  const start = Date.now();
  let ingested = 0, failed = 0;

  // Read all documents and chunk them
  const allChunks: Chunk[] = [];
  const rl = createInterface({ input: createReadStream(sourceFile) });
  let lineIndex = 0;
  for await (const line of rl) {
    if (!line.trim()) continue;
    const doc = JSON.parse(line);
    const chunks = chunkDocument(doc.content, doc.id ?? `doc_${lineIndex}`, doc.source ?? sourceFile, doc.category ?? 'general');
    allChunks.push(...chunks);
    lineIndex++;
  }

  console.log(`Ingesting ${allChunks.length} chunks from ${lineIndex} documents`);

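  // Resume from the last checkpoint if one exists (the saved index counts chunks)
  const startIndex: number = existsSync(checkpointFile)
    ? (JSON.parse(readFileSync(checkpointFile, 'utf8')).index ?? 0)
    : 0;

  // Process in batches of maxWorkers
  const BATCH = CONFIG.maxWorkers;
  for (let i = startIndex; i < allChunks.length; i += BATCH) {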
    const batch = allChunks.slice(i, i + BATCH);
    const results = await Promise.all(batch.map(c => ingestChunk(c)));
    ingested += results.filter(Boolean).length;
    failed += results.filter(r => !r).length;

    if ((i + BATCH) % CONFIG.checkpointInterval < BATCH) {
      const elapsed = (Date.now() - start) / 1000;
      const rate = ingested / elapsed;
      const eta = (allChunks.length - i - BATCH) / Math.max(rate * 60, 1);
      console.log(`Progress: ${i + BATCH}/${allChunks.length} | ${ingested} ok | ${rate.toFixed(1)} chunks/s | ETA: ${eta.toFixed(1)} min`);
      writeFileSync(checkpointFile, JSON.stringify({ index: i + BATCH, ingested, failed }));
    }
  }

  const elapsed = (Date.now() - start) / 1000;
  console.log(`Complete: ${ingested} ingested, ${failed} failed in ${elapsed.toFixed(1)}s (${(ingested / (elapsed / 60)).toFixed(0)}/min)`);
}

await batchIngest('knowledge-base.jsonl');
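
// Rust: fragment that runs inside an async function on a Tokio runtime.
// `load_documents` stands in for your own JSONL loader; it is not defined here.
use dakera_rs::{Client, StoreMemoryRequest};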
use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet, time::{sleep, Duration}};

let client = Arc::new(Client::new("http://localhost:3300", "dk-..."));
let semaphore = Arc::new(Semaphore::new(20)); // 20 concurrent workers

let documents: Vec<String> = load_documents("knowledge-base.jsonl");
let total = documents.len();
println!("Ingesting {} documents", total);

let mut set = JoinSet::new();
let mut ingested = 0u64;
let mut failed = 0u64;

for (i, content) in documents.into_iter().enumerate() {
    let client = client.clone();
    let permit = semaphore.clone().acquire_owned().await.unwrap();

    set.spawn(async move {
        let _permit = permit; // Released when dropped
        let mut last_err = None;

        // Exponential backoff retry
        for attempt in 0..3u32 {
            match client.store_memory("knowledge-base", StoreMemoryRequest {
                content: content.clone(),
                importance: Some(0.80),
                memory_type: "semantic".into(),
                tags: vec!["source:docs".into(), "category:general".into()],
                ..Default::default()
            }).await {
                Ok(_) => return (i, true),
                Err(e) => {
                    last_err = Some(e);
                    if attempt < 2 {
                        sleep(Duration::from_secs(2u64.pow(attempt))).await;
                    }
                }
            }
        }
        eprintln!("Permanent failure at doc {}: {:?}", i, last_err);
        (i, false)
    });
}

while let Some(result) = set.join_next().await {
    match result {
        Ok((_, true)) => ingested += 1,
        Ok((_, false)) => failed += 1,
        Err(e) => { failed += 1; eprintln!("Join error: {:?}", e); }
    }
    if (ingested + failed) % 500 == 0 {
        println!("Progress: {} ok, {} failed / {} total", ingested, failed, total);
    }
}

println!("Batch complete: {} ingested, {} failed", ingested, failed);

// Go
package main

import (
    "bufio"
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "sync"
    "sync/atomic"
    "time"

    dakera "github.com/dakera-ai/dakera-go"
)

const (
    maxWorkers = 20
    maxRetries = 3
    rateLimit  = 20 // req/s
)

func ingestChunkWithRetry(ctx context.Context, client *dakera.Client, content, source, docID string) bool {
    for attempt := 0; attempt < maxRetries; attempt++ {
        _, err := client.StoreMemory(ctx, "knowledge-base", dakera.StoreMemoryRequest{
            Content:    content,
            Importance: 0.80,
            MemoryType: "semantic",
            Tags:       []string{"source:" + source, "category:general"},
            Metadata: map[string]interface{}{
                "source": source,
                "doc_id": docID,
            },
        })
        if err == nil {
            return true
        }
        if attempt < maxRetries-1 {
            backoff := time.Duration(1<<uint(attempt)) * time.Second
            log.Printf("Retry %d for %s: %v. Waiting %v", attempt+1, docID, err, backoff)
            time.Sleep(backoff)
        } else {
            log.Printf("PERMANENT FAILURE: %s: %v", docID, err)
        }
    }
    return false
}

func batchIngest(ctx context.Context, client *dakera.Client, sourceFile string) {
    f, err := os.Open(sourceFile)
    if err != nil {
        log.Fatalf("open %s: %v", sourceFile, err)
    }
    defer f.Close()

    sem := make(chan struct{}, maxWorkers)
    rateTicker := time.NewTicker(time.Second / rateLimit)
    defer rateTicker.Stop()

    var wg sync.WaitGroup
    var ingested, failed atomic.Int64
    total := 0

    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        line := scanner.Text()
        if line == "" { continue }

        var doc map[string]interface{}
        if err := json.Unmarshal([]byte(line), &doc); err != nil { continue }

        total++
        content, _ := doc["content"].(string)
        docID, _ := doc["id"].(string)
        if docID == "" { docID = fmt.Sprintf("doc_%06d", total) }
        source, _ := doc["source"].(string)

        <-rateTicker.C // Rate limit
        sem <- struct{}{}
        wg.Add(1)

        go func(c, d, s string) {
            defer wg.Done()
            defer func() { <-sem }()
            if ingestChunkWithRetry(ctx, client, c, s, d) {
                ingested.Add(1)
            } else {
                failed.Add(1)
            }

            if (ingested.Load()+failed.Load())%500 == 0 {
                fmt.Printf("Progress: %d ok, %d failed\n", ingested.Load(), failed.Load())
            }
        }(content, docID, source)
    }

    wg.Wait()
    fmt.Printf("Complete: %d ingested, %d failed out of %d total\n", ingested.Load(), failed.Load(), total)
}

func main() {
    client := dakera.NewClient("http://localhost:3300", "dk-...")
    batchIngest(context.Background(), client, "knowledge-base.jsonl")
}

Real-World Scenario: Knowledge Base Migration — 50,000 Documents

An enterprise software company migrates their support knowledge base to power a new AI assistant. The source is 50,243 Confluence pages averaging 2,400 characters each. Total estimated chunks: ~150,000.

  • Audit phase (1 hour): Sample 200 documents. Discover that 18% are duplicates from page cloning. Identify 4 importance tiers: critical procedures (0.95), general documentation (0.80), archived content (0.60), draft pages (0.45). Tag strategy defined: tier, space, last_modified.
  • Dry run (10 minutes): Run pipeline on 500 documents. Verify chunking produces the expected 1.5–3 chunks per page. Confirm estimated completion: ~6.9 hours for 150,000 chunks, an effective rate of about 6 chunks/s (150,000 ÷ 6.9 h ≈ 21,700 chunks/h).
  • Ingestion run (7.2 hours): 149,847 chunks ingested, 153 permanent failures (0.1%). Checkpoint file saved every 500 records — network blip at record 97,000 recovered automatically from checkpoint. Failed records written to failed_chunks_2024-08-01.jsonl for manual review.
  • Post-ingestion dedup (45 minutes): Semantic deduplication finds 12,400 near-duplicate chunks from the cloned Confluence pages. After dedup, effective corpus: 137,447 unique chunks.
  • Verification (15 minutes): 20 sample recall queries. All return relevant results. Average recall latency: 14ms. Agent achieves 94% answer accuracy on support questions vs. 61% baseline (no memory).
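
The verification step in this scenario can be scripted as a count reconciliation plus a handful of probe queries. The following is a minimal sketch. `verify_ingestion` is a hypothetical helper, and `recall_fn` is an assumed callable that takes a query and returns the recalled memory texts; wrap your SDK's recall call to fit that shape, since the exact response format is not shown in this pattern.

```python
from typing import Callable, Sequence, Tuple


def verify_ingestion(
    total_chunks: int,
    ingested: int,
    failed: int,
    probes: Sequence[Tuple[str, str]],
    recall_fn: Callable[[str], Sequence[str]],
) -> dict:
    """Check that counts reconcile and that probe queries recall expected text.

    Each probe is (query, expected_substring). A probe passes when any
    recalled text contains the expected substring, ignoring case.
    """
    hits = 0
    missed = []
    for query, expected in probes:
        results = recall_fn(query)
        if any(expected.lower() in text.lower() for text in results):
            hits += 1
        else:
            missed.append(query)
    return {
        "counts_ok": ingested + failed == total_chunks,
        "failure_rate": failed / total_chunks if total_chunks else 0.0,
        "hit_rate": hits / len(probes) if probes else 0.0,
        "missed_queries": missed,
    }
```

Substring matching is a deliberately crude relevance check. It catches an empty or mis-tagged store quickly, but it does not replace a human review of the sample results.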

Before / After Memory State

Before: Empty Memory Store
// Day 1: agent deployed
// Memory store: 0 records

// User: "How do I configure
// SSO for our org?"

// Agent: "I don't have specific
// information about your
// organization's SSO setup.
// Please check your docs."

// Zero-value response.
// User opens a support ticket.
// Agent provides no ROI.

After: 137k Memories Ingested
// Day 1: agent deployed with
// 137,447 ingested memories

// User: "How do I configure
// SSO for our org?"

// Recall: 8 relevant chunks
// from SSO_Configuration.md
// + 3 from SAML_Setup_Guide.md

// Agent: "Here are the exact
// steps for your Okta setup..."

// 94% answer accuracy.
// Support tickets -67%.

SDK Method Reference

Method | SDK | Purpose in this pattern
store_memory(agent_id, content, importance, memory_type, tags) | Python | Ingest a single chunk with metadata
batch_forget(request) | Python | Rollback an ingestion batch by batch tag
recall(agent_id, query, top_k) | Python | Post-ingestion verification sampling
storeMemory(agentId, {content, importance, memoryType, tags}) | TypeScript | Async parallel chunk ingestion
batchForget(request) | TypeScript | Batch rollback by ingestion batch tag
client.store_memory("agent", StoreMemoryRequest{...}).await? | Rust | Async chunk store with JoinSet concurrency
client.StoreMemory(ctx, "agent", StoreMemoryRequest{...}) | Go | Goroutine-parallel chunk ingestion

Edge Cases and Gotchas

  • Silent deduplication failures: If your source data has near-duplicate documents (Confluence page clones, CRM record copies), they'll be ingested separately, consuming twice the memory and degrading recall quality. Always run a pre-ingestion dedup step on your source data, or a post-ingestion semantic dedup pass using Dakera's Semantic Deduplication pattern.
  • Chunk boundary semantic loss: A sentence split across two chunks means neither chunk has full context for that sentence. Sentence-aware chunking (breaking only at sentence boundaries) with 100-character overlap minimizes this. For especially context-sensitive content (legal documents, code), consider larger overlap (200–300 characters).
  • Importance score uniformity: Assigning importance=0.80 to all 150,000 chunks means everything is equally important at recall time. Differentiate by content quality: procedure documents get 0.90, general documentation 0.80, archived content 0.60. This dramatically improves recall precision for high-value queries.
  • Ingestion during live traffic: Running a large ingestion while the agent is serving requests can cause a temporary recall accuracy dip — the index is partially updated. For production migrations, prefer maintenance windows or write to a separate namespace, verify quality, then switch the active namespace.
  • No rollback for partial ingestion: If you ingest 80% of your corpus and discover a data quality issue, you can't easily "undo" 120,000 stored memories. Design rollback into your pipeline from the start: tag every ingested chunk with a unique batch_id, so you can call batch_forget() with that batch tag to roll back the entire run cleanly.
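
The pre-ingestion dedup step mentioned in the first gotcha can start as a simple content fingerprint. The sketch below drops documents whose text is identical after lowercasing and whitespace normalization. It assumes each document is a dict with a "content" key, as in the JSONL examples above; `content_fingerprint` and `dedupe_documents` are hypothetical names. It catches exact clones only, so near-duplicates still need a semantic pass.

```python
import hashlib
from typing import Iterable, Iterator


def content_fingerprint(text: str) -> str:
    """Hash of the text after lowercasing and collapsing whitespace."""
    normalized = " ".join(text.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def dedupe_documents(docs: Iterable[dict]) -> Iterator[dict]:
    """Yield each document whose normalized content has not been seen yet."""
    seen = set()
    for doc in docs:
        fp = content_fingerprint(doc["content"])
        if fp in seen:
            continue  # Same text as an earlier document: skip it
        seen.add(fp)
        yield doc
```

Because it is a generator, it can wrap a streaming loader without holding the documents in memory; only the set of fingerprints grows.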

Note: Throughput benchmarks for self-hosted Dakera

On a standard self-hosted Dakera instance (cpx32, 8 vCPU, 32GB RAM), 20 concurrent workers sustain ~18 req/s with a <1% error rate. 50 workers can temporarily burst to 35 req/s but generate 503 errors when the rate limiter triggers. For cloud-hosted Dakera, check your plan's rate limit documentation, as limits may be higher. Always test with your specific instance before planning ingestion timelines.

Performance Considerations

  • ~1,200 chunks/min at 20 workers (self-hosted)
  • 800 chars: optimal chunk size for recall precision
  • <0.1%: permanent failure rate with retry backoff

Advanced Configuration: Adaptive rate limiting and dynamic chunking

Implement adaptive rate limiting that backs off when rate-limit responses (503 or 429) are detected, and dynamic chunking that adjusts chunk size based on content type:

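import threading
import time

class AdaptiveRateLimiter:
    def __init__(self, initial_rps: float = 20.0):
        self.current_rps = initial_rps
        self.min_rps = 5.0
        self.max_rps = 30.0
        self.interval = 1.0 / initial_rps
        self.next_slot = 0.0
        self.consecutive_errors = 0
        self.lock = threading.Lock()

    def wait(self) -> None:
        # Reserve the next send slot at the current rate, then sleep until it arrives
        with self.lock:
            now = time.monotonic()
            slot = max(now, self.next_slot)
            self.next_slot = slot + self.interval
        if slot > now:
            time.sleep(slot - now)

    def on_success(self):
        with self.lock:
            self.consecutive_errors = 0
            # Ramp up by 5% per successful request until max_rps is reached
            if self.current_rps < self.max_rps:
                self.current_rps = min(self.current_rps * 1.05, self.max_rps)
                self.interval = 1.0 / self.current_rps

    def on_rate_limit_error(self):
        with self.lock:
            self.consecutive_errors += 1
            # Aggressive backoff on rate limit: cut the rate by 30%, never below min_rps
            self.current_rps = max(self.current_rps * 0.7, self.min_rps)
            self.interval = 1.0 / self.current_rps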

CHUNK_SIZES = {
    "code": 400,       # Code chunks: smaller for precise recall
    "procedure": 600,  # Step-by-step docs: medium
    "general": 800,    # General text: standard
    "reference": 1000, # Reference docs: larger for coherence
}

def get_chunk_size(category: str) -> int:
    return CHUNK_SIZES.get(category, 800)
