API Reference¶
Complete reference for all public classes and methods exported by graphrag_sdk.
Table of Contents¶
- GraphRAG (Facade)
- Connection
- Providers
- Data Models
- Schema
- Ingestion Strategies
- Ingestion Pipeline
- Retrieval Strategies
- Reranking Strategies
- Storage
- Context
- Exceptions
GraphRAG (Facade)¶
The main entry point. Three primary operations: ingest(), retrieve(), and completion().
Constructor¶
```python
GraphRAG(
    connection: FalkorDBConnection | ConnectionConfig,
    llm: LLMInterface,
    embedder: Embedder,
    schema: GraphSchema | None = None,
    retrieval_strategy: RetrievalStrategy | None = None,
)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `connection` | `FalkorDBConnection \| ConnectionConfig` | required | Database connection or config to create one |
| `llm` | `LLMInterface` | required | LLM provider |
| `embedder` | `Embedder` | required | Embedding provider |
| `schema` | `GraphSchema \| None` | `None` | Schema constraints for extraction (empty = unconstrained) |
| `retrieval_strategy` | `RetrievalStrategy \| None` | `None` | Default retrieval strategy (uses `MultiPathRetrieval` if `None`) |
Public attributes: `llm`, `embedder`, `schema`, `graph_store`, `vector_store`
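A minimal wiring sketch (the model id and connection values are illustrative, not defaults):

```python
from graphrag_sdk import GraphRAG, ConnectionConfig, LiteLLM

# `embedder` is any Embedder implementation (e.g. LiteLLMEmbedder; see Providers).
embedder = ...  # your Embedder instance

rag = GraphRAG(
    connection=ConnectionConfig(graph_name="demo"),
    llm=LiteLLM(model="gpt-4o-mini"),  # model id is illustrative
    embedder=embedder,
)
```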
ingest()¶
```python
async def ingest(
    source: str | list[str],
    *,
    text: str | None = None,
    loader: LoaderStrategy | None = None,
    chunker: ChunkingStrategy | None = None,
    extractor: ExtractionStrategy | None = None,
    resolver: ResolutionStrategy | None = None,
    max_concurrent: int = 3,
    ctx: Context | None = None,
) -> IngestionResult | list[IngestionResult]
```
Build a knowledge graph from one or more sources. Auto-detects loader from file extension. When a list of sources is provided, documents are ingested in parallel with bounded concurrency.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `source` | `str \| list[str]` | required | File path (or list of paths) for ingestion |
| `text` | `str \| None` | `None` | Raw text (single source only; skips loader) |
| `loader` | `LoaderStrategy \| None` | `None` | Custom loader (auto-detect if `None`) |
| `chunker` | `ChunkingStrategy \| None` | `None` | Custom chunker (`FixedSizeChunking(1000)` if `None`) |
| `extractor` | `ExtractionStrategy \| None` | `None` | Custom extractor (`GraphExtraction` if `None`) |
| `resolver` | `ResolutionStrategy \| None` | `None` | Custom resolver (`ExactMatchResolution` if `None`) |
| `max_concurrent` | `int` | `3` | Max parallel ingestions (list input only) |
| `ctx` | `Context \| None` | `None` | Execution context |
Returns: IngestionResult for a single source, list[IngestionResult] for multiple sources.
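Usage sketch (paths and text are illustrative):

```python
# Single file: returns one IngestionResult
result = await rag.ingest("docs/report.pdf")
print(result.nodes_created, result.relationships_created, result.chunks_indexed)

# Multiple files: ingested concurrently, returns a list
results = await rag.ingest(["notes/a.txt", "notes/b.txt"], max_concurrent=2)

# Raw text: skips the loader (the source string here is assumed to act as the document identifier)
result = await rag.ingest("inline-doc", text="Alice works at Acme.")
```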
retrieve()¶
```python
async def retrieve(
    question: str,
    *,
    strategy: RetrievalStrategy | None = None,
    reranker: RerankingStrategy | None = None,
    ctx: Context | None = None,
) -> RetrieverResult
```
Retrieve context from the knowledge graph without generating an answer. Use this to inspect retrieved context or pass it to your own LLM.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `question` | `str` | required | The user's question |
| `strategy` | `RetrievalStrategy \| None` | `None` | Override retrieval strategy |
| `reranker` | `RerankingStrategy \| None` | `None` | Optional reranking after retrieval |
| `ctx` | `Context \| None` | `None` | Execution context |
Returns: RetrieverResult
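A sketch of inspecting the retrieved context before answering (question text is illustrative):

```python
result = await rag.retrieve("Who founded Acme?")
for item in result.items:
    print(item.score, item.content[:80])
```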
completion()¶
```python
async def completion(
    question: str,
    *,
    history: list[ChatMessage | dict[str, str]] | None = None,
    strategy: RetrievalStrategy | None = None,
    reranker: RerankingStrategy | None = None,
    prompt_template: str | None = None,
    return_context: bool = False,
    ctx: Context | None = None,
) -> RagResult
```
Full RAG pipeline: retrieve context and generate an answer. When history is provided, messages are passed natively to the LLM provider's multi-turn chat API.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `question` | `str` | required | The user's question |
| `history` | `list[ChatMessage \| dict] \| None` | `None` | Conversation history (see below) |
| `strategy` | `RetrievalStrategy \| None` | `None` | Override retrieval strategy |
| `reranker` | `RerankingStrategy \| None` | `None` | Optional reranking after retrieval |
| `prompt_template` | `str \| None` | `None` | Custom prompt for single-turn (must contain `{context}` and `{question}`); ignored when `history` is provided |
| `return_context` | `bool` | `False` | Include retriever results in output |
| `ctx` | `Context \| None` | `None` | Execution context |
Returns: RagResult
Conversation history:
The `history` parameter accepts a list of `ChatMessage` objects or plain dicts with `role` and `content` keys. Supported roles: `"system"`, `"user"`, `"assistant"`; an invalid role raises `ValueError`.
```python
from graphrag_sdk import ChatMessage

# Using ChatMessage objects
result = await rag.completion(
    "What happened next?",
    history=[
        ChatMessage(role="user", content="Who is Alice?"),
        ChatMessage(role="assistant", content="Alice is an engineer."),
    ],
)

# Using plain dicts
result = await rag.completion(
    "Tell me more.",
    history=[
        {"role": "user", "content": "What is Acme?"},
        {"role": "assistant", "content": "A tech company."},
    ],
)
```
When `history` is provided, `completion()` builds a native messages list (`[system_prompt, *history, user_question]`) and calls `LLMInterface.ainvoke_messages()`. Without history, it uses the single-turn `ainvoke()` path.
query() (deprecated)¶
Deprecated. Use completion() for the full RAG pipeline or retrieve() for retrieval-only. Emits a DeprecationWarning and delegates to completion().
deduplicate_entities()¶
```python
async def deduplicate_entities(
    *,
    fuzzy: bool = False,
    similarity_threshold: float = 0.9,
    batch_size: int = 500,
) -> int
```
Post-ingestion entity deduplication. Groups entities by (normalized name, label) to prevent cross-type merging (e.g. Person "Paris" and Location "Paris" stay separate).
- **Phase 1 (always):** exact name match -- keeps the longest description, remaps RELATES and MENTIONED_IN edges, deletes duplicates.
- **Phase 2 (`fuzzy=True` only):** embedding-based -- embeds entity names and finds near-duplicates by cosine similarity.
Call once after all documents are ingested.
Returns: Number of duplicate entities merged.
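The Phase 1 grouping key can be sketched in plain Python (a simplified stand-in, not the SDK's implementation):

```python
from collections import defaultdict

def group_for_dedup(entities: list[dict]) -> dict[tuple[str, str], list[dict]]:
    """Group entities by (normalized name, label) so that e.g. the Person
    'Paris' and the Location 'Paris' never land in the same merge group."""
    groups: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for e in entities:
        key = (e["name"].strip().lower(), e["label"])
        groups[key].append(e)
    return dict(groups)

entities = [
    {"name": "Paris", "label": "Person"},
    {"name": "paris ", "label": "Person"},   # exact duplicate after normalization
    {"name": "Paris", "label": "Location"},  # same name, different type: kept apart
]
groups = group_for_dedup(entities)
# Two groups: ('paris', 'Person') with 2 members, ('paris', 'Location') with 1
```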
finalize()¶
Run all post-ingestion steps after all documents are ingested. Bundles:
1. `deduplicate_entities()` -- global exact-name dedup
2. `backfill_entity_embeddings()` -- name-only embeddings
3. `embed_relationships()` -- fact text embeddings on RELATES edges
4. `ensure_indices()` -- all indexes
Returns: Dict with counts: `entities_deduplicated`, `entities_embedded`, `relationships_embedded`, `indexes`.
Sync Wrappers¶
```python
def retrieve_sync(question: str, **kwargs) -> RetrieverResult
def completion_sync(question: str, **kwargs) -> RagResult
def ingest_sync(source: str | list[str], **kwargs) -> IngestionResult | list[IngestionResult]
def finalize_sync() -> dict[str, Any]
def query_sync(question: str, **kwargs) -> RagResult  # deprecated
```
Convenience methods that run the async versions in asyncio.run().
Connection¶
ConnectionConfig¶
```python
@dataclass
class ConnectionConfig:
    host: str = "localhost"
    port: int = 6379
    username: str | None = None
    password: str | None = None
    graph_name: str = "knowledge_graph"
    max_connections: int = 16
    retry_count: int = 3
    retry_delay: float = 1.0
    query_timeout_ms: int = 10_000
    pool_timeout: int = 30
```
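All fields have defaults, so a config only needs the values you override. A sketch (the host and numbers below are illustrative, not recommendations):

```python
from graphrag_sdk import ConnectionConfig  # import path assumed

config = ConnectionConfig(
    host="db.internal",       # illustrative host
    graph_name="my_graph",
    retry_count=5,            # retry failed queries up to 5 times
    query_timeout_ms=30_000,  # allow slower analytical queries
)
```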
FalkorDBConnection¶
| Method | Description |
|---|---|
| `await conn.query(cypher, params=None, timeout=None)` | Execute a Cypher query |
| `await conn.close()` | Close the connection pool |
| `conn.graph` | Lazy property returning the `AsyncGraph` handle |
Providers¶
```python
from graphrag_sdk import LLMInterface, Embedder, LLMBatchItem
from graphrag_sdk import LiteLLM, LiteLLMEmbedder, OpenRouterLLM, OpenRouterEmbedder
```
LLMInterface (ABC)¶
| Method | Signature | Description |
|---|---|---|
| `invoke` | `(prompt: str, **kwargs) -> LLMResponse` | Sync text generation (abstract) |
| `ainvoke` | `(prompt: str, *, max_retries=3, **kwargs) -> LLMResponse` | Async with retry + backoff |
| `ainvoke_messages` | `(messages: list[ChatMessage], *, max_retries=3, **kwargs) -> LLMResponse` | Multi-turn native messages (see below) |
| `invoke_with_model` | `(prompt: str, response_model: Type[BaseModel], **kwargs) -> BaseModel` | Structured output |
| `ainvoke_with_model` | `(prompt: str, response_model: Type[BaseModel], *, max_retries=3) -> BaseModel` | Async structured output |
| `abatch_invoke` | `(prompts: list[str], *, max_concurrency=None, max_retries=3) -> list[LLMBatchItem]` | Concurrent batch |
ainvoke_messages() is used by completion() when conversation history is provided. The default implementation concatenates messages into a single prompt string and calls ainvoke(), so custom providers work without changes. LiteLLM and OpenRouterLLM override this with native multi-turn implementations.
Embedder (ABC)¶
| Method | Signature | Description |
|---|---|---|
| `model_name` | `@property -> str` | Embedding model identifier (abstract) |
| `embed_query` | `(text: str, **kwargs) -> list[float]` | Single text embedding (abstract) |
| `aembed_query` | `(text: str, **kwargs) -> list[float]` | Async single (default: thread pool) |
| `embed_documents` | `(texts: list[str], **kwargs) -> list[list[float]]` | Batch (default: sequential) |
| `aembed_documents` | `(texts: list[str], **kwargs) -> list[list[float]]` | Async batch (default: thread pool) |
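Only `model_name` and `embed_query` are abstract; the async and batch variants have working defaults. A self-contained stand-in (deliberately not importing the SDK) showing the minimal surface a custom embedder must provide -- a real subclass would inherit from `Embedder` and gain the defaults:

```python
import hashlib

class ToyEmbedder:
    """Deterministic toy embedder: hashes text into a tiny fixed-size vector.
    Mirrors the two abstract members of Embedder for illustration only."""

    @property
    def model_name(self) -> str:
        return "toy-hash-8d"

    def embed_query(self, text: str, **kwargs) -> list[float]:
        digest = hashlib.sha256(text.encode()).digest()
        return [b / 255.0 for b in digest[:8]]  # 8-dim vector in [0, 1]

emb = ToyEmbedder()
vec = emb.embed_query("hello")
# Identical input always yields the identical 8-dim vector.
```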
LLMBatchItem¶
```python
@dataclass
class LLMBatchItem:
    index: int
    response: LLMResponse | None = None
    error: Exception | None = None

    @property
    def ok(self) -> bool  # True if response is not None
```
LiteLLM¶
```python
LiteLLM(model: str, *, api_key=None, api_base=None, api_version=None, temperature=0.0, max_tokens=None, **kwargs)
```
LiteLLMEmbedder¶
OpenRouterLLM¶
OpenRouterEmbedder¶
Data Models¶
All models extend `DataModel` (a Pydantic `BaseModel` with `extra="allow"`).
```python
from graphrag_sdk import GraphNode, GraphRelationship, GraphData
from graphrag_sdk import TextChunk, TextChunks
from graphrag_sdk import DocumentInfo, DocumentOutput
from graphrag_sdk import IngestionResult, RagResult
from graphrag_sdk import RetrieverResult, RetrieverResultItem
from graphrag_sdk import ResolutionResult, SearchType
```
GraphNode¶
```python
class GraphNode(DataModel):
    id: str                          # Unique identifier
    label: str                       # Node label (Person, Place, etc.)
    properties: dict[str, Any] = {}  # Key-value properties
    embedding_properties: dict[str, list[float]] | None = None
```
GraphRelationship¶
```python
class GraphRelationship(DataModel):
    start_node_id: str
    end_node_id: str
    type: str                        # Edge type (RELATES for all extracted rels)
    properties: dict[str, Any] = {}  # Includes rel_type, fact, keywords, etc.
    embedding_properties: dict[str, list[float]] | None = None
```
GraphData¶
TextChunk¶
```python
class TextChunk(DataModel):
    text: str
    index: int
    metadata: dict[str, Any] = {}
    uid: str  # Auto-generated UUID
```
TextChunks¶
DocumentInfo¶
```python
class DocumentInfo(DataModel):
    path: str | None = None
    uid: str  # Auto-generated UUID
    metadata: dict[str, Any] = {}
```
DocumentOutput¶
IngestionResult¶
```python
class IngestionResult(DataModel):
    document_info: DocumentInfo = DocumentInfo()
    nodes_created: int = 0
    relationships_created: int = 0
    chunks_indexed: int = 0
    metadata: dict[str, Any] = {}
```
RagResult¶
```python
class RagResult(DataModel):
    answer: str
    retriever_result: RetrieverResult | None = None  # Populated when return_context=True
    metadata: dict[str, Any] = {}                    # Contains model, num_context_items, strategy
```
RetrieverResult¶
```python
class RetrieverResult(DataModel):
    items: list[RetrieverResultItem] = []
    metadata: dict[str, Any] = {}
```
RetrieverResultItem¶
```python
class RetrieverResultItem(DataModel):
    content: str
    metadata: dict[str, Any] = {}
    score: float | None = None
```
ResolutionResult¶
```python
class ResolutionResult(DataModel):
    nodes: list[GraphNode] = []
    relationships: list[GraphRelationship] = []
    merged_count: int = 0
```
ChatMessage¶
```python
from graphrag_sdk import ChatMessage

class ChatMessage(DataModel):
    role: Literal["system", "user", "assistant"]
    content: str

    def to_dict(self) -> dict[str, str]  # {"role": ..., "content": ...}
```
Validated message type for multi-turn conversations. Used by completion(history=...) and LLMInterface.ainvoke_messages(). Invalid roles raise a validation error on construction.
LLMMessage is a backward-compatible alias for ChatMessage.
LLMResponse¶
SearchType¶
Extraction Models¶
```python
class ExtractedEntity(DataModel):
    name: str
    type: str
    description: str = ""
    source_chunk_ids: list[str] = []

class ExtractedRelation(DataModel):
    source: str
    target: str
    type: str
    keywords: str = ""
    description: str = ""
    weight: float = 1.0
    source_chunk_ids: list[str] = []

class EntityMention(DataModel):
    chunk_id: str
    entity_id: str

class ExtractionOutput(DataModel):
    entities: list[ExtractedEntity] = []
    relations: list[ExtractedRelation] = []
    mentions: list[EntityMention] = []
```
compute_entity_id()¶
Deterministic entity ID from normalized name and optional type. When entity_type is provided, appends a __type suffix to prevent cross-type collisions (e.g. paris__person vs paris__location). Without entity_type, returns just the normalized name for backwards compatibility.
Schema¶
EntityType¶
```python
class EntityType(DataModel):
    label: str                            # e.g. "Person"
    description: str | None = None        # Helps LLM understand what to extract
    properties: list[PropertyType] = []   # Optional property definitions
```
RelationType¶
```python
class RelationType(DataModel):
    label: str                            # e.g. "WORKS_AT"
    description: str | None = None
    patterns: list[tuple[str, str]] = []  # Allowed (source_label, target_label) pairs
```
PropertyType¶
```python
class PropertyType(DataModel):
    name: str
    type: str = "STRING"  # STRING, INTEGER, FLOAT, BOOLEAN, DATE, LIST
    description: str | None = None
    required: bool = False
```
GraphSchema¶
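`GraphSchema`'s own constructor is not reproduced here; its building blocks (documented above) compose like this. The example values are hypothetical, and how the pieces are handed to `GraphSchema` is left to its signature:

```python
from graphrag_sdk import EntityType, RelationType, PropertyType  # import path assumed

person = EntityType(
    label="Person",
    description="A human individual",
    properties=[PropertyType(name="birth_date", type="DATE")],
)
company = EntityType(label="Company", description="An organization")

works_at = RelationType(
    label="WORKS_AT",
    description="Employment relationship",
    patterns=[("Person", "Company")],  # only Person -> Company edges allowed
)
```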
Ingestion Strategies¶
LoaderStrategy (ABC)¶
```python
class LoaderStrategy(ABC):
    @abstractmethod
    async def load(self, source: str, ctx: Context) -> DocumentOutput: ...
```
Built-in: `TextLoader(encoding="utf-8")`, `PdfLoader()`
ChunkingStrategy (ABC)¶
```python
class ChunkingStrategy(ABC):
    @abstractmethod
    async def chunk(self, text: str, ctx: Context) -> TextChunks: ...
```
Built-in: `FixedSizeChunking(chunk_size=1000, chunk_overlap=100)`
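The sliding-window idea behind fixed-size chunking can be sketched in a few lines (a simplified, character-based stand-in with no metadata, not the SDK's implementation):

```python
def fixed_size_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> list[str]:
    """Slide a window of chunk_size characters, each step advancing by
    chunk_size - chunk_overlap so adjacent chunks share an overlap region."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = fixed_size_chunks("abcdefghij", chunk_size=4, chunk_overlap=2)
# ['abcd', 'cdef', 'efgh', 'ghij', 'ij']
```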
ExtractionStrategy (ABC)¶
```python
class ExtractionStrategy(ABC):
    @abstractmethod
    async def extract(self, chunks: TextChunks, schema: GraphSchema, ctx: Context) -> GraphData: ...
```
Built-in:
- `GraphExtraction(llm, *, entity_extractor=None, coref_resolver=None, entity_types=None, max_concurrency=None)`

Entity extractors (step-1 backends for `GraphExtraction`):
- `GLiNERExtractor(threshold=0.75, model_name="urchade/gliner_medium-v2.1")` -- default, local NER
- `LLMExtractor(llm, threshold=0.75)` -- LLM-based NER
- Subclass `EntityExtractor` for custom backends
ResolutionStrategy (ABC)¶
```python
class ResolutionStrategy(ABC):
    @abstractmethod
    async def resolve(self, graph_data: GraphData, ctx: Context) -> ResolutionResult: ...
```
Built-in:
- `ExactMatchResolution(resolve_property="id")`
- `DescriptionMergeResolution(llm=None, force_summary_threshold=3, max_summary_tokens=500)`
Ingestion Pipeline¶
```python
IngestionPipeline(
    loader: LoaderStrategy,
    chunker: ChunkingStrategy,
    extractor: ExtractionStrategy,
    resolver: ResolutionStrategy,
    graph_store: GraphStore,
    vector_store: VectorStore,
    schema: GraphSchema | None = None,
    embedder: Embedder | None = None,
)
```
| Method | Signature | Description |
|---|---|---|
| `run` | `(source, ctx=None, *, text=None, document_info=None) -> IngestionResult` | Execute the full 9-step pipeline |
Retrieval Strategies¶
RetrievalStrategy (ABC)¶
Uses the Template Method pattern.
```python
class RetrievalStrategy(ABC):
    def __init__(self, graph_store=None, vector_store=None): ...

    async def search(self, query, ctx=None, **kwargs) -> RetrieverResult: ...

    @abstractmethod
    async def _execute(self, query, ctx, **kwargs) -> RawSearchResult: ...
```
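The pattern: the concrete `search()` wraps the abstract `_execute()` hook, so subclasses supply only the raw search step while pre/post-processing stays shared. A minimal stand-in (sync and outside the SDK, for illustration only):

```python
from abc import ABC, abstractmethod

class RetrievalSketch(ABC):
    def search(self, query: str) -> list[str]:
        # Template method: shared pre/post-processing around the hook.
        raw = self._execute(query.strip())   # normalize the query first
        return [r for r in raw if r]         # then e.g. drop empty results

    @abstractmethod
    def _execute(self, query: str) -> list[str]: ...

class EchoRetrieval(RetrievalSketch):
    def _execute(self, query: str) -> list[str]:
        return [f"hit for {query}", ""]

results = EchoRetrieval().search("  acme  ")
# ['hit for acme']
```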
LocalRetrieval¶
MultiPathRetrieval¶
```python
MultiPathRetrieval(
    graph_store, vector_store, embedder, llm,
    *,
    chunk_top_k=15,
    max_entities=30,
    max_relationships=20,
    rel_top_k=15,
    keyword_limit=10,
)
```
Reranking Strategies¶
RerankingStrategy (ABC)¶
```python
class RerankingStrategy(ABC):
    @abstractmethod
    async def rerank(self, query, result: RetrieverResult, ctx: Context) -> RetrieverResult: ...
```
CosineReranker¶
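CosineReranker's parameters are not listed above; the underlying idea -- reorder retrieved items by cosine similarity between the query embedding and each item's embedding -- can be sketched as (a stand-alone illustration, not the SDK's code):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; 0.0 when either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def rerank(query_vec: list[float], items: list[tuple[str, list[float]]]) -> list[str]:
    """Return item contents sorted by descending similarity to the query."""
    scored = [(cosine(query_vec, vec), content) for content, vec in items]
    return [content for _, content in sorted(scored, key=lambda s: -s[0])]

ranked = rerank([1.0, 0.0], [("orthogonal", [0.0, 1.0]), ("aligned", [2.0, 0.0])])
# ['aligned', 'orthogonal']
```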
Storage¶
GraphStore¶
| Method | Signature | Description |
|---|---|---|
| `upsert_nodes` | `(nodes: list[GraphNode]) -> int` | Batched MERGE, returns count |
| `upsert_relationships` | `(rels: list[GraphRelationship]) -> int` | Batched MERGE with label hints |
| `get_connected_entities` | `(chunk_id, max_hops=1) -> list[dict]` | N-hop entity traversal |
| `query_raw` | `(cypher, params=None) -> Any` | Raw Cypher execution |
| `get_statistics` | `() -> dict` | Node/edge counts, types, density |
| `delete_all` | `() -> None` | Delete all data |
VectorStore¶
```python
VectorStore(connection, embedder=None, index_name="chunk_embeddings", embedding_dimension=256, similarity_function="cosine")
```
| Method | Signature | Description |
|---|---|---|
| `create_vector_index` | `(label="Chunk", property="embedding") -> None` | Create vector index |
| `create_entity_vector_index` | `() -> None` | Create entity vector index |
| `create_fulltext_index` | `(label="Chunk", *properties) -> None` | Create fulltext index |
| `ensure_indices` | `() -> None` | Create all standard indices |
| `index_chunks` | `(chunks: TextChunks) -> int` | Embed and store chunk vectors |
| `backfill_entity_embeddings` | `() -> int` | Embed all entities missing vectors |
| `embed_relationships` | `() -> int` | Embed fact text on RELATES edges |
| `search` | `(query_vector, top_k=5, label="Chunk") -> list[dict]` | Vector similarity search |
| `search_entities` | `(query_vector, top_k=5) -> list[dict]` | Entity vector search |
| `search_relationships` | `(query_vector, top_k=15) -> list[dict]` | RELATES edge vector search |
| `fulltext_search` | `(query, top_k=5, label="Chunk") -> list[dict]` | Fulltext keyword search |
Context¶
Execution context for logging and budget tracking.
| Method / Property | Description |
|---|---|
| `ctx.log(message, log_level=logging.INFO)` | Log a message |
| `ctx.budget_exceeded` | `True` if elapsed time > `latency_budget_ms` |
Exceptions¶
| Exception | When Raised |
|---|---|
| `GraphRAGError` | Base exception for all SDK errors |
| `LoaderError` | File loading failures |
| `ChunkingError` | Text splitting failures |
| `ExtractionError` | LLM extraction or JSON parsing failures |
| `ResolutionError` | Entity deduplication failures |
| `RetrieverError` | Retrieval execution failures |
| `DatabaseError` | FalkorDB connection or query failures |
| `IngestionError` | Pipeline orchestration failures |
| `SchemaValidationError` | Schema constraint violations |
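Because every SDK exception derives from `GraphRAGError`, a single except clause catches any of them. A self-contained sketch of the hierarchy's shape (stand-in classes mirroring the table above, not the SDK's own):

```python
# Stand-ins mirroring the documented hierarchy.
class GraphRAGError(Exception): ...
class LoaderError(GraphRAGError): ...
class DatabaseError(GraphRAGError): ...

def load(path: str) -> str:
    raise LoaderError(f"cannot read {path}")

try:
    load("missing.txt")
except GraphRAGError as exc:  # the base class catches every subclass
    caught = type(exc).__name__
# caught == 'LoaderError'
```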