# GraphRAG SDK v2 -- Configuration Reference
This document is the comprehensive configuration reference for GraphRAG SDK v2. Each section covers a configurable component, its parameters, defaults, and usage examples.
## 1. ConnectionConfig

`ConnectionConfig` is a dataclass that defines how the SDK connects to a FalkorDB instance. It is passed to `GraphRAG` or used to create a `FalkorDBConnection` directly.
### Fields

| Field | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | `"localhost"` | FalkorDB server hostname or IP address. |
| `port` | `int` | `6379` | FalkorDB server port. |
| `username` | `str \| None` | `None` | Authentication username (omit for unauthenticated connections). |
| `password` | `str \| None` | `None` | Authentication password. |
| `graph_name` | `str` | `"knowledge_graph"` | Name of the FalkorDB graph to operate on. |
| `max_connections` | `int` | `16` | Maximum number of connections in the Redis `BlockingConnectionPool`. |
| `retry_count` | `int` | `3` | Number of retry attempts for transient query failures. |
| `retry_delay` | `float` | `1.0` | Base delay (seconds) between retries, multiplied by the attempt number. |
| `pool_timeout` | `float` | `30.0` | Timeout (seconds) waiting to acquire a connection from the pool. |
| `query_timeout_ms` | `int \| None` | `10_000` | Per-query timeout in milliseconds, forwarded to FalkorDB. Set to `None` to disable. |
### Creating from a URL

`ConnectionConfig.from_url()` parses a `redis://` URL and returns a `ConnectionConfig`:

```python
from graphrag_sdk.core.connection import ConnectionConfig

config = ConnectionConfig.from_url(
    "redis://user:pass@my-falkordb.example.com:6380",
    graph_name="my_graph",
    query_timeout_ms=15_000,
)
```

The URL format is `redis://[user:pass@]host[:port][/db]`. Any keyword argument overrides the value parsed from the URL.
### Passing to GraphRAG

You can pass either a `ConnectionConfig` or a pre-built `FalkorDBConnection`:

```python
from graphrag_sdk.core.connection import ConnectionConfig, FalkorDBConnection
from graphrag_sdk.api.main import GraphRAG

# Option A: pass a config (connection created internally)
rag = GraphRAG(
    connection=ConnectionConfig(host="localhost", port=6379, graph_name="novels"),
    llm=my_llm,
    embedder=my_embedder,
)

# Option B: pass a FalkorDBConnection directly (full control)
conn = FalkorDBConnection(ConnectionConfig(host="10.0.0.5", password="secret"))
rag = GraphRAG(connection=conn, llm=my_llm, embedder=my_embedder)
```
### Retry Behavior

Queries are retried up to `retry_count` times with linear backoff (`retry_delay * attempt_number`). Non-transient errors -- those containing `"already indexed"`, `"already exists"`, or `"unknown index"` -- are raised immediately without retrying.
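The backoff logic is equivalent to the following sketch (illustrative only; `run_query` stands in for the SDK's internal query call):

```python
import time

# Error substrings treated as non-retryable, per the behavior described above.
NON_RETRYABLE_MARKERS = ("already indexed", "already exists", "unknown index")

def query_with_retry(run_query, retry_count: int = 3, retry_delay: float = 1.0):
    for attempt in range(1, retry_count + 1):
        try:
            return run_query()
        except Exception as exc:
            # Known-benign errors are surfaced immediately, never retried.
            if any(marker in str(exc) for marker in NON_RETRYABLE_MARKERS):
                raise
            if attempt == retry_count:
                raise
            # Linear backoff: 1.0 s, 2.0 s, 3.0 s, ... with the defaults.
            time.sleep(retry_delay * attempt)
```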
## 2. LLM Providers

The SDK defines an abstract `LLMInterface` base class. All LLM providers must implement `invoke()` for synchronous calls. Async calls (`ainvoke`) default to running `invoke` in a thread pool but can be overridden for true async support.
### Common Parameters

The `LLMInterface` base class accepts:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model_name` | `str` | -- | Model identifier (e.g. `"gpt-4.1"`). |
| `model_params` | `dict[str, Any] \| None` | `{}` | Provider-specific parameters. |
| `max_concurrency` | `int` | `12` | Concurrency limit for `abatch_invoke()`. |
### LiteLLM (Recommended)

LiteLLM supports 100+ LLM providers through a unified interface. Install with `pip install graphrag-sdk[litellm]`.

```python
from graphrag_sdk.core.providers import LiteLLM

# OpenAI
llm = LiteLLM(model="gpt-4.1", api_key="sk-...")

# Azure OpenAI
llm = LiteLLM(
    model="azure/gpt-4.1",
    api_key="your-azure-key",
    api_base="https://your-resource.openai.azure.com/",
    api_version="2024-12-01-preview",
    temperature=0.0,
    max_tokens=4096,
)

# Anthropic
llm = LiteLLM(model="anthropic/claude-sonnet-4-20250514", api_key="sk-ant-...")
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model` | `str` | -- | Model identifier in LiteLLM format. |
| `api_key` | `str \| None` | `None` | API key (or set via environment variable). |
| `api_base` | `str \| None` | `None` | API base URL (required for Azure). |
| `api_version` | `str \| None` | `None` | API version string (required for Azure). |
| `temperature` | `float` | `0.0` | Sampling temperature. |
| `max_tokens` | `int \| None` | `None` | Maximum tokens in response. |
### OpenRouter

OpenRouter provides access to many models through a single API. Install with `pip install graphrag-sdk[openrouter]`.

```python
from graphrag_sdk.core.providers import OpenRouterLLM

llm = OpenRouterLLM(
    model="anthropic/claude-sonnet-4-20250514",
    api_key="sk-or-...",
    temperature=0.0,
    max_tokens=4096,
)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model` | `str` | -- | Model identifier in OpenRouter format. |
| `api_key` | `str \| None` | `os.environ["OPENROUTER_API_KEY"]` | OpenRouter API key. |
| `temperature` | `float` | `0.0` | Sampling temperature. |
| `max_tokens` | `int \| None` | `None` | Maximum tokens in response. |
| `extra_headers` | `dict[str, str] \| None` | `None` | Additional HTTP headers. |
### Azure OpenAI via Environment Variables

When using LiteLLM with Azure, the following environment variables are recognized:

```bash
export AZURE_OPENAI_API_KEY="your-key"
export AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/"
export AZURE_OPENAI_API_VERSION="2024-12-01-preview"
```

Then configure the LLM:

```python
import os

from graphrag_sdk.core.providers import LiteLLM

llm = LiteLLM(
    model="azure/gpt-4.1",
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_base=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)
```
### Custom LLM Provider

Implement the `LLMInterface` abstract class:

```python
from graphrag_sdk.core.providers import LLMInterface
from graphrag_sdk.core.models import LLMResponse

class MyLLM(LLMInterface):
    def __init__(self, model_name: str, **kwargs):
        super().__init__(model_name=model_name)

    def invoke(self, prompt: str, **kwargs) -> LLMResponse:
        # Call your LLM here
        text = my_custom_api(prompt)
        return LLMResponse(content=text)
```
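If your backend exposes a native async client, you can also override `ainvoke` instead of relying on the default thread-pool wrapper. A minimal sketch; `my_custom_async_api` is a hypothetical async client, not part of the SDK:

```python
class MyAsyncLLM(MyLLM):
    async def ainvoke(self, prompt: str, **kwargs) -> LLMResponse:
        # True async call instead of the default thread-pool fallback.
        text = await my_custom_async_api(prompt)  # hypothetical async client
        return LLMResponse(content=text)
```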
## 3. Embedder Providers

The SDK defines an abstract `Embedder` base class with `embed_query()` (single text) and `embed_documents()` (batch). Batch embedding is critical for performance.

### Performance Note: Batch Embedding

Individual embedding calls to Azure OpenAI take approximately 0.22 seconds each, so embedding 500 texts one at a time takes roughly 110 seconds. A single batch call for the same 500 texts takes approximately 8 seconds -- about a 14x speedup. Always use batch embedding (`embed_documents` / `aembed_documents`) rather than looping over `embed_query`.
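In code, the difference is simply this (`embedder` is any configured `Embedder`):

```python
texts = ["chunk one", "chunk two", "chunk three"]  # typically hundreds of chunks

# Slow: one API round-trip per text (~0.22 s each on Azure OpenAI)
vectors = [embedder.embed_query(t) for t in texts]

# Fast: one batched API call per batch_size texts
vectors = embedder.embed_documents(texts)
```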
### LiteLLMEmbedder

Supports OpenAI, Azure, Cohere, and other embedding models via LiteLLM.

```python
from graphrag_sdk.core.providers import LiteLLMEmbedder

# Azure OpenAI
embedder = LiteLLMEmbedder(
    model="azure/text-embedding-ada-002",
    api_key="your-key",
    api_base="https://your-resource.openai.azure.com/",
    api_version="2024-12-01-preview",
    batch_size=500,
)

# OpenAI
embedder = LiteLLMEmbedder(model="text-embedding-ada-002", api_key="sk-...")
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model` | `str` | -- | Model identifier in LiteLLM format. |
| `api_key` | `str \| None` | `None` | API key. |
| `api_base` | `str \| None` | `None` | API base URL. |
| `api_version` | `str \| None` | `None` | API version string. |
| `batch_size` | `int` | `2048` | Maximum texts per batch call. Azure users should set this to `500`. |

For Azure OpenAI, set `batch_size=500` to stay within the API rate limits. The default of `2048` works well for OpenAI's direct API.
### OpenRouterEmbedder

```python
from graphrag_sdk.core.providers import OpenRouterEmbedder

embedder = OpenRouterEmbedder(
    model="openai/text-embedding-ada-002",
    api_key="sk-or-...",
    batch_size=2048,
)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model` | `str` | -- | Model identifier. |
| `api_key` | `str \| None` | `os.environ["OPENROUTER_API_KEY"]` | API key. |
| `batch_size` | `int` | `2048` | Maximum texts per batch. |
| `extra_headers` | `dict[str, str] \| None` | `None` | Additional HTTP headers. |
### Custom Embedder

Implement the `Embedder` abstract class:

```python
from graphrag_sdk.core.providers import Embedder

class MyEmbedder(Embedder):
    def embed_query(self, text: str, **kwargs) -> list[float]:
        return my_embedding_api(text)

    def embed_documents(self, texts: list[str], **kwargs) -> list[list[float]]:
        # Implement batch embedding for performance
        return my_batch_embedding_api(texts)
```

Override `aembed_query` and `aembed_documents` if your provider supports true async. The defaults run the sync methods in a thread pool via `asyncio.to_thread`.
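For example (a sketch; `my_async_batch_embedding_api` is a hypothetical async client):

```python
class MyAsyncEmbedder(MyEmbedder):
    async def aembed_documents(self, texts: list[str], **kwargs) -> list[list[float]]:
        # Native async batch call instead of the asyncio.to_thread fallback.
        return await my_async_batch_embedding_api(texts)  # hypothetical
```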
### Binary-Split Error Recovery

Both `LiteLLMEmbedder` and `OpenRouterEmbedder` implement binary-split error recovery for batch embedding. If a batch fails with a transient error, the batch is split in half and each half is retried recursively. Non-transient errors (401, 403, authentication failures) are raised immediately.
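The recovery strategy amounts to the following recursion (an illustrative sketch, not the SDK's actual implementation; `call_embedding_api` and `is_auth_error` are hypothetical helpers):

```python
def embed_with_binary_split(texts: list[str]) -> list[list[float]]:
    try:
        return call_embedding_api(texts)  # hypothetical: one batched API call
    except Exception as exc:
        if is_auth_error(exc):  # hypothetical: 401/403 are never retried
            raise
        if len(texts) == 1:
            raise  # cannot split further; surface the error
        # Split the failing batch in half and retry each half recursively,
        # isolating a single bad input in O(log n) extra calls.
        mid = len(texts) // 2
        return (embed_with_binary_split(texts[:mid])
                + embed_with_binary_split(texts[mid:]))
```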
## 4. GraphSchema

`GraphSchema` defines the structure of your knowledge graph. It constrains LLM extraction and powers the pruning step that filters non-conforming data.
### Components

`EntityType` -- defines a node type:

| Field | Type | Default | Description |
|---|---|---|---|
| `label` | `str` | -- | The node label (e.g. `"Person"`). |
| `description` | `str \| None` | `None` | Human-readable description. |
| `properties` | `list[PropertyType]` | `[]` | Expected properties on this node type. |
`RelationType` -- defines a relationship type:

| Field | Type | Default | Description |
|---|---|---|---|
| `label` | `str` | -- | The relationship type (e.g. `"KNOWS"`). |
| `description` | `str \| None` | `None` | Human-readable description. |
| `properties` | `list[PropertyType]` | `[]` | Expected properties on this relationship. |
| `patterns` | `list[tuple[str, str]]` | `[]` | Allowed `(source_label, target_label)` pairs (see below). |
`PropertyType` -- defines a property on a node or relationship:

| Field | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | -- | Property name. |
| `type` | `str` | `"STRING"` | Type hint: `STRING`, `INTEGER`, `FLOAT`, `BOOLEAN`, `DATE`, `LIST`. |
| `description` | `str \| None` | `None` | Human-readable description. |
| `required` | `bool` | `False` | Whether the property is required. |
### Example Schema Definition

```python
from graphrag_sdk.core.models import (
    EntityType, RelationType, PropertyType, GraphSchema,
)

schema = GraphSchema(
    entities=[
        EntityType(
            label="Person",
            description="A character or real person",
            properties=[
                PropertyType(name="name", type="STRING", required=True),
                PropertyType(name="age", type="INTEGER"),
                PropertyType(name="occupation", type="STRING"),
            ],
        ),
        EntityType(
            label="Location",
            description="A geographical place or setting",
            properties=[
                PropertyType(name="name", type="STRING", required=True),
                PropertyType(name="country", type="STRING"),
            ],
        ),
        EntityType(
            label="Organization",
            description="A company, institution, or group",
        ),
    ],
    relations=[
        RelationType(
            label="LIVES_IN",
            description="Person resides at location",
            patterns=[("Person", "Location")],
        ),
        RelationType(
            label="WORKS_FOR",
            description="Person is employed by organization",
            patterns=[("Person", "Organization")],
        ),
        RelationType(
            label="LOCATED_IN",
            description="Organization is located at a place",
            patterns=[("Organization", "Location")],
        ),
        RelationType(
            label="KNOWS",
            description="Two people know each other",
            patterns=[("Person", "Person")],
        ),
    ],
)
```

Each `RelationType.patterns` entry is a `(source_label, target_label)` tuple. An empty `patterns` list means the relation is allowed between any entity types.
### Open Schema Mode

If no entity types or relation types are defined (an empty `GraphSchema()`), extraction operates in open-schema mode and the pruning step is skipped. This lets the LLM extract any entities and relationships it finds.
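For example (assuming the same `conn`, `llm`, and `embedder` objects as in the earlier examples):

```python
from graphrag_sdk import GraphSchema

# Empty schema: no entity or relation constraints, pruning skipped
rag = GraphRAG(connection=conn, llm=llm, embedder=embedder, schema=GraphSchema())
```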
## 5. Pipeline Tuning

### Chunking Parameters

`FixedSizeChunking` splits text into fixed-size character windows with overlap.

| Parameter | Type | Default | Benchmark Value | Description |
|---|---|---|---|---|
| `chunk_size` | `int` | `1000` | `1500` | Maximum characters per chunk. |
| `chunk_overlap` | `int` | `100` | `200` | Overlapping characters between consecutive chunks. |

```python
from graphrag_sdk.ingestion.chunking_strategies.fixed_size import FixedSizeChunking

chunker = FixedSizeChunking(chunk_size=1500, chunk_overlap=200)
result = await rag.ingest("document.txt", chunker=chunker)
```

Larger chunks provide more context per extraction call but increase LLM token usage. The benchmark-optimized values (1500/200) balance extraction quality against cost.
### Extraction Strategy Parameters

`GraphExtraction` -- composable two-step extraction (GLiNER NER + LLM relationship extraction):

| Parameter | Type | Default | Description |
|---|---|---|---|
| `llm` | `LLMInterface` | required | LLM provider for step 2 (verify + relationship extraction). |
| `entity_extractor` | `EntityExtractor \| None` | `None` (`GLiNERExtractor()`) | Pluggable NER backend for step 1. |
| `coref_resolver` | `CorefResolver \| None` | `None` | Optional coreference resolution (e.g. `FastCorefResolver()`). |
| `entity_types` | `list[str] \| None` | `None` (11 default types) | Custom entity types. Overridden by `schema.entities` if set. |
| `max_concurrency` | `int \| None` | `None` (uses LLM default) | Maximum parallel LLM calls during step 2. |

Built-in entity extractors:

| Class | Description | Parameters |
|---|---|---|
| `GLiNERExtractor` | Local GLiNER model (default) | `threshold=0.75`, `model_name="urchade/gliner_medium-v2.1"` |
| `LLMExtractor` | LLM-based NER via prompt | `llm` (required), `threshold=0.75` |
```python
from graphrag_sdk import GraphExtraction, GLiNERExtractor, LLMExtractor

# Default: GLiNER for entity NER, LLM for relationship extraction
extractor = GraphExtraction(llm=my_llm)

# Use LLM for step 1 instead of GLiNER
extractor = GraphExtraction(
    llm=my_llm,
    entity_extractor=LLMExtractor(my_llm),
)

# GLiNER with lower confidence threshold
extractor = GraphExtraction(
    llm=my_llm,
    entity_extractor=GLiNERExtractor(threshold=0.6),
)

result = await rag.ingest("document.txt", extractor=extractor)
```
### Custom Entity Types

Override the default 11 entity types with your own domain-specific ontology:

```python
# Pass entity_types to GraphExtraction
extractor = GraphExtraction(
    llm=my_llm,
    entity_types=["Gene", "Protein", "Disease", "Drug", "Pathway"],
)

# Or define them in the schema (takes priority)
from graphrag_sdk import GraphSchema, EntityType

schema = GraphSchema(entities=[
    EntityType(label="Gene", description="A gene or genetic locus"),
    EntityType(label="Protein", description="A protein or enzyme"),
    EntityType(label="Disease", description="A disease or condition"),
])
rag = GraphRAG(connection=conn, llm=llm, embedder=embedder, schema=schema)
```

Priority: `schema.entities` > `entity_types` param > defaults (Person, Organization, Technology, Product, Location, Date, Event, Concept, Law, Dataset, Method).
### LLM Concurrency

The `LLMInterface.max_concurrency` parameter (default: `12`) controls how many LLM calls run in parallel during `abatch_invoke()`. Set it lower to avoid rate limits:

```python
llm = LiteLLM(model="azure/gpt-4.1", api_key="...")
llm.max_concurrency = 8  # limit to 8 parallel calls
```

For `GraphExtraction`, you can also pass `max_concurrency` directly:
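```python
extractor = GraphExtraction(
    llm=my_llm,
    max_concurrency=8,  # caps parallel LLM calls during step 2
)
```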
## 6. Retrieval Tuning

### MultiPathRetrieval

`MultiPathRetrieval` is the default retrieval strategy. It combines multiple search paths with cosine reranking.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `chunk_top_k` | `int` | `15` | Final chunks kept after cosine reranking. |
| `max_entities` | `int` | `30` | Maximum entities to include in context. |
| `max_relationships` | `int` | `20` | Maximum relationships in context (after 1-hop + 2-hop expansion). |
| `rel_top_k` | `int` | `15` | RELATES edge vector search results to retrieve. |
| `keyword_limit` | `int` | `10` | Maximum keywords extracted from the question. |
```python
from graphrag_sdk.retrieval.strategies.multi_path import MultiPathRetrieval

retriever = MultiPathRetrieval(
    graph_store=rag.graph_store,
    vector_store=rag.vector_store,
    embedder=rag.embedder,
    llm=rag.llm,
    chunk_top_k=20,        # more passages for complex questions
    max_entities=40,       # wider entity coverage
    max_relationships=30,  # more graph context
    rel_top_k=20,          # more RELATES edge hits
    keyword_limit=12,      # extract more keywords
)

result = await rag.completion("What happened?", strategy=retriever)
```
### Retrieval Pipeline (9 Steps)

The retrieval pipeline proceeds as follows (a sketch of the reranking step follows the list):

1. Keyword extraction -- stopword filtering + LLM proper-noun extraction.
2. Embed question -- single embedding API call for the query.
3. RELATES edge vector search -- finds fact strings and entity entry points via edge embeddings.
4. Entity discovery (2 paths) -- Cypher `CONTAINS` on entity names + fulltext search on the `__Entity__` index. Merged with entities from step 3.
5. Relationship expansion -- 1-hop (top 15 entities, limit 150) + 2-hop (top 5 entities, limit 25) traversal of RELATES edges.
6. Chunk retrieval (4 paths) -- fulltext search, vector search, MENTIONED_IN traversal, and 2-hop entity-to-neighbor-to-chunk traversal.
7. Source document names -- batch-fetch document paths via PART_OF edges.
8. Cosine reranking -- batch-embed candidate chunks and sort by cosine similarity to the query vector.
9. Context assembly -- structured sections: hint, entities, relationships, facts, passages.
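Step 8 conceptually reduces to the following (a NumPy sketch, not the SDK's code; `embedder`, `query_vector`, and the chunk dicts are assumed from context):

```python
import numpy as np

def rerank_by_cosine(query_vector, chunks, embedder, top_k=15):
    # Batch-embed all candidate chunks in one API call (see the
    # batch-embedding performance note in section 3).
    chunk_vectors = np.array(embedder.embed_documents([c["text"] for c in chunks]))
    query = np.array(query_vector)

    # Cosine similarity: dot product over the product of L2 norms.
    sims = chunk_vectors @ query / (
        np.linalg.norm(chunk_vectors, axis=1) * np.linalg.norm(query)
    )

    # Keep the chunk_top_k highest-scoring chunks.
    best = np.argsort(sims)[::-1][:top_k]
    return [chunks[i] for i in best]
```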
### Overriding the Default Strategy

Pass a custom strategy to individual queries or set it as the default:

```python
# Per-query override
result = await rag.completion("...", strategy=my_custom_retriever)

# Default at init time
rag = GraphRAG(
    connection=config,
    llm=my_llm,
    embedder=my_embedder,
    retrieval_strategy=my_custom_retriever,
)
```
## 7. Post-Ingestion

After all documents have been ingested, run post-ingestion steps to deduplicate entities, backfill embeddings, and ensure all indexes exist.

### finalize() -- All-In-One

The recommended approach is to call `finalize()` after all ingestion is complete. It bundles four steps, in order:

1. `deduplicate_entities()` -- global exact-name deduplication.
2. `backfill_entity_embeddings()` -- embed entity names for vector search.
3. `embed_relationships()` -- embed fact text on RELATES edges.
4. `ensure_indices()` -- create all 5 standard indexes (idempotent).

```python
# After ingesting all documents:
stats = await rag.finalize()
print(stats)
# {
#     "entities_deduplicated": 142,
#     "entities_embedded": 3200,
#     "relationships_embedded": 8500,
#     "indexes": {
#         "vector_Chunk": True,
#         "vector___Entity__": True,
#         "vector_RELATES": True,
#         "fulltext_Chunk": True,
#         "fulltext___Entity__": True,
#     },
# }
```
A synchronous convenience method is also available:
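A sketch for non-async contexts, assuming the wrapper is named `finalize_sync()` (verify the exact name against the API reference):

```python
# Assumed name for the synchronous wrapper -- check the API reference.
stats = rag.finalize_sync()
print(stats["entities_deduplicated"])
```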
### deduplicate_entities() -- Entity Deduplication

Call this when you need fine-grained control over deduplication.

```python
merged_count = await rag.deduplicate_entities(
    fuzzy=False,               # True to also run embedding-based dedup
    similarity_threshold=0.9,  # cosine threshold for fuzzy matching
    batch_size=500,            # entities per query batch
)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `fuzzy` | `bool` | `False` | If `True`, runs a second fuzzy dedup phase using embedding similarity. |
| `similarity_threshold` | `float` | `0.9` | Cosine similarity threshold for fuzzy matching. |
| `batch_size` | `int` | `500` | Entities per query batch. |
**Phase 1 (always runs):** Exact name match. Groups entities by normalized name (lowercase, stripped) and label to prevent cross-type merging. Keeps the entity with the longest description as the survivor. Remaps all RELATES and MENTIONED_IN edges from duplicates to the survivor, then deletes the duplicate nodes. The grouping logic is sketched below.

**Phase 2 (optional, `fuzzy=True`):** Embedding-based match. Re-fetches all surviving entities, batch-embeds their names, computes pairwise cosine similarity in memory-efficient blocks (1000 entities per block), and merges near-duplicates above the threshold.
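Phase 1's survivor selection reduces to something like this (a pure-Python sketch of the grouping only; edge remapping and deletion are omitted, and the entity dicts are illustrative):

```python
from collections import defaultdict

def pick_survivors(entities: list[dict]) -> dict[tuple, dict]:
    """Group by (normalized name, label) and keep the best-described entity."""
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for e in entities:
        # Normalize the name, and include the label so a Person named
        # "Mercury" is never merged with a Location named "Mercury".
        key = (e["name"].strip().lower(), e["label"])
        groups[key].append(e)
    # Within each group, the entity with the longest description survives.
    return {
        key: max(dupes, key=lambda e: len(e.get("description") or ""))
        for key, dupes in groups.items()
    }
```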
### backfill_entity_embeddings() -- Entity Vector Backfill

Embeds `__Entity__` nodes that are missing embeddings. Queries entities where `embedding IS NULL`, batch-embeds the entity names, and stores the vectors. Safe for incremental runs.

### embed_relationships() -- RELATES Edge Embeddings

Batch-embeds all RELATES edges that have a `fact` property but are missing embeddings. These edge embeddings power the RELATES vector search path in retrieval.
### ensure_indices() -- Index Creation

Creates all standard indexes (idempotent -- safe to call repeatedly):

| Index Type | Label/Type | Property |
|---|---|---|
| Vector | `Chunk` | `embedding` |
| Vector | `__Entity__` | `embedding` |
| Vector | `RELATES` (edge) | `embedding` |
| Fulltext | `Chunk` | `text` |
| Fulltext | `__Entity__` | `name`, `description` |

Note: `ensure_indices()` is called automatically after each `ingest()` call. The `finalize()` method resets the internal `_indices_ensured` flag and re-runs it to catch any newly needed indexes.
### When to Call Each

| Scenario | What to Call |
|---|---|
| After ingesting all documents | `await rag.finalize()` |
| After incremental ingestion (new documents) | `await rag.finalize()` |
| Only need dedup (embeddings already exist) | `await rag.deduplicate_entities()` |
| Only need entity embeddings | `await rag.vector_store.backfill_entity_embeddings()` |
| Only need RELATES edge embeddings | `await rag.vector_store.embed_relationships()` |
| Only need indexes | `await rag.vector_store.ensure_indices()` |

Do not call `backfill_entity_embeddings()` inside an ingestion loop (i.e., after each document). It re-scans all entities and is slow when called repeatedly. Instead, ingest all documents first, then call `finalize()` once.
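The recommended shape of an ingestion run, assuming a list of document paths:

```python
# Ingest everything first...
for path in ["ch1.txt", "ch2.txt", "ch3.txt"]:
    await rag.ingest(path)

# ...then run dedup, embedding backfill, and index creation exactly once.
stats = await rag.finalize()
```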