Deep Dive into RAGFlow’s Task Executor: How Asynchronous Document Processing Works
Deep Dive into RAGFlow’s Task Executor: How Asynchronous Document Processing Works
RAGFlow is an open-source RAG (Retrieval-Augmented Generation) engine built by Infiniflow. One of its core architectural components is the task executor — a Redis-backed asynchronous worker that handles document parsing, embedding, dataflow execution, and memory persistence. This post walks through how tasks are created, dispatched, and executed under the hood.
Architecture Overview
RAGFlow decouples task creation from task execution using Redis as a message broker. The producer side (API layer) inserts tasks into the database and pushes them onto Redis queues. The consumer side (task_executor.py) runs as a separate process, continuously polling those queues and processing tasks asynchronously.
graph LR
A[bulk_insert_into_db] --> B[REDIS_CONN.queue_product]
B --> C[Redis Queue]
C --> D[task_executor.collect]
D --> E[task_executor.do_handle_task]
This design provides several benefits:
- Decoupled scaling — executors can be scaled independently of the API servers.
- Priority queues — tasks are routed to different Redis queues based on priority.
- Fault tolerance — unacknowledged messages are retried via consumer groups.
Step 1: Inserting the Task into the Database
Every task begins its lifecycle with a database insert via bulk_insert_into_db. This utility function (defined in api/db/db_utils.py) handles batch inserts with conflict resolution:
@DB.connection_context()
def bulk_insert_into_db(model, data_source, replace_on_conflict=False):
DB.create_tables([model])
for i, data in enumerate(data_source):
current_time = current_timestamp() + i
current_date = timestamp_to_date(current_time)
if 'create_time' not in data:
data['create_time'] = current_time
data['create_date'] = timestamp_to_date(data['create_time'])
data['update_time'] = current_time
data['update_date'] = current_date
preserve = tuple(data_source[0].keys() - {'create_time', 'create_date'})
batch_size = 1000
for i in range(0, len(data_source), batch_size):
with DB.atomic():
query = model.insert_many(data_source[i:i + batch_size])
if replace_on_conflict:
if isinstance(DB, PooledMySQLDatabase):
query = query.on_conflict(preserve=preserve)
else:
query = query.on_conflict(conflict_target="id", preserve=preserve)
query.execute()
Key details:
- Records are inserted in batches of 1000 within atomic transactions.
- When
replace_on_conflict=True, existing records are updated (upsert semantics), preserving all fields exceptcreate_timeandcreate_date. - The function auto-creates the table if it doesn’t exist via
DB.create_tables([model]).
Step 2: Pushing the Task to Redis
Inserting a task into the database alone does not trigger execution. The task must be explicitly pushed to a Redis queue via REDIS_CONN.queue_product(). This is the critical step that bridges the producer and consumer:
assert REDIS_CONN.queue_product(
settings.get_svr_queue_name(priority), message=task
), "Can't access Redis. Please check the Redis' status."
The priority parameter determines which Redis queue receives the task. Higher-priority tasks go to separate queues that executors check first.
Real-World Usage Patterns
Document processing (api/db/services/document_service.py):
bulk_insert_into_db(Task, [task], True)
DocumentService.begin2parse(sample_doc_id["id"], keep_progress=True)
assert REDIS_CONN.queue_product(
settings.get_svr_queue_name(priority), message=task
)
Memory tasks (api/db/joint_services/memory_message_service.py):
task = new_task(memory_id, raw_message_id)
bulk_insert_into_db(Task, [task], replace_on_conflict=True)
task_message = {
"id": task["id"],
"task_id": task["id"],
"task_type": task["task_type"],
"memory_id": memory_id,
"source_id": raw_message_id,
"message_dict": message_dict,
}
REDIS_CONN.queue_product(
settings.get_svr_queue_name(priority=0), message=task_message
)
Notice that the message pushed to Redis can differ from the database record — for memory tasks, additional fields like memory_id, source_id, and message_dict are included to provide execution context.
Step 3: Task Collection — The Consumer Side
The collect() coroutine in task_executor.py is the consumer’s entry point. It polls Redis using consumer groups for distributed, at-least-once delivery:
async def collect():
global CONSUMER_NAME, DONE_TASKS, FAILED_TASKS
global UNACKED_ITERATOR
svr_queue_names = settings.get_svr_queue_names()
redis_msg = None
try:
# First, retry any previously unacknowledged messages
if not UNACKED_ITERATOR:
UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(
svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME
)
try:
redis_msg = next(UNACKED_ITERATOR)
except StopIteration:
# No unacked messages — read new ones from the queue
for svr_queue_name in svr_queue_names:
redis_msg = REDIS_CONN.queue_consumer(
svr_queue_name, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME
)
if redis_msg:
break
except Exception as e:
logging.exception(f"collect got exception: {e}")
return None, None
...
The collection logic follows a deliberate order:
- Retry unacked messages first — ensures failed tasks from previous runs are not lost.
- Read new messages — iterates through queues (by priority) to pick up fresh tasks.
- Validate the task — checks whether the task still exists in the database and hasn’t been cancelled.
- Enrich the task — attaches task-type-specific fields (e.g.,
dataflow_id,memory_id) from the Redis message onto the task dict.
If a task is unknown or has been cancelled, it is immediately acknowledged and discarded.
Step 4: Task Execution
Once a task passes collection, do_handle_task() takes over. This function is decorated with a 3-hour timeout to prevent runaway tasks:
@timeout(60 * 60 * 3, 1)
async def do_handle_task(task):
task_type = task.get("task_type", "")
if task_type == "memory":
await handle_save_to_memory_task(task)
return
if task_type == "dataflow" and task.get("doc_id", "") == CANVAS_DEBUG_DOC_ID:
await run_dataflow(task)
return
# For document parsing tasks:
task_id = task["id"]
task_tenant_id = task["tenant_id"]
task_embedding_id = task["embd_id"]
...
The handler routes tasks by type:
| Task Type | Handler | Purpose |
|---|---|---|
memory |
handle_save_to_memory_task() |
Persist conversation memory |
dataflow |
run_dataflow() |
Execute canvas/dataflow pipelines |
| (default) | Inline parsing logic | Document chunking, embedding, indexing |
For document parsing tasks specifically, the executor:
- Binds an embedding model — resolves the tenant’s configured embedding model and validates it by encoding a test string.
- Initializes the knowledge base — calls
init_kb(task, vector_size)to ensure the vector index is ready. - Parses and chunks the document — splits the document based on
parser_config. - Reports progress — uses a
progress_callbackto update task status throughout.
Executor Startup and Lifecycle
Each task executor runs as an independent async process. On startup, it staggers its initialization to avoid overwhelming shared resources like the Infinity vector database:
async def main():
try:
worker_num = int(CONSUMER_NAME.rsplit("_", 1)[-1])
startup_delay = worker_num * 2.0 + random.uniform(0, 0.5)
if startup_delay > 0:
await asyncio.sleep(startup_delay)
except (ValueError, IndexError):
pass
...
settings.init_settings()
settings.check_and_install_torch()
report_task = asyncio.create_task(report_status())
tasks = []
while not stop_event.is_set():
await task_limiter.acquire()
t = asyncio.create_task(task_manager())
tasks.append(t)
Key design decisions:
- Staggered startup — each worker delays by
worker_num * 2.0s + jitterto prevent a connection storm. - Concurrency limiting —
task_limiter(a semaphore) caps the number of concurrent tasks per executor. - Graceful shutdown — signal handlers (
SIGINT,SIGTERM) set astop_event, and all running tasks are cancelled and awaited. - jemalloc — the Docker entrypoint preloads jemalloc for better memory allocation performance in long-running processes.
In production, executors are spawned by the Docker entrypoint script with automatic restart:
function task_exe() {
local consumer_id="$1"
local host_id="$2"
JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so"
while true; do
LD_PRELOAD="$JEMALLOC_PATH" \
"$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}" &
wait; sleep 1;
done
}
Summary
RAGFlow’s task execution pipeline is a clean example of the producer-consumer pattern applied to document processing:
- Produce: Insert a task record into the database, then push it to a Redis queue with a priority.
- Consume: The task executor polls Redis using consumer groups, validates and enriches the task, then dispatches it to the appropriate handler.
- Execute: Depending on task type, the executor parses documents, runs dataflows, or persists memory — all within an async framework with timeout protection.
This architecture makes RAGFlow’s ingestion pipeline horizontally scalable, resilient to crashes (via Redis consumer groups and unacked message retry), and cleanly separated from the user-facing API layer.