Deep Dive into RAGFlow’s Task Executor: How Asynchronous Document Processing Works

RAGFlow is an open-source RAG (Retrieval-Augmented Generation) engine built by Infiniflow. One of its core architectural components is the task executor — a Redis-backed asynchronous worker that handles document parsing, embedding, dataflow execution, and memory persistence. This post walks through how tasks are created, dispatched, and executed under the hood.

Architecture Overview

RAGFlow decouples task creation from task execution using Redis as a message broker. The producer side (API layer) inserts tasks into the database and pushes them onto Redis queues. The consumer side (task_executor.py) runs as a separate process, continuously polling those queues and processing tasks asynchronously.

graph LR
    A[bulk_insert_into_db] --> B[REDIS_CONN.queue_product]
    B --> C[Redis Queue]
    C --> D[task_executor.collect]
    D --> E[task_executor.do_handle_task]

This design provides several benefits:

  • Decoupled scaling — executors can be scaled independently of the API servers.
  • Priority queues — tasks are routed to different Redis queues based on priority.
  • Fault tolerance — unacknowledged messages are retried via consumer groups.

Step 1: Inserting the Task into the Database

Every task begins its lifecycle with a database insert via bulk_insert_into_db. This utility function (defined in api/db/db_utils.py) handles batch inserts with conflict resolution:

@DB.connection_context()
def bulk_insert_into_db(model, data_source, replace_on_conflict=False):
    DB.create_tables([model])

    for i, data in enumerate(data_source):
        current_time = current_timestamp() + i
        current_date = timestamp_to_date(current_time)
        if 'create_time' not in data:
            data['create_time'] = current_time
        data['create_date'] = timestamp_to_date(data['create_time'])
        data['update_time'] = current_time
        data['update_date'] = current_date

    preserve = tuple(data_source[0].keys() - {'create_time', 'create_date'})
    batch_size = 1000

    for i in range(0, len(data_source), batch_size):
        with DB.atomic():
            query = model.insert_many(data_source[i:i + batch_size])
            if replace_on_conflict:
                if isinstance(DB, PooledMySQLDatabase):
                    query = query.on_conflict(preserve=preserve)
                else:
                    query = query.on_conflict(conflict_target="id", preserve=preserve)
            query.execute()

Key details:

  • Records are inserted in batches of 1000 within atomic transactions.
  • When replace_on_conflict=True, existing records are updated (upsert semantics), preserving all fields except create_time and create_date.
  • The function auto-creates the table if it doesn’t exist via DB.create_tables([model]).

Step 2: Pushing the Task to Redis

Inserting a task into the database alone does not trigger execution. The task must be explicitly pushed to a Redis queue via REDIS_CONN.queue_product(). This is the critical step that bridges the producer and consumer:

assert REDIS_CONN.queue_product(
    settings.get_svr_queue_name(priority), message=task
), "Can't access Redis. Please check the Redis' status."

The priority parameter determines which Redis queue receives the task. Higher-priority tasks go to separate queues that executors check first.

Real-World Usage Patterns

Document processing (api/db/services/document_service.py):

bulk_insert_into_db(Task, [task], True)
DocumentService.begin2parse(sample_doc_id["id"], keep_progress=True)
assert REDIS_CONN.queue_product(
    settings.get_svr_queue_name(priority), message=task
)

Memory tasks (api/db/joint_services/memory_message_service.py):

task = new_task(memory_id, raw_message_id)
bulk_insert_into_db(Task, [task], replace_on_conflict=True)
task_message = {
    "id": task["id"],
    "task_id": task["id"],
    "task_type": task["task_type"],
    "memory_id": memory_id,
    "source_id": raw_message_id,
    "message_dict": message_dict,
}
REDIS_CONN.queue_product(
    settings.get_svr_queue_name(priority=0), message=task_message
)

Notice that the message pushed to Redis can differ from the database record — for memory tasks, additional fields like memory_id, source_id, and message_dict are included to provide execution context.

Step 3: Task Collection — The Consumer Side

The collect() coroutine in task_executor.py is the consumer’s entry point. It polls Redis using consumer groups for distributed, at-least-once delivery:

async def collect():
    global CONSUMER_NAME, DONE_TASKS, FAILED_TASKS
    global UNACKED_ITERATOR

    svr_queue_names = settings.get_svr_queue_names()
    redis_msg = None
    try:
        # First, retry any previously unacknowledged messages
        if not UNACKED_ITERATOR:
            UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(
                svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME
            )
        try:
            redis_msg = next(UNACKED_ITERATOR)
        except StopIteration:
            # No unacked messages — read new ones from the queue
            for svr_queue_name in svr_queue_names:
                redis_msg = REDIS_CONN.queue_consumer(
                    svr_queue_name, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME
                )
                if redis_msg:
                    break
    except Exception as e:
        logging.exception(f"collect got exception: {e}")
        return None, None
    ...

The collection logic follows a deliberate order:

  1. Retry unacked messages first — ensures failed tasks from previous runs are not lost.
  2. Read new messages — iterates through queues (by priority) to pick up fresh tasks.
  3. Validate the task — checks whether the task still exists in the database and hasn’t been cancelled.
  4. Enrich the task — attaches task-type-specific fields (e.g., dataflow_id, memory_id) from the Redis message onto the task dict.

If a task is unknown or has been cancelled, it is immediately acknowledged and discarded.

Step 4: Task Execution

Once a task passes collection, do_handle_task() takes over. This function is decorated with a 3-hour timeout to prevent runaway tasks:

@timeout(60 * 60 * 3, 1)
async def do_handle_task(task):
    task_type = task.get("task_type", "")

    if task_type == "memory":
        await handle_save_to_memory_task(task)
        return

    if task_type == "dataflow" and task.get("doc_id", "") == CANVAS_DEBUG_DOC_ID:
        await run_dataflow(task)
        return

    # For document parsing tasks:
    task_id = task["id"]
    task_tenant_id = task["tenant_id"]
    task_embedding_id = task["embd_id"]
    ...

The handler routes tasks by type:

Task Type Handler Purpose
memory handle_save_to_memory_task() Persist conversation memory
dataflow run_dataflow() Execute canvas/dataflow pipelines
(default) Inline parsing logic Document chunking, embedding, indexing

For document parsing tasks specifically, the executor:

  1. Binds an embedding model — resolves the tenant’s configured embedding model and validates it by encoding a test string.
  2. Initializes the knowledge base — calls init_kb(task, vector_size) to ensure the vector index is ready.
  3. Parses and chunks the document — splits the document based on parser_config.
  4. Reports progress — uses a progress_callback to update task status throughout.

Executor Startup and Lifecycle

Each task executor runs as an independent async process. On startup, it staggers its initialization to avoid overwhelming shared resources like the Infinity vector database:

async def main():
    try:
        worker_num = int(CONSUMER_NAME.rsplit("_", 1)[-1])
        startup_delay = worker_num * 2.0 + random.uniform(0, 0.5)
        if startup_delay > 0:
            await asyncio.sleep(startup_delay)
    except (ValueError, IndexError):
        pass
    ...
    settings.init_settings()
    settings.check_and_install_torch()

    report_task = asyncio.create_task(report_status())
    tasks = []

    while not stop_event.is_set():
        await task_limiter.acquire()
        t = asyncio.create_task(task_manager())
        tasks.append(t)

Key design decisions:

  • Staggered startup — each worker delays by worker_num * 2.0s + jitter to prevent a connection storm.
  • Concurrency limitingtask_limiter (a semaphore) caps the number of concurrent tasks per executor.
  • Graceful shutdown — signal handlers (SIGINT, SIGTERM) set a stop_event, and all running tasks are cancelled and awaited.
  • jemalloc — the Docker entrypoint preloads jemalloc for better memory allocation performance in long-running processes.

In production, executors are spawned by the Docker entrypoint script with automatic restart:

function task_exe() {
    local consumer_id="$1"
    local host_id="$2"
    JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so"
    while true; do
        LD_PRELOAD="$JEMALLOC_PATH" \
        "$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}" &
        wait; sleep 1;
    done
}

Summary

RAGFlow’s task execution pipeline is a clean example of the producer-consumer pattern applied to document processing:

  1. Produce: Insert a task record into the database, then push it to a Redis queue with a priority.
  2. Consume: The task executor polls Redis using consumer groups, validates and enriches the task, then dispatches it to the appropriate handler.
  3. Execute: Depending on task type, the executor parses documents, runs dataflows, or persists memory — all within an async framework with timeout protection.

This architecture makes RAGFlow’s ingestion pipeline horizontally scalable, resilient to crashes (via Redis consumer groups and unacked message retry), and cleanly separated from the user-facing API layer.

References