Batch Ingestion
A new agent starts with an empty memory store — it knows nothing. Day-one usefulness requires bootstrapping from existing knowledge: CRM records, support ticket histories, technical documentation, product catalogs. Migrating 50,000 documents one at a time through a for loop is unreliable and slow. This pattern implements a production-grade batch ingestion pipeline with chunking, rate control, resume capability, and progress monitoring.
- Dakera instance running (quickstart)
- SDK installed: pip install dakera (Python) or npm i @dakera-ai/dakera (TypeScript)
- Source data in JSON, JSONL, CSV, or database export format
- Estimated document count and average document size to plan rate limits and total runtime
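To get the document-size estimate, a quick audit script is usually enough. The sketch below is a minimal example, assuming the source is a JSONL file whose records carry their text in a `content` field (the field name used by the ingestion code on this page). The function name `audit_sample` is hypothetical. It reads the first `sample_size` records rather than a random sample, so shuffle the file first if record order is not representative.

```python
import json
import statistics
from itertools import islice

def audit_sample(path: str, sample_size: int = 100) -> dict:
    """Summarize document lengths for the first `sample_size` non-empty JSONL records."""
    lengths = []
    with open(path, encoding="utf-8") as f:
        non_empty = (line for line in f if line.strip())
        for line in islice(non_empty, sample_size):
            lengths.append(len(json.loads(line).get("content", "")))
    if not lengths:
        return {"sampled": 0}
    return {
        "sampled": len(lengths),
        "mean_chars": round(statistics.mean(lengths)),
        "max_chars": max(lengths),
        "needs_chunking": sum(n > 800 for n in lengths),    # longer than one 800-char chunk
        "merge_candidates": sum(n < 200 for n in lengths),  # too short to stand alone
    }
```

Multiplying `mean_chars` by the total record count and dividing by the chunk size gives a rough chunk total for planning runtime.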
The Problem
Knowledge base migration is consistently underestimated. Four failure modes teams commonly hit when writing a naive ingestion loop:
- Rate limit crashes: A loop sending 100 req/s exceeds Dakera's 20 req/s rate limit and starts receiving 503s. Without retry logic, thousands of documents are silently skipped. The memory store appears complete, yet as much as 60% of the source is missing.
- No resume capability: A 50,000-document import runs for 2 hours, hits a network timeout at record 38,000, and the developer must restart from record 0. Without checkpoint logic, re-ingesting everything creates duplicates and wastes hours.
- Monolithic chunk size: 10-page PDFs are ingested as single memories. A recall query for a specific section returns the entire document (10,000+ characters), consuming the full context window budget. Chunking to 800 characters per memory dramatically improves recall precision.
- No deduplication: Running the ingestion script twice (common during testing) creates 100,000 nearly-identical memories from 50,000 documents. Recall quality degrades as duplicates compete for the same context window space.
Architecture
This pattern implements a five-stage pipeline: Load → Chunk → Tag → Ingest (with rate limiting and retry) → Verify. A checkpoint file tracks the last successfully ingested record ID, enabling resume from any point. Progress is logged in real-time. A post-ingestion deduplication pass removes near-duplicate chunks.
Five-stage batch ingestion pipeline: Load source data, Chunk into memory-sized segments, Tag with provenance metadata, Ingest with rate limiting and retry, then Verify completeness and deduplication.
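The Verify stage can be partly automated with the `chunk_index` and `total_chunks` metadata attached to every chunk. The sketch below is one possible check, not Dakera functionality: it assumes you have already collected the metadata of the stored chunks into a list of dicts (how you fetch them depends on your setup), and the function name `find_incomplete_docs` is hypothetical.

```python
from collections import defaultdict

def find_incomplete_docs(stored: list[dict]) -> dict[str, list[int]]:
    """Return {doc_id: [missing chunk indices]} for documents with gaps."""
    seen: dict[str, set[int]] = defaultdict(set)
    expected: dict[str, int] = {}
    for meta in stored:
        seen[meta["doc_id"]].add(meta["chunk_index"])
        expected[meta["doc_id"]] = meta["total_chunks"]
    missing: dict[str, list[int]] = {}
    for doc_id, total in expected.items():
        gaps = sorted(set(range(total)) - seen[doc_id])
        if gaps:
            missing[doc_id] = gaps
    return missing
```

This only finds documents that are partially present. A document with no stored chunks at all leaves no metadata behind, so also compare the set of stored `doc_id` values against the IDs in the source.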
Throughput vs. latency tradeoff: 20 concurrent workers achieves the optimal balance of ~1,200 docs/min with minimal error rate. Beyond 20 workers, rate limiting kicks in and errors increase faster than throughput gains.
Implementation Steps
- Audit and prepare your source data. Before writing code, sample 100 records from your source and answer: What is the average document length? Are there duplicates? What metadata is available (author, date, category, source system)? Which documents are highest value and should get higher importance scores? Documents over 800 characters need chunking; those under 200 characters may need merging with adjacent records to create meaningful memory units.
- Implement sentence-aware chunking with overlap. Split long documents into chunks of 600–900 characters, breaking only at sentence boundaries (never mid-sentence). Add a 100-character overlap between adjacent chunks so that contextual information at chunk boundaries isn't lost during recall. Assign each chunk a chunk_index and total_chunks in metadata so you can reconstruct document order if needed.
- Tag every chunk with complete provenance metadata. Each memory must carry enough metadata to trace it back to its source: source (filename or system name), doc_id (unique document ID from the source system), chunk_index, ingestion_batch (timestamp of this batch run), and category. This metadata enables surgical deletion, batch-specific rollback, and selective re-ingestion when source documents are updated.
- Run ingestion with worker pool, rate limiting, and retry. Use 20 concurrent workers (the empirical sweet spot for self-hosted Dakera). Implement a token bucket rate limiter capped at 20 req/s. For failed requests, use exponential backoff: wait 1s, 2s, 4s, 8s before declaring a permanent failure. Log permanent failures to a separate file for manual review; never silently skip them.
- Write checkpoint files for resume capability. Every 500 successfully ingested records, write the current record index and ingestion timestamp to a checkpoint file. On startup, check for an existing checkpoint and resume from that position. This limits the cost of any network failure to at most 500 re-ingested records, and the resulting duplicates are removed by the post-ingestion deduplication pass.
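The token bucket named in the rate-limiting step can be sketched in a few lines. This is a generic, thread-safe version rather than anything Dakera-specific. The class name `TokenBucket` and its parameters are illustrative: `rate` is the refill speed in tokens per second and `capacity` is the largest burst allowed.

```python
import threading
import time

class TokenBucket:
    """Thread-safe token bucket: refills at `rate` tokens/s, bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity          # start full so the first requests go out at once
        self.updated = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill according to the time elapsed since the last update
                self.tokens = min(self.capacity,
                                  self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            # Sleep outside the lock so other threads are not blocked
            time.sleep(wait)
```

Each worker would call `bucket.acquire()` before sending a request, for example with `TokenBucket(rate=20, capacity=20)`. Unlike a fixed minimum interval between calls, the bucket lets idle time accumulate into a short burst while keeping the long-run average at `rate`.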
Before ingesting 50,000 documents, run the full pipeline on a 500-document sample. Verify that chunking produces the expected number of chunks, metadata is correctly attached, recall queries return relevant results, and estimated completion time is acceptable. A 5-minute dry run prevents a 4-hour wasted ingestion run due to a configuration error.
Implementation
# Single document store (use parallel requests for batch)
curl -X POST http://localhost:3300/v1/memory/store \
-H "Authorization: Bearer dk-..." \
-H "Content-Type: application/json" \
-d '{
"agent_id": "knowledge-base",
"content": "Dakera provides persistent memory for AI agents. It stores, indexes, and retrieves memories using semantic search. Designed for production AI systems requiring cross-session context.",
"importance": 0.80,
"memory_type": "semantic",
"tags": ["source:docs", "category:product", "batch:2024-08-01"],
"metadata": {
"source": "product-documentation.md",
"doc_id": "doc_001",
"chunk_index": 0,
"total_chunks": 3,
"ingestion_batch": "2024-08-01T10:00:00Z",
"category": "product_docs"
}
}'
# Verify ingestion: sample recall after batch completes
curl "http://localhost:3300/v1/memory/recall?agent_id=knowledge-base&query=persistent+memory+AI+agents&top_k=5" \
-H "Authorization: Bearer dk-..."
# Check ingestion stats (count memories in namespace)
curl "http://localhost:3300/v1/memory/search?agent_id=knowledge-base&query=*&top_k=1" \
-H "Authorization: Bearer dk-..."

import concurrent.futures
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator

from dakera import DakeraClient

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
logger = logging.getLogger(__name__)
client = DakeraClient(base_url="http://localhost:3300", api_key="dk-...")

# Configuration
AGENT_ID = "knowledge-base"
MAX_WORKERS = 20
RATE_LIMIT_RPS = 20        # requests per second
CHUNK_SIZE = 800           # max characters per chunk
CHUNK_OVERLAP = 100        # overlap between consecutive chunks
CHECKPOINT_INTERVAL = 500  # save progress every N records
MAX_RETRIES = 3            # attempts per chunk; raise to 5 for a 1s/2s/4s/8s backoff schedule
BATCH_ID = datetime.now(timezone.utc).isoformat()

@dataclass
class Chunk:
    content: str
    doc_id: str
    source: str
    chunk_index: int
    total_chunks: int
    category: str
    importance: float = 0.80
def chunk_document(text: str, doc_id: str, source: str, category: str) -> list[Chunk]:
    """Split a document into sentence-aware overlapping chunks."""
    if len(text) <= CHUNK_SIZE:
        return [Chunk(text, doc_id, source, 0, 1, category)]
    chunks = []
    # Mark sentence ends with '|' and split there (assumes the text has no literal '|')
    sentences = text.replace('. ', '.|').replace('! ', '!|').replace('? ', '?|').split('|')
    current_chunk = ""
    for sentence in sentences:
        if len(current_chunk) + len(sentence) + 1 > CHUNK_SIZE and current_chunk:
            chunks.append(current_chunk.strip())
            # Overlap: carry the last CHUNK_OVERLAP characters into the next chunk
            overlap_start = max(0, len(current_chunk) - CHUNK_OVERLAP)
            current_chunk = current_chunk[overlap_start:] + " " + sentence
        else:
            current_chunk += (" " if current_chunk else "") + sentence
    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    total = len(chunks)
    return [
        Chunk(content=c, doc_id=doc_id, source=source, chunk_index=i,
              total_chunks=total, category=category)
        for i, c in enumerate(chunks)
    ]
def load_checkpoint(checkpoint_file: str) -> int:
    """Load last successfully ingested index from checkpoint file."""
    p = Path(checkpoint_file)
    if p.exists():
        data = json.loads(p.read_text())
        logger.info(f"Resuming from checkpoint: record {data['last_index']}")
        return data["last_index"]
    return 0

def save_checkpoint(checkpoint_file: str, index: int, ingested: int, failed: int) -> None:
    """Write checkpoint for resume capability."""
    Path(checkpoint_file).write_text(json.dumps({
        "last_index": index,
        "ingested": ingested,
        "failed": failed,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }, indent=2))
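import threading  # needed for the lock below; can live with the other imports

class RateLimiter:
    """Minimum-interval rate limiter that is safe to share across worker threads."""
    def __init__(self, rps: float):
        self.min_interval = 1.0 / rps
        self.next_slot = 0.0
        self.lock = threading.Lock()

    def wait(self) -> None:
        # Reserve the next send slot under the lock, then sleep outside it
        with self.lock:
            now = time.monotonic()
            slot = max(now, self.next_slot)
            self.next_slot = slot + self.min_interval
        if slot > now:
            time.sleep(slot - now)

rate_limiter = RateLimiter(RATE_LIMIT_RPS)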
def ingest_chunk(chunk: Chunk) -> bool:
    """Ingest a single chunk with retry logic. Returns True on success."""
    for attempt in range(MAX_RETRIES):
        try:
            rate_limiter.wait()
            client.store_memory(
                agent_id=AGENT_ID,
                content=chunk.content,
                importance=chunk.importance,
                memory_type="semantic",
                tags=[
                    f"source:{chunk.source}",
                    f"category:{chunk.category}",
                    f"batch:{BATCH_ID[:10]}"
                ],
                metadata={
                    "source": chunk.source,
                    "doc_id": chunk.doc_id,
                    "chunk_index": chunk.chunk_index,
                    "total_chunks": chunk.total_chunks,
                    "category": chunk.category,
                    "ingestion_batch": BATCH_ID
                }
            )
            return True
        except Exception as e:
            if attempt < MAX_RETRIES - 1:
                backoff = 2 ** attempt  # 1s, 2s, 4s, ...
                logger.warning(f"Retry {attempt + 1}/{MAX_RETRIES} for {chunk.doc_id}[{chunk.chunk_index}]: {e}. Waiting {backoff}s")
                time.sleep(backoff)
            else:
                logger.error(f"PERMANENT FAILURE: {chunk.doc_id}[{chunk.chunk_index}]: {e}")
                return False
    return False
def load_jsonl(file_path: str) -> Iterator[dict]:
    """Stream documents from a JSONL file."""
    with open(file_path) as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)
def batch_ingest(
    source_file: str,
    checkpoint_file: str = ".ingest_checkpoint.json",
    dry_run: bool = False
) -> dict:
    """Main batch ingestion function with progress tracking and resume support."""
    # Resume from checkpoint if available (the index counts source records, not chunks)
    start_index = load_checkpoint(checkpoint_file)
    ingested, failed = 0, 0
    start_time = time.monotonic()
    # Chunk every record not yet processed, keeping chunks grouped per record
    # so that checkpoints can be written in record units
    pending: list[list[Chunk]] = []
    for i, doc in enumerate(load_jsonl(source_file)):
        if i < start_index:
            continue  # Skip already processed
        pending.append(chunk_document(
            text=doc["content"],
            doc_id=doc.get("id", f"doc_{i:06d}"),
            source=doc.get("source", source_file),
            category=doc.get("category", "general")
        ))
    total = sum(len(doc_chunks) for doc_chunks in pending)
    logger.info(f"Ingesting {total} chunks from {source_file} (starting at record {start_index})")
    if dry_run:
        logger.info(f"DRY RUN: would ingest {total} chunks. Exiting.")
        return {"dry_run": True, "chunks": total}
    failed_chunks: list[Chunk] = []
    done = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Ingest one window of CHECKPOINT_INTERVAL records at a time; the checkpoint
        # is written only after every chunk in the window has finished
        for w in range(0, len(pending), CHECKPOINT_INTERVAL):
            window_docs = pending[w:w + CHECKPOINT_INTERVAL]
            window = [c for doc_chunks in window_docs for c in doc_chunks]
            for chunk, ok in zip(window, executor.map(ingest_chunk, window)):
                if ok:
                    ingested += 1
                else:
                    failed += 1
                    failed_chunks.append(chunk)
            done += len(window)
            # Progress logging and checkpoint
            elapsed = time.monotonic() - start_time
            rate = done / elapsed if elapsed > 0 else 0
            eta_min = (total - done) / max(rate * 60, 1)
            logger.info(
                f"Progress: {done}/{total} chunks | "
                f"{ingested} ok, {failed} failed | "
                f"{rate:.1f} chunks/s | ETA: {eta_min:.1f} min"
            )
            save_checkpoint(checkpoint_file, start_index + w + len(window_docs), ingested, failed)
    # Final checkpoint
    save_checkpoint(checkpoint_file, start_index + len(pending), ingested, failed)
    # Write failed records for manual review
    if failed_chunks:
        failed_file = f"failed_chunks_{BATCH_ID[:10]}.jsonl"
        with open(failed_file, 'w') as f:
            for c in failed_chunks:
                f.write(json.dumps({"doc_id": c.doc_id, "chunk_index": c.chunk_index, "content": c.content[:200]}) + '\n')
        logger.warning(f"Wrote {len(failed_chunks)} failed chunks to {failed_file}")
    elapsed = time.monotonic() - start_time
    result = {
        "total_chunks": total,
        "ingested": ingested,
        "failed": failed,
        "elapsed_seconds": round(elapsed, 1),
        "throughput_per_min": round(ingested / (elapsed / 60), 0),
        "success_rate": round(ingested / max(total, 1), 4)
    }
    logger.info(f"Batch complete: {result}")
    return result
# Run the ingestion
result = batch_ingest("knowledge-base.jsonl", dry_run=False)
print(f"Ingested {result['ingested']:,} chunks in {result['elapsed_seconds']}s ({result['throughput_per_min']:.0f}/min)")

import { DakeraClient } from '@dakera-ai/dakera';
import { createReadStream, writeFileSync } from 'fs';
import { createInterface } from 'readline';

const client = new DakeraClient({ baseUrl: 'http://localhost:3300', apiKey: 'dk-...' });
const CONFIG = {
  agentId: 'knowledge-base',
  maxWorkers: 20,
  rateLimitRps: 20,
  chunkSize: 800,
  chunkOverlap: 100,
  checkpointInterval: 500,
  maxRetries: 3,
  batchId: new Date().toISOString(),
};

interface Chunk {
  content: string;
  docId: string;
  source: string;
  chunkIndex: number;
  totalChunks: number;
  category: string;
  importance?: number;
}

function chunkDocument(text: string, docId: string, source: string, category: string): Chunk[] {
  if (text.length <= CONFIG.chunkSize) {
    return [{ content: text, docId, source, chunkIndex: 0, totalChunks: 1, category }];
  }
  // Mark sentence ends with '|' and split there ('.' and '?' must be escaped in a regex)
  const sentences = text.replace(/\. /g, '.|').replace(/! /g, '!|').replace(/\? /g, '?|').split('|');
  const rawChunks: string[] = [];
  let current = '';
  for (const sentence of sentences) {
    if (current.length + sentence.length + 1 > CONFIG.chunkSize && current) {
      rawChunks.push(current.trim());
      // Overlap: carry the last chunkOverlap characters into the next chunk
      const overlapStart = Math.max(0, current.length - CONFIG.chunkOverlap);
      current = current.slice(overlapStart) + ' ' + sentence;
    } else {
      current += (current ? ' ' : '') + sentence;
    }
  }
  if (current.trim()) rawChunks.push(current.trim());
  return rawChunks.map((content, i) => ({
    content, docId, source, chunkIndex: i, totalChunks: rawChunks.length, category,
  }));
}
class RateLimiter {
  private nextSlot = 0;
  private readonly minInterval: number;
  constructor(rps: number) { this.minInterval = 1000 / rps; }
  async wait(): Promise<void> {
    // Reserve a send slot synchronously so concurrent callers queue up
    // instead of all passing the check at the same instant
    const now = Date.now();
    const slot = Math.max(now, this.nextSlot);
    this.nextSlot = slot + this.minInterval;
    if (slot > now) {
      await new Promise(r => setTimeout(r, slot - now));
    }
  }
}

const rateLimiter = new RateLimiter(CONFIG.rateLimitRps);
async function ingestChunk(chunk: Chunk): Promise<boolean> {
  for (let attempt = 0; attempt < CONFIG.maxRetries; attempt++) {
    try {
      await rateLimiter.wait();
      await client.storeMemory(CONFIG.agentId, {
        content: chunk.content,
        importance: chunk.importance ?? 0.80,
        memoryType: 'semantic',
        tags: [`source:${chunk.source}`, `category:${chunk.category}`, `batch:${CONFIG.batchId.slice(0, 10)}`],
      });
      return true;
    } catch (err) {
      if (attempt < CONFIG.maxRetries - 1) {
        const backoff = Math.pow(2, attempt) * 1000; // 1s, 2s, 4s, ...
        console.warn(`Retry ${attempt + 1} for ${chunk.docId}[${chunk.chunkIndex}]: waiting ${backoff}ms`);
        await new Promise(r => setTimeout(r, backoff));
      } else {
        console.error(`PERMANENT FAILURE: ${chunk.docId}[${chunk.chunkIndex}]: ${err}`);
        return false;
      }
    }
  }
  return false;
}
async function batchIngest(sourceFile: string, checkpointFile = '.ingest_checkpoint.json'): Promise<void> {
  const start = Date.now();
  let ingested = 0, failed = 0;

  // Read all documents and chunk them
  const allChunks: Chunk[] = [];
  const rl = createInterface({ input: createReadStream(sourceFile) });
  let lineIndex = 0;
  for await (const line of rl) {
    if (!line.trim()) continue;
    const doc = JSON.parse(line);
    const chunks = chunkDocument(doc.content, doc.id ?? `doc_${lineIndex}`, doc.source ?? sourceFile, doc.category ?? 'general');
    allChunks.push(...chunks);
    lineIndex++;
  }
  console.log(`Ingesting ${allChunks.length} chunks from ${lineIndex} documents`);

  // Process in batches of maxWorkers
  const BATCH = CONFIG.maxWorkers;
  for (let i = 0; i < allChunks.length; i += BATCH) {
    const batch = allChunks.slice(i, i + BATCH);
    const results = await Promise.all(batch.map(c => ingestChunk(c)));
    ingested += results.filter(Boolean).length;
    failed += results.filter(r => !r).length;

    // Log progress and write a checkpoint roughly every checkpointInterval chunks
    if ((i + BATCH) % CONFIG.checkpointInterval < BATCH) {
      const elapsed = (Date.now() - start) / 1000;
      const rate = ingested / elapsed;
      const eta = (allChunks.length - i - BATCH) / Math.max(rate * 60, 1);
      console.log(`Progress: ${i + BATCH}/${allChunks.length} | ${ingested} ok | ${rate.toFixed(1)} chunks/s | ETA: ${eta.toFixed(1)} min`);
      writeFileSync(checkpointFile, JSON.stringify({ index: i + BATCH, ingested, failed }));
    }
  }

  const elapsed = (Date.now() - start) / 1000;
  console.log(`Complete: ${ingested} ingested, ${failed} failed in ${elapsed.toFixed(1)}s (${(ingested / (elapsed / 60)).toFixed(0)}/min)`);
}
await batchIngest('knowledge-base.jsonl');

use dakera_rs::{Client, StoreMemoryRequest};
use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet, time::{sleep, Duration}};

// Fragment: the statements below run inside an async context (for example a
// #[tokio::main] function). load_documents is a helper you supply; it is not defined here.
let client = Arc::new(Client::new("http://localhost:3300", "dk-..."));
let semaphore = Arc::new(Semaphore::new(20)); // 20 concurrent workers
let documents: Vec<String> = load_documents("knowledge-base.jsonl");
let total = documents.len();
println!("Ingesting {} documents", total);

let mut set = JoinSet::new();
let mut ingested = 0u64;
let mut failed = 0u64;

for (i, content) in documents.into_iter().enumerate() {
    let client = client.clone();
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    set.spawn(async move {
        let _permit = permit; // Released when dropped
        let mut last_err = None;
        // Exponential backoff retry
        for attempt in 0..3u32 {
            match client.store_memory("knowledge-base", StoreMemoryRequest {
                content: content.clone(),
                importance: Some(0.80),
                memory_type: "semantic".into(),
                tags: vec!["source:docs".into(), "category:general".into()],
                ..Default::default()
            }).await {
                Ok(_) => return (i, true),
                Err(e) => {
                    last_err = Some(e);
                    if attempt < 2 {
                        sleep(Duration::from_secs(2u64.pow(attempt))).await;
                    }
                }
            }
        }
        eprintln!("Permanent failure at doc {}: {:?}", i, last_err);
        (i, false)
    });
}

while let Some(result) = set.join_next().await {
    match result {
        Ok((_, true)) => ingested += 1,
        Ok((_, false)) => failed += 1,
        Err(e) => { failed += 1; eprintln!("Join error: {:?}", e); }
    }
    if (ingested + failed) % 500 == 0 {
        println!("Progress: {} ok, {} failed / {} total", ingested, failed, total);
    }
}
println!("Batch complete: {} ingested, {} failed", ingested, failed);

package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"sync/atomic"
	"time"

	dakera "github.com/dakera-ai/dakera-go"
)

const (
	maxWorkers = 20
	maxRetries = 3
	rateLimit  = 20 // req/s
)

func ingestChunkWithRetry(ctx context.Context, client *dakera.Client, content, source, docID string) bool {
	for attempt := 0; attempt < maxRetries; attempt++ {
		_, err := client.StoreMemory(ctx, "knowledge-base", dakera.StoreMemoryRequest{
			Content:    content,
			Importance: 0.80,
			MemoryType: "semantic",
			Tags:       []string{"source:" + source, "category:general"},
			Metadata: map[string]interface{}{
				"source": source,
				"doc_id": docID,
			},
		})
		if err == nil {
			return true
		}
		if attempt < maxRetries-1 {
			backoff := time.Duration(1<<uint(attempt)) * time.Second // 1s, 2s, 4s, ...
			log.Printf("Retry %d for %s: %v. Waiting %v", attempt+1, docID, err, backoff)
			time.Sleep(backoff)
		} else {
			log.Printf("PERMANENT FAILURE: %s: %v", docID, err)
		}
	}
	return false
}

func batchIngest(ctx context.Context, client *dakera.Client, sourceFile string) {
	f, err := os.Open(sourceFile)
	if err != nil {
		log.Fatalf("open %s: %v", sourceFile, err)
	}
	defer f.Close()

	sem := make(chan struct{}, maxWorkers)
	rateTicker := time.NewTicker(time.Second / rateLimit)
	defer rateTicker.Stop()

	var wg sync.WaitGroup
	var ingested, failed atomic.Int64
	total := 0

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}
		var doc map[string]interface{}
		if err := json.Unmarshal([]byte(line), &doc); err != nil {
			// Log malformed records instead of skipping them silently
			log.Printf("Skipping malformed line: %v", err)
			continue
		}
		total++
		content, _ := doc["content"].(string)
		docID, _ := doc["id"].(string)
		if docID == "" {
			docID = fmt.Sprintf("doc_%06d", total)
		}
		source, _ := doc["source"].(string)

		<-rateTicker.C    // Rate limit: start at most one request per tick
		sem <- struct{}{} // Wait for a free worker slot
		wg.Add(1)
		go func(c, d, s string) {
			defer wg.Done()
			defer func() { <-sem }()
			if ingestChunkWithRetry(ctx, client, c, s, d) {
				ingested.Add(1)
			} else {
				failed.Add(1)
			}
			if (ingested.Load()+failed.Load())%500 == 0 {
				fmt.Printf("Progress: %d ok, %d failed\n", ingested.Load(), failed.Load())
			}
		}(content, docID, source)
	}
	if err := scanner.Err(); err != nil {
		log.Printf("read %s: %v", sourceFile, err)
	}
	wg.Wait()
	fmt.Printf("Complete: %d ingested, %d failed out of %d total\n", ingested.Load(), failed.Load(), total)
}

func main() {
	client := dakera.NewClient("http://localhost:3300", "dk-...")
	batchIngest(context.Background(), client, "knowledge-base.jsonl")
}
Real-World Scenario: Knowledge Base Migration — 50,000 Documents
An enterprise software company migrates their support knowledge base to power a new AI assistant. The source is 50,243 Confluence pages averaging 2,400 characters each. Total estimated chunks: ~150,000.
- Audit phase (1 hour): Sample 200 documents. Discover that 18% are duplicates from page cloning. Identify 4 importance tiers: critical procedures (0.95), general documentation (0.80), archived content (0.60), draft pages (0.45). Tag strategy defined: tier, space, last_modified.
- Dry run (10 minutes): Run pipeline on 500 documents. Verify chunking produces the expected 1.5–3 chunks per page. Confirm the estimated completion time of ~6.9 hours for 150,000 chunks.
- Ingestion run (7.2 hours): 149,847 chunks ingested, 153 permanent failures (0.1%). Checkpoint file saved every 500 records; a network blip at record 97,000 was recovered automatically from the checkpoint. Failed records written to failed_chunks_2024-08-01.jsonl for manual review.
- Post-ingestion dedup (45 minutes): Semantic deduplication finds 12,400 near-duplicate chunks from the cloned Confluence pages. After dedup, effective corpus: 137,447 unique chunks.
- Verification (15 minutes): 20 sample recall queries. All return relevant results. Average recall latency: 14ms. Agent achieves 94% answer accuracy on support questions vs. 61% baseline (no memory).
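The tier and tag strategy from the audit phase can be captured in a small lookup so every chunk of a page gets the same importance and tags. This is a sketch only: the scores are the four from the scenario, while the tier keys, the `memory_fields` helper, and the page field names are hypothetical and would follow whatever your export provides.

```python
# Importance per tier, using the scores from the audit phase.
# The tier keys themselves are hypothetical labels.
IMPORTANCE_BY_TIER = {
    "critical": 0.95,
    "general": 0.80,
    "archived": 0.60,
    "draft": 0.45,
}

def memory_fields(page: dict) -> dict:
    """Derive importance and tags for one page from its tier, space, and last_modified."""
    tier = page.get("tier", "general")
    return {
        # Unknown tiers fall back to the general-documentation score
        "importance": IMPORTANCE_BY_TIER.get(tier, 0.80),
        "tags": [
            f"tier:{tier}",
            f"space:{page.get('space', 'unknown')}",
            f"last_modified:{page.get('last_modified', 'unknown')}",
        ],
    }
```

The returned values would be passed as the `importance` and `tags` arguments when storing each chunk of the page.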
Before / After Memory State
// Day 1: agent deployed
// Memory store: 0 records
// User: "How do I configure
// SSO for our org?"
// Agent: "I don't have specific
// information about your
// organization's SSO setup.
// Please check your docs."
// Zero-value response.
// User opens a support ticket.
// Agent provides no ROI.

// Day 1: agent deployed with
// 137,447 ingested memories
// User: "How do I configure
// SSO for our org?"
// Recall: 8 relevant chunks
// from SSO_Configuration.md
// + 3 from SAML_Setup_Guide.md
// Agent: "Here are the exact
// steps for your Okta setup..."
// 94% answer accuracy.
// Support tickets -67%.
SDK Method Reference
| Method | SDK | Purpose in this pattern |
|---|---|---|
| store_memory(agent_id, content, importance, memory_type, tags) | Python | Ingest a single chunk with metadata |
| batch_forget(request) | Python | Rollback an ingestion batch by batch tag |
| recall(agent_id, query, top_k) | Python | Post-ingestion verification sampling |
| storeMemory(agentId, {content, importance, memoryType, tags}) | TypeScript | Async parallel chunk ingestion |
| batchForget(request) | TypeScript | Batch rollback by ingestion batch tag |
| client.store_memory("agent", StoreMemoryRequest{...}).await? | Rust | Async chunk store with JoinSet concurrency |
| client.StoreMemory(ctx, "agent", StoreMemoryRequest{...}) | Go | Goroutine-parallel chunk ingestion |
Edge Cases and Gotchas
- Silent deduplication failures: If your source data has near-duplicate documents (Confluence page clones, CRM record copies), they'll be ingested separately, consuming twice the memory and degrading recall quality. Always run a pre-ingestion dedup step on your source data, or a post-ingestion semantic dedup pass using Dakera's Semantic Deduplication pattern.
- Chunk boundary semantic loss: A sentence split across two chunks means neither chunk has full context for that sentence. Sentence-aware chunking (breaking only at sentence boundaries) with 100-character overlap minimizes this. For especially context-sensitive content (legal documents, code), consider larger overlap (200–300 characters).
- Importance score uniformity: Assigning importance=0.80 to all 150,000 chunks means everything is equally important at recall time. Differentiate by content quality: procedure documents get 0.90, general documentation 0.80, archived content 0.60. This dramatically improves recall precision for high-value queries.
- Ingestion during live traffic: Running a large ingestion while the agent is serving requests can cause a temporary recall accuracy dip because the index is only partially updated. For production migrations, prefer maintenance windows or write to a separate namespace, verify quality, then switch the active namespace.
- No rollback for partial ingestion: If you ingest 80% of your corpus and discover a data quality issue, you can't easily "undo" 120,000 stored memories. Design rollback into your pipeline from the start: tag every ingested chunk with a unique batch_id, so you can call batch_forget() with that batch tag to roll back the entire run cleanly.
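For the pre-ingestion dedup step mentioned in the first item, an exact-match pass over the source is cheap and removes verbatim copies before they consume any requests. The sketch below assumes records are dicts with a `content` field. The helper names are hypothetical, and this is a plain hashing approach, not Dakera's semantic deduplication.

```python
import hashlib
import re
from typing import Iterable, Iterator

def normalize(text: str) -> str:
    """Collapse whitespace and lowercase so trivial differences do not matter."""
    return re.sub(r"\s+", " ", text).strip().lower()

def dedupe_exact(docs: Iterable[dict]) -> Iterator[dict]:
    """Yield each document whose normalized content has not been seen before."""
    seen: set[str] = set()
    for doc in docs:
        digest = hashlib.sha256(normalize(doc["content"]).encode("utf-8")).hexdigest()
        if digest in seen:
            continue  # verbatim copy of an earlier record
        seen.add(digest)
        yield doc
```

Because it streams, it can wrap the JSONL loader directly. It only catches copies that are identical after normalization; pages that were cloned and then edited still need the semantic pass.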
On a standard self-hosted Dakera instance (cpx32, 8 vCPU, 32GB RAM): 20 concurrent workers sustain ~18 req/s with <1% error rate. 50 workers can temporarily burst to 35 req/s but generate 503 errors when the rate limiter triggers. For cloud-hosted Dakera, check your plan's rate limit documentation, since limits may be higher. Always test with your specific instance before planning ingestion timelines.
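Once you have measured a sustained request rate on your own instance, the timeline is simple arithmetic. The helper below is an illustrative sketch; `estimate_runtime_hours` is a hypothetical name, and the result ignores retries and backoff pauses, so treat it as a lower bound.

```python
def estimate_runtime_hours(total_chunks: int, sustained_rps: float) -> float:
    """Hours needed to send `total_chunks` requests at `sustained_rps` requests/s."""
    if sustained_rps <= 0:
        raise ValueError("sustained_rps must be positive")
    return total_chunks / sustained_rps / 3600

# 150,000 chunks: about 2.3 h at 18 req/s, about 6.9 h at a slower 6 req/s
for rps in (18.0, 6.0):
    print(f"{rps} req/s -> {estimate_runtime_hours(150_000, rps):.1f} h")
```

Measuring the rate during the dry run, rather than assuming the configured limit, keeps the estimate honest.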
Performance Considerations
Advanced Configuration: Adaptive rate limiting and dynamic chunking
Implement adaptive rate limiting that backs off when 429 responses are detected, and dynamic chunking that adjusts chunk size based on content type:
import time

class AdaptiveRateLimiter:
    def __init__(self, initial_rps: float = 20.0):
        self.current_rps = initial_rps
        self.min_rps = 5.0
        self.max_rps = 30.0
        self.interval = 1.0 / initial_rps
        self.last_call = 0.0
        self.consecutive_errors = 0

    def wait(self) -> None:
        # Sleep until `interval` seconds have passed since the previous call
        # (single-threaded sketch; add a lock before sharing across threads)
        remaining = self.interval - (time.monotonic() - self.last_call)
        if remaining > 0:
            time.sleep(remaining)
        self.last_call = time.monotonic()

    def on_success(self):
        self.consecutive_errors = 0
        # Ramp up by 5% after each success, capped at max_rps
        if self.current_rps < self.max_rps:
            self.current_rps = min(self.current_rps * 1.05, self.max_rps)
            self.interval = 1.0 / self.current_rps

    def on_rate_limit_error(self):
        self.consecutive_errors += 1
        # Aggressive backoff on rate limit: cut the rate by 30%, floored at min_rps
        self.current_rps = max(self.current_rps * 0.7, self.min_rps)
        self.interval = 1.0 / self.current_rps

CHUNK_SIZES = {
    "code": 400,        # Code chunks: smaller for precise recall
    "procedure": 600,   # Step-by-step docs: medium
    "general": 800,     # General text: standard
    "reference": 1000,  # Reference docs: larger for coherence
}

def get_chunk_size(category: str) -> int:
    return CHUNK_SIZES.get(category, 800)