Deep Dive: RAGFlow’s Document Parsing Pipeline

RAGFlow is an open-source RAG (Retrieval-Augmented Generation) engine that provides deep document understanding capabilities. One of its core features is the document parsing pipeline — the mechanism that takes uploaded documents, breaks them into meaningful chunks, generates embeddings, and stores them for retrieval. This post walks through the architecture of that pipeline, from the frontend trigger all the way down to the task executor.

Source reference: All code links point to RAGFlow commit c217b8f3.


Overview

When a user uploads a document and triggers parsing, the request flows through three main layers:

  1. Frontend / SDK — the entry point where users initiate document parsing.
  2. API & Service Layer — validates the request, persists metadata, and enqueues parsing tasks.
  3. Task Executor — asynchronously picks up tasks, chunks the document, generates embeddings, and indexes the results.

```mermaid
graph TD
    subgraph Entry Points
        FE["Frontend (Web UI)"]
        SDK["Python SDK"]
    end

    subgraph API Layer
        DA["document_app.py"]
        SDKAPI["sdk/doc.py"]
    end

    subgraph Service Layer
        DS["DocumentService.run"]
        TS["TaskService.queue_tasks"]
    end

    subgraph Execution
        TE["task_executor.py"]
    end

    FE --> DA --> DS --> TS --> TE
    SDK --> SDKAPI --> TS
```
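The three-layer handoff can be sketched in miniature as a producer/consumer pattern: the API layer validates, the service layer enqueues, and a worker loop dequeues and processes. This is an illustrative sketch, not RAGFlow's actual code; all names here are stand-ins.

```python
# Minimal sketch of the API -> service -> executor handoff.
# RAGFlow uses Redis-backed queues and worker processes; a plain
# in-process Queue stands in for that here.
from queue import Queue

task_queue: Queue = Queue()

def api_run(doc_id: str) -> dict:
    """API layer: validate the request, then hand off to the service layer."""
    if not doc_id:
        return {"code": 400, "message": "doc_id required"}
    service_queue_task(doc_id)
    return {"code": 0, "message": "queued"}

def service_queue_task(doc_id: str) -> None:
    """Service layer: persist task metadata and enqueue it for the executor."""
    task_queue.put({"doc_id": doc_id, "status": "pending"})

def executor_step() -> dict:
    """Task executor: dequeue one task and process it.
    Chunking, embedding, and indexing would happen here."""
    task = task_queue.get()
    task["status"] = "done"
    return task
```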

Frontend Flow

The web UI triggers parsing through a chain of React hooks and service calls:

| Component | Role |
| --- | --- |
| IconMap | Maps document status to icon visuals |
| handleOperationIconClick | Handles user clicks on the parse/reparse icon |
| useHandleRunDocumentByIds | React hook that batches document IDs for parsing |
| useRunDocument | Calls the backend API via the registered service |
| registerServer | Utility for binding API endpoints |
| kbService | Registers the knowledge-base API services |

The API endpoint is registered using register_page, which provides a path prefix of {API_VERSION}/{page_name} for all routed pages.
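To make the prefix rule concrete, here is a hedged sketch of how a register_page-style helper could derive a URL prefix of {API_VERSION}/{page_name} from a module's filename. The function name and derivation details are assumptions for illustration; RAGFlow's actual implementation differs.

```python
# Hypothetical sketch: derive "/{API_VERSION}/{page_name}" from a module path.
API_VERSION = "v1"

def page_prefix(page_path: str) -> str:
    # "api/apps/document_app.py" -> page name "document" -> "/v1/document"
    name = page_path.rsplit("/", 1)[-1].removesuffix(".py").removesuffix("_app")
    return f"/{API_VERSION}/{name}"
```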

Once the user clicks the parse button, the request reaches the backend through kbService.document_run:

```mermaid
graph LR
    A["run() — document_app.py"] --> B["DocumentService.run()"] --> C["TaskService.queue_tasks()"]

    click A "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/apps/document_app.py#L604-L663"
    click B "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/db/services/document_service.py#L912"
    click C "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/db/services/task_service.py#L360-L464"
```

SDK Flow

The Python SDK provides a more direct path. Calling parse() on a document object enqueues the task without going through the full web API layer:

```mermaid
graph LR
    A["parse() — doc.py"] --> B["TaskService.queue_tasks()"]

    click A "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/apps/sdk/doc.py#L818-L896"
    click B "https://github.com/infiniflow/ragflow/blob/c217b8f3/api/db/services/task_service.py#L360-L464"
```
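The shortcut can be illustrated with stand-in classes: parse() on a document object reduces to a single queue_tasks() call, with no DocumentService.run() in between. The class and attribute names below are illustrative, not the real SDK surface.

```python
# Illustrative sketch of the SDK's direct path to the task queue.
class FakeTaskService:
    def __init__(self):
        self.queued = []

    def queue_tasks(self, doc_id):
        self.queued.append(doc_id)

class Document:
    def __init__(self, doc_id, task_service):
        self.id = doc_id
        self._ts = task_service

    def parse(self):
        # SDK path: enqueue directly, bypassing the web API's run() endpoint
        self._ts.queue_tasks(self.id)
```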

API & Service Layer

API Endpoints

Frontend API (document_app.py):

| Endpoint | Service Call | Description |
| --- | --- | --- |
| upload | FileService.upload_document | Uploads a new document to the dataset |
| run | DocumentService.run | Triggers parsing of a document |
| get | File2DocumentService.get_storage_address | Retrieves a document’s storage location |

SDK API (doc.py):

| Endpoint | Service Call | Description |
| --- | --- | --- |
| upload | FileService.upload_document | Uploads a document via SDK |
| parse | TaskService.queue_tasks | Queues a parse task via SDK |

Key Services

  • DocumentService.run — The main orchestrator. When a custom pipeline_id is provided, it delegates to TaskService.queue_dataflow; otherwise it falls back to TaskService.queue_tasks.

  • doc_upload_and_parse — A convenience method that performs the full pipeline in one call: chunk the document, generate embeddings, and insert the results.

  • upload_document (FileService) — Persists the raw document to storage.

  • queue_raptor_o_graphrag_tasks — Creates specialized tasks for RAPTOR (tree-based summarization) or GraphRAG (knowledge-graph-based retrieval).

  • insert (DocumentService) — Inserts parsed document records into the database.

  • queue_tasks (TaskService) — Enqueues tasks and calls DocumentService.begin2parse to mark the document as being processed.
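The dispatch in DocumentService.run can be sketched as a simple branch: a custom pipeline_id routes to the dataflow queue, anything else takes the default path. The function signature below is a stand-in for illustration; only the branching logic mirrors the description above.

```python
# Hedged sketch of DocumentService.run's dispatch between the
# dataflow queue and the default task queue.
def run(doc: dict, queue_dataflow, queue_tasks):
    if doc.get("pipeline_id"):
        # custom parsing pipeline configured for this document
        return queue_dataflow(doc["pipeline_id"], doc["id"])
    # default path: enqueue a standard parse task
    return queue_tasks(doc["id"])
```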

Storage

The storage backend is abstracted through StorageFactory, which supports multiple backends (local filesystem, MinIO, S3, etc.) and is selected via configuration.
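A configuration-driven factory of this kind can be sketched in a few lines. The backend names and the put/get interface below are illustrative stand-ins, analogous in spirit to StorageFactory rather than copies of it.

```python
# Minimal sketch of a config-selected storage factory.
class LocalStorage:
    """In-memory stand-in for a local-filesystem backend."""
    def __init__(self):
        self.blobs = {}

    def put(self, key, data):
        self.blobs[key] = data

    def get(self, key):
        return self.blobs[key]

# Real backends ("minio", "s3", ...) would register their classes here.
BACKENDS = {"local": LocalStorage}

def storage_from_config(cfg: dict):
    """Instantiate the backend named in the config, defaulting to local."""
    return BACKENDS[cfg.get("storage", "local")]()
```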


Task Execution

The heavy lifting happens in task_executor.py, which runs as an independent worker process:

  1. handle_task — Entry point that dequeues a task, then calls do_handle_task or run_dataflow depending on the task type.

  2. build_chunks — The core chunking logic. It selects the appropriate parser based on document type, splits the content into chunks, and prepares them for embedding.

  3. FACTORY — A registry of all supported document parsers (naive, paper, book, presentation, laws, table, manual, etc.).

The overall execution flow looks like this:

```mermaid
graph TD
    HT["handle_task()"] --> DHT["do_handle_task()"]
    HT --> RDF["run_dataflow()"]
    DHT --> BC["build_chunks()"]
    BC --> F["FACTORY — select parser"]
    F --> EMB["Generate embeddings"]
    EMB --> IDX["Index into vector store"]
```
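The FACTORY lookup step can be sketched as a plain dictionary dispatch: the parser is selected by the document's configured parser id and applied to produce chunks. The registry shape mirrors the description above, but the chunking body is a deliberately naive placeholder, not RAGFlow's actual chunker.

```python
# Sketch of a FACTORY-style parser registry and the dispatch in build_chunks.
def naive_chunk(text: str) -> list[str]:
    # placeholder "naive" parser: fixed-size 16-character windows
    return [text[i:i + 16] for i in range(0, len(text), 16)]

# "paper", "book", "presentation", "laws", "table", "manual", ...
# would register their own chunkers alongside "naive".
FACTORY = {"naive": naive_chunk}

def build_chunks(parser_id: str, text: str) -> list[str]:
    """Select the parser for this document type and split the content."""
    chunker = FACTORY.get(parser_id, naive_chunk)
    return chunker(text)
```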

Open Questions

  • What is the purpose of parse in document_app.py? This endpoint appears to use seleniumwire.webdriver, suggesting it may be designed for parsing web pages or dynamically rendered content rather than static documents.

  • What code consumes api_utils.py? This utility module likely provides shared helpers (auth decorators, response formatting, etc.) used across multiple API endpoints.


References