# Deep Dive: RAGFlow’s Document Parsing Pipeline
RAGFlow is an open-source RAG (Retrieval-Augmented Generation) engine that provides deep document understanding capabilities. One of its core features is the document parsing pipeline — the mechanism that takes uploaded documents, breaks them into meaningful chunks, generates embeddings, and stores them for retrieval. This post walks through the architecture of that pipeline, from the frontend trigger all the way down to the task executor.
Source reference: All code links point to RAGFlow commit `c217b8f3`.
## Overview
When a user uploads a document and triggers parsing, the request flows through three main layers:
- Frontend / SDK — the entry point where users initiate document parsing.
- API & Service Layer — validates the request, persists metadata, and enqueues parsing tasks.
- Task Executor — asynchronously picks up tasks, chunks the document, generates embeddings, and indexes the results.
```mermaid
graph TD
    subgraph Entry Points
        FE["Frontend (Web UI)"]
        SDK["Python SDK"]
    end
    subgraph API Layer
        DA["document_app.py"]
        SDKAPI["sdk/doc.py"]
    end
    subgraph Service Layer
        DS["DocumentService.run"]
        TS["TaskService.queue_tasks"]
    end
    subgraph Execution
        TE["task_executor.py"]
    end
    FE --> DA --> DS --> TS --> TE
    SDK --> SDKAPI --> TS
```
## Frontend Flow
The web UI triggers parsing through a chain of React hooks and service calls:
| Component | Role |
|---|---|
| `IconMap` | Maps document status to icon visuals |
| `handleOperationIconClick` | Handles user clicks on the parse/reparse icon |
| `useHandleRunDocumentByIds` | React hook that batches document IDs for parsing |
| `useRunDocument` | Calls the backend API via the registered service |
| `registerServer` | Utility for binding API endpoints |
| `kbService` | Registered knowledge-base service exposing document operations |
The API endpoint is registered using `register_page`, which applies a path prefix of `{API_VERSION}/{page_name}` to all routed pages.
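As a rough sketch of that prefix rule (the `API_VERSION` value `"v1"` and the helper name are assumptions for illustration, not read from the commit):

```python
API_VERSION = "v1"  # assumed value for illustration

def page_url_prefix(page_name: str) -> str:
    """Hypothetical helper mirroring register_page's routing rule:
    every page's endpoints are mounted under {API_VERSION}/{page_name}."""
    return f"/{API_VERSION}/{page_name}"

# The document "run" endpoint would then resolve to a path like:
print(page_url_prefix("document") + "/run")  # /v1/document/run
```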
Once the user clicks the parse button, the request reaches the backend via `kbService.document_run`:
```mermaid
graph LR
    A["run() — document_app.py"] --> B["DocumentService.run()"] --> C["TaskService.queue_tasks()"]
    click A "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/apps/document_app.py#L604-L663"
    click B "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/db/services/document_service.py#L912"
    click C "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/db/services/task_service.py#L360-L464"
```
## SDK Flow
The Python SDK provides a more direct path. Calling parse() on a document object enqueues the task without going through the full web API layer:
```mermaid
graph LR
    A["parse() — doc.py"] --> B["TaskService.queue_tasks()"]
    click A "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/apps/sdk/doc.py#L818-L896"
    click B "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/db/services/task_service.py#L360-L464"
```
## API & Service Layer

### API Endpoints

Frontend API (`document_app.py`):
| Endpoint | Service Call | Description |
|---|---|---|
| `upload` | `FileService.upload_document` | Uploads a new document to the dataset |
| `run` | `DocumentService.run` | Triggers parsing of a document |
| `get` | `File2DocumentService.get_storage_address` | Retrieves a document’s storage location |
SDK API (`doc.py`):

| Endpoint | Service Call | Description |
|---|---|---|
| `upload` | `FileService.upload_document` | Uploads a document via the SDK |
| `parse` | `TaskService.queue_tasks` | Queues a parse task via the SDK |
### Key Services
- `DocumentService.run` — The main orchestrator. When a customized `pipeline_id` is provided, it delegates to `TaskService.queue_dataflow`; otherwise, it falls back to `TaskService.queue_tasks`.
- `doc_upload_and_parse` — A convenience method that performs the full pipeline in one call: chunk the document, generate embeddings, and insert the results.
- `upload_document` (`FileService`) — Persists the raw document to storage.
- `queue_raptor_o_graphrag_tasks` — Creates specialized tasks for RAPTOR (tree-based summarization) or GraphRAG (knowledge-graph-based retrieval).
- `insert` (`DocumentService`) — Inserts parsed document records into the database.
- `queue_tasks` (`TaskService`) — Enqueues tasks and calls `DocumentService.begin2parse` to mark the document as being processed.
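The enqueue-then-mark step can be sketched roughly as follows. The page-range payload shape, the in-memory deque, and the status string are illustrative stand-ins for RAGFlow's Redis-backed queue and document table, not the actual implementation:

```python
from collections import deque

task_queue = deque()  # stand-in for the real message queue
doc_status = {}       # stand-in for the document table

def begin2parse(doc_id: str) -> None:
    # Mark the document as being processed, as DocumentService.begin2parse does.
    doc_status[doc_id] = "RUNNING"

def queue_tasks(doc_id: str, parser_id: str, pages: int, page_size: int = 12) -> int:
    """Split a document into page-range tasks and enqueue them (sketch)."""
    for start in range(0, pages, page_size):
        task_queue.append({
            "doc_id": doc_id,
            "parser_id": parser_id,
            "from_page": start,
            "to_page": min(start + page_size, pages),
        })
    begin2parse(doc_id)
    return len(task_queue)

queue_tasks("doc-1", "naive", pages=30)
print(len(task_queue), doc_status["doc-1"])  # 3 RUNNING
```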
### Storage

The storage backend is abstracted through `StorageFactory`, which supports multiple backends (local filesystem, MinIO, S3, etc.) and is selected via configuration.
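A minimal sketch of that factory pattern, with hypothetical backend names and a dictionary standing in for real storage clients (RAGFlow reads the selection from its service configuration):

```python
class LocalStorage:
    """Stand-in for a filesystem-backed store."""
    def __init__(self):
        self.blobs = {}

    def put(self, bucket: str, name: str, blob: bytes) -> None:
        # Store the raw document bytes under (bucket, name).
        self.blobs[(bucket, name)] = blob

class MinioStorage(LocalStorage):
    """Stand-in for an object-store backend exposing the same interface."""

class StorageFactory:
    # Hypothetical backend keys chosen via configuration.
    _backends = {"LOCAL": LocalStorage, "MINIO": MinioStorage}

    @classmethod
    def create(cls, kind: str) -> LocalStorage:
        return cls._backends[kind.upper()]()

storage = StorageFactory.create("local")
storage.put("bucket-1", "guide.pdf", b"%PDF-")
```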
## Task Execution

The heavy lifting happens in `task_executor.py`, which runs as an independent worker process:
- `handle_task` — Entry point that dequeues a task, then calls `do_handle_task` or `run_dataflow` depending on the task type.
- `build_chunks` — The core chunking logic. It selects the appropriate parser based on document type, splits the content into chunks, and prepares them for embedding.
- `FACTORY` — A registry of all supported document parsers (naive, paper, book, presentation, laws, table, manual, etc.).
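A rough sketch of how such a registry dispatches chunking by parser type. The chunkers here are trivial placeholders for illustration, not RAGFlow's actual token-aware logic:

```python
def naive_chunk(text: str) -> list:
    # Placeholder: fixed-size character splitting instead of token-aware chunking.
    return [text[i:i + 64] for i in range(0, len(text), 64)]

def paper_chunk(text: str) -> list:
    # Placeholder: split on blank lines instead of section-aware parsing.
    return text.split("\n\n")

# Registry mapping parser_id -> chunking function, mirroring FACTORY's role.
FACTORY = {
    "naive": naive_chunk,
    "paper": paper_chunk,
}

def build_chunks(parser_id: str, text: str) -> list:
    chunker = FACTORY.get(parser_id, naive_chunk)  # fall back to naive
    return chunker(text)

print(len(build_chunks("naive", "x" * 130)))  # 3 chunks of <=64 chars
```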
The overall execution flow looks like this:
```mermaid
graph TD
    HT["handle_task()"] --> DHT["do_handle_task()"]
    HT --> RDF["run_dataflow()"]
    DHT --> BC["build_chunks()"]
    BC --> F["FACTORY — select parser"]
    F --> EMB["Generate embeddings"]
    EMB --> IDX["Index into vector store"]
```
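The chunk-embed-index steps above can be sketched end to end. The embedder and the vector store here are deterministic stand-ins; RAGFlow delegates these to a configured embedding model and search backend:

```python
import hashlib

def fake_embed(chunk: str) -> list:
    # Deterministic stand-in for a real embedding model.
    digest = hashlib.sha256(chunk.encode()).digest()
    return [b / 255 for b in digest[:4]]

index = []  # stand-in for the vector store

def do_handle_task(task: dict) -> None:
    # chunk -> embed -> index, mirroring the flow in the diagram above
    text = task["text"]
    chunks = [text[i:i + 32] for i in range(0, len(text), 32)]
    for chunk in chunks:
        index.append({
            "doc_id": task["doc_id"],
            "text": chunk,
            "vec": fake_embed(chunk),
        })

do_handle_task({"doc_id": "doc-1", "text": "a" * 70})
print(len(index))  # 3 indexed chunks (32 + 32 + 6 chars)
```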
## Open Questions
- What is the purpose of `parse` in `document_app.py`? This endpoint appears to use `seleniumwire.webdriver`, suggesting it may be designed for parsing web pages or dynamically rendered content rather than static documents.
- What code consumes `api_utils.py`? This utility module likely provides shared helpers (auth decorators, response formatting, etc.) used across multiple API endpoints.