
API Reference

Complete reference for all public classes and methods exported by graphrag_sdk.

GraphRAG (Facade)

The main entry point. Three primary operations: ingest(), retrieve(), and completion().

from graphrag_sdk import GraphRAG

Constructor

GraphRAG(
    connection: FalkorDBConnection | ConnectionConfig,
    llm: LLMInterface,
    embedder: Embedder,
    schema: GraphSchema | None = None,
    retrieval_strategy: RetrievalStrategy | None = None,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| connection | FalkorDBConnection \| ConnectionConfig | required | Database connection, or a config to create one |
| llm | LLMInterface | required | LLM provider |
| embedder | Embedder | required | Embedding provider |
| schema | GraphSchema \| None | None | Schema constraints for extraction (empty = unconstrained) |
| retrieval_strategy | RetrievalStrategy \| None | None | Default retrieval strategy (uses MultiPathRetrieval if None) |

Public attributes: llm, embedder, schema, graph_store, vector_store

ingest()

async def ingest(
    source: str | list[str],
    *,
    text: str | None = None,
    loader: LoaderStrategy | None = None,
    chunker: ChunkingStrategy | None = None,
    extractor: ExtractionStrategy | None = None,
    resolver: ResolutionStrategy | None = None,
    max_concurrent: int = 3,
    ctx: Context | None = None,
) -> IngestionResult | list[IngestionResult]

Build a knowledge graph from one or more sources. The loader is auto-detected from the file extension when none is given. When a list of sources is provided, documents are ingested in parallel with bounded concurrency.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| source | str \| list[str] | required | File path (or list of paths) for ingestion |
| text | str \| None | None | Raw text (single source only; skips the loader) |
| loader | LoaderStrategy \| None | None | Custom loader (auto-detected if None) |
| chunker | ChunkingStrategy \| None | None | Custom chunker (FixedSizeChunking(1000) if None) |
| extractor | ExtractionStrategy \| None | None | Custom extractor (GraphExtraction if None) |
| resolver | ResolutionStrategy \| None | None | Custom resolver (ExactMatchResolution if None) |
| max_concurrent | int | 3 | Max parallel ingestions (list input only) |
| ctx | Context \| None | None | Execution context |

Returns: IngestionResult for a single source, list[IngestionResult] for multiple sources.

retrieve()

async def retrieve(
    question: str,
    *,
    strategy: RetrievalStrategy | None = None,
    reranker: RerankingStrategy | None = None,
    ctx: Context | None = None,
) -> RetrieverResult

Retrieve context from the knowledge graph without generating an answer. Use this to inspect retrieved context or pass it to your own LLM.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| question | str | required | The user's question |
| strategy | RetrievalStrategy \| None | None | Override retrieval strategy |
| reranker | RerankingStrategy \| None | None | Optional reranking after retrieval |
| ctx | Context \| None | None | Execution context |

Returns: RetrieverResult

completion()

async def completion(
    question: str,
    *,
    history: list[ChatMessage | dict[str, str]] | None = None,
    strategy: RetrievalStrategy | None = None,
    reranker: RerankingStrategy | None = None,
    prompt_template: str | None = None,
    return_context: bool = False,
    ctx: Context | None = None,
) -> RagResult

Full RAG pipeline: retrieve context and generate an answer. When history is provided, messages are passed natively to the LLM provider's multi-turn chat API.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| question | str | required | The user's question |
| history | list[ChatMessage \| dict] \| None | None | Conversation history (see below) |
| strategy | RetrievalStrategy \| None | None | Override retrieval strategy |
| reranker | RerankingStrategy \| None | None | Optional reranking after retrieval |
| prompt_template | str \| None | None | Custom prompt for single-turn (must contain {context} and {question}). Ignored when history is provided. |
| return_context | bool | False | Include retriever results in output |
| ctx | Context \| None | None | Execution context |

Returns: RagResult

Conversation history:

History accepts a list of ChatMessage objects or plain dicts with role and content keys. Supported roles: "system", "user", "assistant". Invalid roles raise ValueError.

from graphrag_sdk import ChatMessage

# Using ChatMessage objects
result = await rag.completion(
    "What happened next?",
    history=[
        ChatMessage(role="user", content="Who is Alice?"),
        ChatMessage(role="assistant", content="Alice is an engineer."),
    ],
)

# Using plain dicts
result = await rag.completion(
    "Tell me more.",
    history=[
        {"role": "user", "content": "What is Acme?"},
        {"role": "assistant", "content": "A tech company."},
    ],
)

When history is provided, completion() builds a native messages list: [system_prompt, *history, user_question] and calls LLMInterface.ainvoke_messages(). Without history, it uses the single-turn ainvoke() path.

query() (deprecated)

async def query(question: str, **kwargs) -> RagResult

Deprecated. Use completion() for the full RAG pipeline or retrieve() for retrieval-only. Emits a DeprecationWarning and delegates to completion().

deduplicate_entities()

async def deduplicate_entities(
    *,
    fuzzy: bool = False,
    similarity_threshold: float = 0.9,
    batch_size: int = 500,
) -> int

Post-ingestion entity deduplication. Groups entities by (normalized name, label) to prevent cross-type merging (e.g. Person "Paris" and Location "Paris" stay separate).

  • Phase 1 (always): Exact name match -- keeps longest description, remaps RELATES and MENTIONED_IN edges, deletes duplicates.
  • Phase 2 (optional, fuzzy=True): Embedding-based -- embeds entity names, finds near-duplicates by cosine similarity.

Call once after all documents are ingested.

Returns: Number of duplicate entities merged.
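The phase-1 grouping idea can be sketched with plain Python. This is an illustration only: the normalization rule is an assumption, and the real method also remaps edges and deletes duplicates in the database.

```python
from collections import defaultdict

# Toy entities: (name, label, description)
entities = [
    ("Paris", "Person", "A colleague"),
    ("paris", "Person", "A colleague named Paris"),
    ("Paris", "Location", "Capital of France"),
]

# Group by (normalized name, label) so a Person "Paris" never merges
# with a Location "Paris".
groups: dict[tuple[str, str], list[tuple[str, str, str]]] = defaultdict(list)
for name, label, desc in entities:
    groups[(name.strip().lower(), label)].append((name, label, desc))

# Within each group, keep the entity with the longest description.
survivors = [max(g, key=lambda e: len(e[2])) for g in groups.values()]
merged = len(entities) - len(survivors)
print(merged)  # 1 duplicate merged
```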

finalize()

async def finalize() -> dict[str, Any]

Run all post-ingestion steps after all documents are ingested. Bundles:

  1. deduplicate_entities() -- global exact-name dedup
  2. backfill_entity_embeddings() -- name-only embeddings
  3. embed_relationships() -- fact-text embeddings on RELATES edges
  4. ensure_indices() -- all indexes

Returns: Dict with counts: entities_deduplicated, entities_embedded, relationships_embedded, indexes.

Sync Wrappers

def retrieve_sync(question: str, **kwargs) -> RetrieverResult
def completion_sync(question: str, **kwargs) -> RagResult
def ingest_sync(source: str | list[str], **kwargs) -> IngestionResult | list[IngestionResult]
def finalize_sync() -> dict[str, Any]
def query_sync(question: str, **kwargs) -> RagResult  # deprecated

Convenience methods that run the async versions in asyncio.run().
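The wrapper pattern can be sketched with a stand-in coroutine (not the SDK's actual retrieve()):

```python
import asyncio

# Stand-in for an async SDK method such as GraphRAG.retrieve().
async def retrieve(question: str) -> str:
    return f"context for: {question}"

# The sync wrapper drives the coroutine to completion on a fresh event loop.
def retrieve_sync(question: str, **kwargs) -> str:
    return asyncio.run(retrieve(question, **kwargs))

print(retrieve_sync("Who is Alice?"))  # context for: Who is Alice?
```

Because asyncio.run() creates a new event loop, these wrappers cannot be called from code that is already running inside an event loop; use the async methods there instead.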


Connection

from graphrag_sdk import ConnectionConfig, FalkorDBConnection

ConnectionConfig

@dataclass
class ConnectionConfig:
    host: str = "localhost"
    port: int = 6379
    username: str | None = None
    password: str | None = None
    graph_name: str = "knowledge_graph"
    max_connections: int = 16
    retry_count: int = 3
    retry_delay: float = 1.0
    query_timeout_ms: int = 10_000
    pool_timeout: int = 30

FalkorDBConnection

FalkorDBConnection(config: ConnectionConfig | None = None)

| Method | Description |
| --- | --- |
| await conn.query(cypher, params=None, timeout=None) | Execute a Cypher query |
| await conn.close() | Close the connection pool |
| conn.graph | Lazy property returning the AsyncGraph handle |

Providers

from graphrag_sdk import LLMInterface, Embedder, LLMBatchItem
from graphrag_sdk import LiteLLM, LiteLLMEmbedder, OpenRouterLLM, OpenRouterEmbedder

LLMInterface (ABC)

LLMInterface(model_name: str, model_params: dict | None = None, max_concurrency: int = 12)

| Method | Signature | Description |
| --- | --- | --- |
| invoke | (prompt: str, **kwargs) -> LLMResponse | Sync text generation (abstract) |
| ainvoke | (prompt: str, *, max_retries=3, **kwargs) -> LLMResponse | Async, with retry and backoff |
| ainvoke_messages | (messages: list[ChatMessage], *, max_retries=3, **kwargs) -> LLMResponse | Multi-turn native messages (see below) |
| invoke_with_model | (prompt: str, response_model: Type[BaseModel], **kwargs) -> BaseModel | Structured output |
| ainvoke_with_model | (prompt: str, response_model: Type[BaseModel], *, max_retries=3) -> BaseModel | Async structured output |
| abatch_invoke | (prompts: list[str], *, max_concurrency=None, max_retries=3) -> list[LLMBatchItem] | Concurrent batch invocation |

ainvoke_messages() is used by completion() when conversation history is provided. The default implementation concatenates messages into a single prompt string and calls ainvoke(), so custom providers work without changes. LiteLLM and OpenRouterLLM override this with native multi-turn implementations.
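The default fallback amounts to flattening the message list into one prompt string. A minimal sketch of that idea, where the exact formatting is an assumption rather than the SDK's verbatim implementation:

```python
# Hypothetical fallback: flatten a message list into a single prompt
# string before delegating to the single-turn ainvoke() path.
def flatten_messages(messages: list[dict[str, str]]) -> str:
    return "\n".join(f"{m['role']}: {m['content']}" for m in messages)

prompt = flatten_messages([
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Who is Alice?"},
])
print(prompt)
```

Providers with native multi-turn APIs (LiteLLM, OpenRouterLLM) skip this flattening and pass the messages through directly.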

Embedder (ABC)

| Method | Signature | Description |
| --- | --- | --- |
| model_name | @property -> str | Embedding model identifier (abstract) |
| embed_query | (text: str, **kwargs) -> list[float] | Single text embedding (abstract) |
| aembed_query | (text: str, **kwargs) -> list[float] | Async single (default: thread pool) |
| embed_documents | (texts: list[str], **kwargs) -> list[list[float]] | Batch (default: sequential) |
| aembed_documents | (texts: list[str], **kwargs) -> list[list[float]] | Async batch (default: thread pool) |

LLMBatchItem

@dataclass
class LLMBatchItem:
    index: int
    response: LLMResponse | None = None
    error: Exception | None = None

    @property
    def ok(self) -> bool  # True if response is not None

LiteLLM

LiteLLM(model: str, *, api_key=None, api_base=None, api_version=None, temperature=0.0, max_tokens=None, **kwargs)

LiteLLMEmbedder

LiteLLMEmbedder(model: str, *, api_key=None, api_base=None, api_version=None, **kwargs)

OpenRouterLLM

OpenRouterLLM(model: str, *, api_key=None, temperature=0.0, max_tokens=None, extra_headers=None)

OpenRouterEmbedder

OpenRouterEmbedder(model: str, *, api_key=None, extra_headers=None)

Data Models

All models extend DataModel (Pydantic BaseModel with extra="allow").

from graphrag_sdk import GraphNode, GraphRelationship, GraphData
from graphrag_sdk import TextChunk, TextChunks
from graphrag_sdk import DocumentInfo, DocumentOutput
from graphrag_sdk import IngestionResult, RagResult
from graphrag_sdk import RetrieverResult, RetrieverResultItem
from graphrag_sdk import ResolutionResult, SearchType

GraphNode

class GraphNode(DataModel):
    id: str                                              # Unique identifier
    label: str                                           # Node label (Person, Place, etc.)
    properties: dict[str, Any] = {}                      # Key-value properties
    embedding_properties: dict[str, list[float]] | None = None

GraphRelationship

class GraphRelationship(DataModel):
    start_node_id: str
    end_node_id: str
    type: str                                            # Edge type (RELATES for all extracted rels)
    properties: dict[str, Any] = {}                      # Includes rel_type, fact, keywords, etc.
    embedding_properties: dict[str, list[float]] | None = None

GraphData

class GraphData(DataModel):
    nodes: list[GraphNode] = []
    relationships: list[GraphRelationship] = []

TextChunk

class TextChunk(DataModel):
    text: str
    index: int
    metadata: dict[str, Any] = {}
    uid: str                                             # Auto-generated UUID

TextChunks

class TextChunks(DataModel):
    chunks: list[TextChunk] = []

DocumentInfo

class DocumentInfo(DataModel):
    path: str | None = None
    uid: str                                             # Auto-generated UUID
    metadata: dict[str, Any] = {}

DocumentOutput

class DocumentOutput(DataModel):
    text: str
    document_info: DocumentInfo = DocumentInfo()

IngestionResult

class IngestionResult(DataModel):
    document_info: DocumentInfo = DocumentInfo()
    nodes_created: int = 0
    relationships_created: int = 0
    chunks_indexed: int = 0
    metadata: dict[str, Any] = {}

RagResult

class RagResult(DataModel):
    answer: str
    retriever_result: RetrieverResult | None = None      # Populated when return_context=True
    metadata: dict[str, Any] = {}                        # Contains model, num_context_items, strategy

RetrieverResult

class RetrieverResult(DataModel):
    items: list[RetrieverResultItem] = []
    metadata: dict[str, Any] = {}

RetrieverResultItem

class RetrieverResultItem(DataModel):
    content: str
    metadata: dict[str, Any] = {}
    score: float | None = None

ResolutionResult

class ResolutionResult(DataModel):
    nodes: list[GraphNode] = []
    relationships: list[GraphRelationship] = []
    merged_count: int = 0

ChatMessage

from graphrag_sdk import ChatMessage

class ChatMessage(DataModel):
    role: Literal["system", "user", "assistant"]
    content: str

    def to_dict(self) -> dict[str, str]  # {"role": ..., "content": ...}

Validated message type for multi-turn conversations. Used by completion(history=...) and LLMInterface.ainvoke_messages(). Invalid roles raise a validation error on construction.

LLMMessage is a backward-compatible alias for ChatMessage.
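The documented role check can be illustrated with a plain-Python stand-in. The real class is a Pydantic model; this sketch only mirrors the behavior described above.

```python
from dataclasses import dataclass

VALID_ROLES = {"system", "user", "assistant"}

@dataclass
class ChatMessage:
    role: str
    content: str

    def __post_init__(self) -> None:
        # Mirrors the documented rule: invalid roles fail at construction.
        if self.role not in VALID_ROLES:
            raise ValueError(f"invalid role: {self.role!r}")

    def to_dict(self) -> dict[str, str]:
        return {"role": self.role, "content": self.content}

msg = ChatMessage("user", "Who is Alice?")
print(msg.to_dict())  # {'role': 'user', 'content': 'Who is Alice?'}
```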

LLMResponse

class LLMResponse(DataModel):
    content: str
    tool_calls: list[dict[str, Any]] | None = None

SearchType

class SearchType(str, Enum):
    VECTOR = "vector"
    FULLTEXT = "fulltext"
    HYBRID = "hybrid"

Extraction Models

class ExtractedEntity(DataModel):
    name: str
    type: str
    description: str = ""
    source_chunk_ids: list[str] = []

class ExtractedRelation(DataModel):
    source: str
    target: str
    type: str
    keywords: str = ""
    description: str = ""
    weight: float = 1.0
    source_chunk_ids: list[str] = []

class EntityMention(DataModel):
    chunk_id: str
    entity_id: str

class ExtractionOutput(DataModel):
    entities: list[ExtractedEntity] = []
    relations: list[ExtractedRelation] = []
    mentions: list[EntityMention] = []

compute_entity_id()

def compute_entity_id(name: str, entity_type: str = "") -> str

Deterministic entity ID from normalized name and optional type. When entity_type is provided, appends a __type suffix to prevent cross-type collisions (e.g. paris__person vs paris__location). Without entity_type, returns just the normalized name for backwards compatibility.
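One plausible reading of this behavior, as a sketch; the exact normalization rules (here: trim and lowercase) are an assumption, not the SDK's verified implementation:

```python
def compute_entity_id(name: str, entity_type: str = "") -> str:
    # Assumed normalization: trim whitespace and lowercase.
    normalized = name.strip().lower()
    if entity_type:
        # Type suffix prevents cross-type collisions.
        return f"{normalized}__{entity_type.strip().lower()}"
    return normalized

print(compute_entity_id("Paris", "Person"))    # paris__person
print(compute_entity_id("Paris", "Location"))  # paris__location
print(compute_entity_id("Paris"))              # paris
```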


Schema

from graphrag_sdk import GraphSchema, EntityType, RelationType

EntityType

class EntityType(DataModel):
    label: str                            # e.g. "Person"
    description: str | None = None        # Helps LLM understand what to extract
    properties: list[PropertyType] = []   # Optional property definitions

RelationType

class RelationType(DataModel):
    label: str                            # e.g. "WORKS_AT"
    description: str | None = None
    patterns: list[tuple[str, str]] = []  # Allowed (source_label, target_label) pairs

PropertyType

class PropertyType(DataModel):
    name: str
    type: str = "STRING"                  # STRING, INTEGER, FLOAT, BOOLEAN, DATE, LIST
    description: str | None = None
    required: bool = False

GraphSchema

class GraphSchema(DataModel):
    entities: list[EntityType] = []
    relations: list[RelationType] = []

Ingestion Strategies

LoaderStrategy (ABC)

class LoaderStrategy(ABC):
    @abstractmethod
    async def load(self, source: str, ctx: Context) -> DocumentOutput: ...

Built-in: TextLoader(encoding="utf-8"), PdfLoader()

ChunkingStrategy (ABC)

class ChunkingStrategy(ABC):
    @abstractmethod
    async def chunk(self, text: str, ctx: Context) -> TextChunks: ...

Built-in: FixedSizeChunking(chunk_size=1000, chunk_overlap=100)
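To illustrate what fixed-size chunking with overlap produces, here is a character-based sketch; the SDK's FixedSizeChunking may differ in boundary handling:

```python
def fixed_size_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> list[str]:
    # Each chunk starts (chunk_size - chunk_overlap) characters after the last,
    # so consecutive chunks share chunk_overlap characters.
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = fixed_size_chunks("abcdefghij", chunk_size=4, chunk_overlap=2)
print(chunks)  # ['abcd', 'cdef', 'efgh', 'ghij', 'ij']
```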

ExtractionStrategy (ABC)

class ExtractionStrategy(ABC):
    @abstractmethod
    async def extract(self, chunks: TextChunks, schema: GraphSchema, ctx: Context) -> GraphData: ...

Built-in:

  • GraphExtraction(llm, *, entity_extractor=None, coref_resolver=None, entity_types=None, max_concurrency=None)

Entity extractors (step-1 backends for GraphExtraction):

  • GLiNERExtractor(threshold=0.75, model_name="urchade/gliner_medium-v2.1") -- default, local NER
  • LLMExtractor(llm, threshold=0.75) -- LLM-based NER
  • Subclass EntityExtractor for custom backends

ResolutionStrategy (ABC)

class ResolutionStrategy(ABC):
    @abstractmethod
    async def resolve(self, graph_data: GraphData, ctx: Context) -> ResolutionResult: ...

Built-in:

  • ExactMatchResolution(resolve_property="id")
  • DescriptionMergeResolution(llm=None, force_summary_threshold=3, max_summary_tokens=500)


Ingestion Pipeline

from graphrag_sdk import IngestionPipeline

IngestionPipeline(
    loader: LoaderStrategy,
    chunker: ChunkingStrategy,
    extractor: ExtractionStrategy,
    resolver: ResolutionStrategy,
    graph_store: GraphStore,
    vector_store: VectorStore,
    schema: GraphSchema | None = None,
    embedder: Embedder | None = None,
)

| Method | Signature | Description |
| --- | --- | --- |
| run | (source, ctx=None, *, text=None, document_info=None) -> IngestionResult | Execute the full 9-step pipeline |

Retrieval Strategies

RetrievalStrategy (ABC)

Uses the Template Method pattern.

class RetrievalStrategy(ABC):
    def __init__(self, graph_store=None, vector_store=None): ...

    async def search(self, query, ctx=None, **kwargs) -> RetrieverResult: ...

    @abstractmethod
    async def _execute(self, query, ctx, **kwargs) -> RawSearchResult: ...

LocalRetrieval

LocalRetrieval(graph_store, vector_store, embedder, top_k=5, include_entities=True)

MultiPathRetrieval

MultiPathRetrieval(
    graph_store, vector_store, embedder, llm,
    *,
    chunk_top_k=15,
    max_entities=30,
    max_relationships=20,
    rel_top_k=15,
    keyword_limit=10,
)

Reranking Strategies

RerankingStrategy (ABC)

class RerankingStrategy(ABC):
    @abstractmethod
    async def rerank(self, query, result: RetrieverResult, ctx: Context) -> RetrieverResult: ...

CosineReranker

CosineReranker(embedder: Embedder, top_k: int = 15)
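The underlying idea -- score each retrieved item by cosine similarity to the query embedding and keep the top_k -- can be sketched self-contained (the real class embeds item content via the configured Embedder):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb)

def rerank(query_vec: list[float], items: list[tuple[str, list[float]]], top_k: int = 2) -> list[str]:
    # Sort items by similarity to the query vector, highest first.
    scored = sorted(items, key=lambda it: cosine(query_vec, it[1]), reverse=True)
    return [content for content, _ in scored[:top_k]]

items = [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", [0.7, 0.7])]
print(rerank([1.0, 0.0], items, top_k=2))  # ['a', 'c']
```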

Storage

from graphrag_sdk import GraphStore, VectorStore

GraphStore

GraphStore(connection: FalkorDBConnection)

| Method | Signature | Description |
| --- | --- | --- |
| upsert_nodes | (nodes: list[GraphNode]) -> int | Batched MERGE; returns count |
| upsert_relationships | (rels: list[GraphRelationship]) -> int | Batched MERGE with label hints |
| get_connected_entities | (chunk_id, max_hops=1) -> list[dict] | N-hop entity traversal |
| query_raw | (cypher, params=None) -> Any | Raw Cypher execution |
| get_statistics | () -> dict | Node/edge counts, types, density |
| delete_all | () -> None | Delete all data |

VectorStore

VectorStore(connection, embedder=None, index_name="chunk_embeddings", embedding_dimension=256, similarity_function="cosine")

| Method | Signature | Description |
| --- | --- | --- |
| create_vector_index | (label="Chunk", property="embedding") -> None | Create vector index |
| create_entity_vector_index | () -> None | Create entity vector index |
| create_fulltext_index | (label="Chunk", *properties) -> None | Create fulltext index |
| ensure_indices | () -> None | Create all standard indices |
| index_chunks | (chunks: TextChunks) -> int | Embed and store chunk vectors |
| backfill_entity_embeddings | () -> int | Embed all entities missing vectors |
| embed_relationships | () -> int | Embed fact text on RELATES edges |
| search | (query_vector, top_k=5, label="Chunk") -> list[dict] | Vector similarity search |
| search_entities | (query_vector, top_k=5) -> list[dict] | Entity vector search |
| search_relationships | (query_vector, top_k=15) -> list[dict] | RELATES edge vector search |
| fulltext_search | (query, top_k=5, label="Chunk") -> list[dict] | Fulltext keyword search |

Context

from graphrag_sdk import Context

Execution context for logging and budget tracking.

Context(tenant_id: str = "default", latency_budget_ms: float = 60000.0)

| Method/Property | Description |
| --- | --- |
| ctx.log(message, log_level=logging.INFO) | Log a message |
| ctx.budget_exceeded | True if elapsed time > latency_budget_ms |
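The budget check amounts to comparing elapsed wall-clock time against the budget. A self-contained sketch of that behavior (not the SDK's Context class, which also carries logging state):

```python
import time

class Context:
    def __init__(self, tenant_id: str = "default", latency_budget_ms: float = 60000.0):
        self.tenant_id = tenant_id
        self.latency_budget_ms = latency_budget_ms
        self._start = time.monotonic()  # monotonic clock: immune to wall-clock jumps

    @property
    def budget_exceeded(self) -> bool:
        elapsed_ms = (time.monotonic() - self._start) * 1000.0
        return elapsed_ms > self.latency_budget_ms

ctx = Context(latency_budget_ms=50.0)
print(ctx.budget_exceeded)  # False immediately after creation
```

Long-running strategies can poll budget_exceeded between steps and bail out early instead of blowing the latency budget.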

Exceptions

from graphrag_sdk import GraphRAGError

| Exception | When raised |
| --- | --- |
| GraphRAGError | Base exception for all SDK errors |
| LoaderError | File loading failures |
| ChunkingError | Text splitting failures |
| ExtractionError | LLM extraction or JSON parsing failures |
| ResolutionError | Entity deduplication failures |
| RetrieverError | Retrieval execution failures |
| DatabaseError | FalkorDB connection or query failures |
| IngestionError | Pipeline orchestration failures |
| SchemaValidationError | Schema constraint violations |