The PDF Processing Pipeline is a core component of Qwello that transforms PDF documents into structured knowledge graphs. This document provides a detailed technical overview of the pipeline's architecture, components, and processes.
Pipeline Overview
The Qwello PDF processing pipeline is a multi-stage system that combines AI vision and language models with worker-based parallel processing to handle documents of varying length and layout complexity efficiently, up to the configured 30MB size limit.
Processing Stages in Detail
1. PDF Upload and Validation
Input Handling
The pipeline begins with document upload and validation:
// Example validation code from PdfController
@Post('upload')
@UseInterceptors(FileInterceptor('file', {
  storage: diskStorage({
    destination: './uploads',
    filename: (req, file, cb) => {
      const uniqueSuffix = Date.now() + '-' + Math.round(Math.random() * 1e9);
      cb(null, uniqueSuffix + '-' + file.originalname);
    },
  }),
  limits: {
    fileSize: 30 * 1024 * 1024, // 30MB max file size
  },
  fileFilter: (req, file, cb) => {
    if (file.mimetype !== 'application/pdf') {
      return cb(new UnsupportedMediaTypeException('Only PDF files are allowed'), false);
    }
    cb(null, true);
  },
}))
async uploadPdf(@UploadedFile() file: Express.Multer.File, @Req() req: Request) {
  // Validate the file
  if (!file) {
    throw new BadRequestException('No file uploaded');
  }
  // Process the file
  return this.pdfService.processPdf(file.path, req.user);
}
Key validation steps include (the inspection and storage steps are sketched after the list):
File Format Verification: Ensuring the uploaded file is a valid PDF
Size Validation: Checking that the file size is within limits (up to 30MB)
Page Count Determination: Identifying the number of pages for processing planning
Metadata Extraction: Extracting document metadata for later reference
Storage: Saving the original PDF to AWS S3 for persistence
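The page-count, metadata, and storage steps above could look roughly like the following sketch. The helper name, bucket name, and the choice of pdf-lib and the AWS SDK are illustrative assumptions, not the actual service implementation:
// Hypothetical helper illustrating validation-time inspection and S3 persistence
import { readFile } from 'fs/promises';
import { PDFDocument } from 'pdf-lib';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({});

export async function inspectAndStorePdf(path: string, key: string) {
  const buffer = await readFile(path);

  // Page count and metadata for processing planning and later reference
  const doc = await PDFDocument.load(buffer);
  const pageCount = doc.getPageCount();
  const metadata = {
    title: doc.getTitle() ?? null,
    author: doc.getAuthor() ?? null,
    createdAt: doc.getCreationDate() ?? null,
  };

  // Persist the original PDF to S3 (bucket name is illustrative)
  await s3.send(
    new PutObjectCommand({
      Bucket: 'qwello-pdf-uploads',
      Key: key,
      Body: buffer,
      ContentType: 'application/pdf',
    }),
  );

  return { pageCount, metadata };
}
If PDFDocument.load throws, the file can be treated as corrupt and rejected, which feeds into the error handling described next.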
Error Handling
The upload process includes comprehensive error handling:
Invalid Format: Returns appropriate error messages for non-PDF files
Size Exceeded: Provides clear feedback when file size limits are exceeded
Corrupt Files: Detects and reports corrupted PDF files
Server Errors: Handles and logs server-side errors during upload
2. Image Conversion
Once a PDF is validated, it's converted to optimized images for AI processing:
PDF to Image Conversion
// Example from PdfService
async pdfToCompressedImages(pdfBuffer: Buffer): Promise<Buffer[]> {
  const pageBuffers: Buffer[] = [];
  try {
    // Load the PDF document
    const pdfDoc = await PDFDocument.load(pdfBuffer);
    const pageCount = pdfDoc.getPageCount();

    // Process each page
    for (let i = 0; i < pageCount; i++) {
      // Convert the page to an optimized image
      // (convertPageToImage is a private rendering helper; implementation not shown)
      const img = await this.convertPageToImage(pdfDoc, i, {
        width: 800,      // pixels
        format: 'WebP',  // image format
        quality: 70,     // compression quality
        grayscale: true, // color mode
      });
      pageBuffers.push(img);
    }

    return pageBuffers;
  } catch (error) {
    this.logger.error(`Error converting PDF to images: ${error.message}`);
    throw new Error(`Failed to convert PDF to images: ${error.message}`);
  }
}
Key image conversion features:
Page Extraction: Converting each PDF page to a separate image
Image Optimization: Applying "Heavy L3" optimization for optimal AI processing
Aspect Ratio Preservation: Maintaining the original document proportions
Parallel Processing: Converting multiple pages simultaneously for efficiency
Image Processing Considerations
The image conversion process is optimized for AI model consumption (a sketch follows this list):
Resolution Balance: High enough for text clarity, low enough for efficient processing
Format Selection: WebP format provides good compression while maintaining quality
Grayscale Conversion: Reduces file size and improves OCR performance
Quality Settings: Balances file size with image clarity
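The convertPageToImage helper from the previous example is not shown in the source. As one possible realization of these settings, a page that has already been rasterized to a bitmap could be post-processed with the sharp library (an assumption for illustration, not necessarily what Qwello uses):
// Hypothetical post-processing step, assuming the page has already been
// rasterized to a PNG buffer by a separate PDF renderer (not shown)
import sharp from 'sharp';

async function optimizePageImage(rasterizedPage: Buffer): Promise<Buffer> {
  return sharp(rasterizedPage)
    .resize({ width: 800 }) // target width from the conversion options
    .grayscale()            // smaller files, typically better OCR behavior
    .webp({ quality: 70 })  // WebP at the documented quality setting
    .toBuffer();
}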
3. Text Extraction
The optimized images are processed by AI vision models to extract structured text:
Vision AI Processing
// Example from PdfProcessingService
private async processImageToMarkdown(
  base64Image: string,
  pageNum: number,
  primaryModel: Model,
  fallbackModel: Model,
): Promise<[string, number]> {
  let attempt = 0;
  let currentModel: Model = primaryModel;
  let lastError = null;
  let usedFallback = false;

  while (attempt < this.retries) {
    try {
      this.logger.log(
        `Processing page ${pageNum} with model: ${currentModel} (attempt ${attempt + 1}/${this.retries})`,
      );

      const messages = [
        {
          role: 'system',
          content: markdownPrompt(pageNum),
        },
        {
          role: 'user',
          content: [
            {
              type: 'text',
              text: `I'm sending you a document image that needs to be converted to clean, structured markdown format.`,
            },
            {
              type: 'image_url',
              image_url: { url: `data:image/png;base64,${base64Image}` },
            },
          ],
        },
      ];

      const startTime = new Date().getTime();

      // Make API request
      const response = await this.aiService.request(
        {
          provider: 'openrouter',
          route: 'chat/completions',
          messages: messages as any,
          model: currentModel,
        },
        usedFallback
          ? null
          : {
              provider: 'openrouter',
              route: 'chat/completions',
              messages: messages as any,
              model: fallbackModel,
            },
      );

      const endTime = new Date().getTime();
      this.logger.debug(
        `Processed page ${pageNum} in ${endTime - startTime}ms`,
      );

      return [response, pageNum];
    } catch (err) {
      // Error handling with retry logic and fallback model:
      // increment attempt, record lastError, switch currentModel to the
      // fallback, and back off before retrying (details omitted for brevity)
    }
  }

  // All retries exhausted
  throw lastError;
}
Key text extraction features:
Vision AI Model: Uses Grok Vision model (x-ai/grok-2-vision-1212) to analyze page images
Text Recognition: Extracts text while preserving layout and formatting
Structure Preservation: Maintains headings, paragraphs, lists, and tables
Markdown Conversion: Converts extracted content to structured markdown
Page Boundary Marking: Clearly marks page transitions for reference
Prompt Engineering
The vision model uses carefully crafted prompts to guide the extraction process:
// Image to Markdown prompt
const markdownPrompt = (pageNum) => `
You are an expert document analyzer and formatter. Extract all text from these images and convert them to clean, structured markdown format.
1. EXTRACTION AND STRUCTURE:
- Extract all text accurately while maintaining each document's logical structure
- Identify and properly format headings using markdown # syntax:
* Main title: # Title (H1)
* Main sections: ## Section (H2)
* Subsections: ### Subsection (H3)
* Further subsections: #### Subsubsection (H4)
- Remove any numbering schemes from headings (like "1.2.3", "I.A.1") but keep the text
- Preserve the hierarchical relationship between sections
- Begin each image's content with a marker in this format: "{{{${pageNum}}}}" (where ${pageNum} is the page number)
2. FORMATTING AND SPECIAL ELEMENTS:
- Convert tables to proper markdown table syntax with aligned columns
- Format lists as proper markdown bulleted or numbered lists
- Format code blocks and technical snippets with appropriate syntax
- Use *italics* and **bold** where appropriate in the original
- Format footnotes properly (author affiliations with asterisks, other footnotes with [^1] notation)
- Preserve mathematical formulas and equations accurately using LaTeX syntax when needed
3. CONTENT ACCURACY:
- Transcribe all text, numbers, and symbols precisely
- Maintain exact terminology, technical jargon, and specialized vocabulary
- Keep proper nouns, names, and titles with correct capitalization
- Preserve the exact structure of tables, including column alignments
- Maintain the integrity of diagrams and figures by describing their content
4. CLEANUP AND CLARITY:
- Remove any PDF artifacts or format remnants
- Remove any duplicated text from layout issues
- Clean up any OCR errors that are obviously incorrect
- Ensure consistent spacing between sections
- Maintain proper paragraph breaks and section divisions
5. DO NOT:
- Add any commentary, analysis, or explanations about the content
- Include watermarks, headers, footers, or page numbers
- Add any text that isn't from the original document
- Modify, summarize, or paraphrase the original content
- Merge content between different images unless they are clearly part of the same section
`;
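The {{{pageNum}}} markers requested by this prompt make it straightforward to reassemble the per-page results in order and to locate page boundaries in the combined markdown. A minimal sketch of such an assembly step (this function is illustrative, not taken from the codebase):
// Hypothetical assembly step: order per-page results and join them,
// relying on the "{{{n}}}" page markers emitted by the vision model
function assembleMarkdown(pages: Array<[string, number]>): string {
  return pages
    .slice()
    .sort(([, a], [, b]) => a - b) // restore page order
    .map(([markdown, pageNum]) =>
      // guard against a page that came back without its marker
      markdown.includes(`{{{${pageNum}}}}`) ? markdown : `{{{${pageNum}}}}\n${markdown}`,
    )
    .join('\n\n');
}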
Fallback Mechanisms
The text extraction process includes robust fallback mechanisms (sketched after the list):
Primary Model: x-ai/grok-2-vision-1212 for optimal performance
Fallback Model: anthropic/claude-3.7-sonnet when primary model encounters issues
Retry Logic: Multiple attempts with exponential backoff
Rate Limit Handling: Intelligent handling of API rate limits
Error Recovery: Graceful degradation with partial results when needed
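A simplified sketch of this retry-and-fallback behavior, which the omitted catch block in processImageToMarkdown is responsible for (the wrapper, delays, and model handling shown here are illustrative):
// Hypothetical retry wrapper: try the primary model, back off exponentially,
// and switch to the fallback model after a failure
async function withRetryAndFallback<T>(
  run: (model: string) => Promise<T>,
  primary: string,
  fallback: string,
  retries = 3,
): Promise<T> {
  let model = primary;
  let lastError: unknown;
  for (let attempt = 0; attempt < retries; attempt++) {
    try {
      return await run(model);
    } catch (err) {
      lastError = err;
      model = fallback;                  // degrade to the fallback model
      const delay = 1000 * 2 ** attempt; // exponential backoff (1s, 2s, 4s, ...)
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}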
4. Knowledge Graph Generation
The extracted text is analyzed to generate a knowledge graph:
Language AI Processing
// Example from PdfProcessingService
private async processMarkdownToKG(
  markdown: string,
  pageNum: number,
  primaryModel: Model,
  fallbackModel: Model,
): Promise<[object, number]> {
  let attempt = 0;
  let currentModel: Model = primaryModel;
  let lastError = null;
  let usedFallback = false;

  while (attempt < this.retries) {
    try {
      this.logger.log(
        `Processing markdown to KG with model: ${currentModel} (attempt ${attempt + 1}/${this.retries})`,
      );

      // Prepare messages
      const messages = [
        {
          role: 'system',
          content: kgPrompt,
        },
        {
          role: 'user',
          content: `Please analyze the following markdown text from page ${pageNum} and create a knowledge graph by identifying key entities, relationships, and concepts:
${markdown}
Extract all important entities, their attributes, and the relationships between them. Format your response as a JSON knowledge graph following the structure specified in the system instructions.`,
        },
      ];

      // Make API request
      const response = await this.aiService.request(
        {
          provider: 'openrouter',
          route: 'chat/completions',
          messages: messages as any,
          model: currentModel,
        },
        usedFallback
          ? null
          : {
              provider: 'openrouter',
              route: 'chat/completions',
              messages: messages as any,
              model: fallbackModel,
            },
      );

      // Try to parse JSON from content
      try {
        // Extract JSON from content (it might be wrapped in ```json ... ``` or other text)
        const jsonMatch =
          response.match(/```json\s*([\s\S]*?)\s*```/) ||
          response.match(/```\s*([\s\S]*?)\s*```/) || [null, response];
        const jsonStr = jsonMatch[1].trim();
        return [JSON.parse(jsonStr), pageNum];
      } catch (jsonError) {
        throw new Error(
          `Error parsing JSON from content: ${jsonError.message}`,
        );
      }
    } catch (err) {
      // Error handling with retry logic and fallback model:
      // increment attempt, record lastError, switch to the fallback model,
      // and back off before retrying (details omitted for brevity)
    }
  }

  // All retries exhausted
  throw lastError;
}
Key knowledge graph generation features:
Language AI Model: Uses Grok Language model (x-ai/grok-2-1212) to analyze text
Entity Identification: Recognizes key concepts, people, organizations, etc.
Entity Classification: Assigns appropriate types to identified entities
Relationship Extraction: Determines connections between entities
Attribute Assignment: Extracts relevant attributes for entities and relationships
JSON Structure Creation: Formats the knowledge graph in a structured JSON format (a validation sketch follows the list)
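Because the graph JSON is parsed out of free-form model output, a lightweight runtime shape check before accepting a page's graph helps keep malformed responses out of the pipeline. A sketch of such a guard (illustrative, not part of the source):
// Hypothetical guard: accept a parsed page graph only if it has the expected shape
function looksLikeKnowledgeGraph(value: unknown): boolean {
  const v = value as { entities?: unknown[]; relationships?: unknown[] };
  if (!v || !Array.isArray(v.entities) || !Array.isArray(v.relationships)) {
    return false;
  }
  return (
    v.entities.every(
      (e: any) => typeof e?.id === 'string' && typeof e?.name === 'string' && typeof e?.type === 'string',
    ) &&
    v.relationships.every(
      (r: any) => typeof r?.source === 'string' && typeof r?.target === 'string' && typeof r?.type === 'string',
    )
  );
}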
Knowledge Graph Prompt
The language model uses a specialized prompt for knowledge graph generation:
// Knowledge Graph generation prompt
const kgPrompt = `
You are an expert knowledge graph creator. Convert the provided markdown text from a single page into a structured knowledge graph by identifying key entities, relationships, and concepts.
1. ENTITY RECOGNITION:
- Identify key entities (people, organizations, concepts, technologies, methods)
- Extract attributes and properties of these entities
- Recognize specialized terminology and technical concepts
- Identify numerical data, statistics, and measurements
- Be aware that some entities may be referenced but defined on other pages
2. RELATIONSHIP EXTRACTION:
- Identify relationships between entities
- Determine the nature of these relationships (e.g., "is part of", "causes", "implements")
- Capture hierarchical relationships between concepts
- Identify temporal relationships and sequences
3. KNOWLEDGE STRUCTURING:
- Organize extracted information into a coherent knowledge structure
- Maintain the logical flow and connections between concepts
- Preserve the context in which entities and relationships appear
- Identify overarching themes and categories
4. COREFERENCE AND REFERENCES:
- Identify when the text refers to entities that might be defined elsewhere
- Include these references even if the full entity definition is not on this page
- Use the most specific name or identifier available on this page
5. OUTPUT FORMAT:
- Provide a JSON object representing the knowledge graph with entities and relationships
- The JSON should follow this structure:
{
"entities": [
{"id": "e1", "type": "concept", "name": "Entity Name", "attributes": {"key": "value"}},
...
],
"relationships": [
{"source": "e1", "target": "e2", "type": "relationship_type", "attributes": {"key": "value"}},
...
]
}
Your response should ONLY contain the JSON knowledge graph without any additional text or explanation.
`;
JSON Structure
The knowledge graph is structured as a JSON object with an entities array and a relationships array, following the schema defined in the prompt above; a sketch of this structure is shown below.
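A TypeScript rendering of that structure, with field names taken from the prompt above and example values that are purely illustrative:
// Shape of a per-page knowledge graph, as requested from the language model
interface KgEntity {
  id: string;   // e.g. "e1"
  type: string; // e.g. "concept", "person", "organization"
  name: string;
  attributes: Record<string, string>;
}

interface KgRelationship {
  source: string; // id of the source entity
  target: string; // id of the target entity
  type: string;   // e.g. "is part of", "causes", "implements"
  attributes: Record<string, string>;
}

interface KnowledgeGraph {
  entities: KgEntity[];
  relationships: KgRelationship[];
}

// Illustrative example (values are made up):
const example: KnowledgeGraph = {
  entities: [
    { id: 'e1', type: 'concept', name: 'Knowledge Graph', attributes: { domain: 'AI' } },
    { id: 'e2', type: 'technology', name: 'PDF Processing Pipeline', attributes: {} },
  ],
  relationships: [
    { source: 'e2', target: 'e1', type: 'produces', attributes: {} },
  ],
};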
Chunking Strategy
For large knowledge graphs, a chunking strategy is employed (sketched after the list):
Token-Based Splitting: Divides content based on token counts
Structural Preservation: Maintains valid JSON in each chunk
Context Inclusion: Includes additional context from previous chunks
Overlap Management: Configurable overlap between chunks for continuity
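A minimal sketch of such a splitter over the KnowledgeGraph shape sketched above, assuming a countTokens helper (the tokenizer, thresholds, and overlap policy are all assumptions):
// Hypothetical splitter: partitions a large graph into token-bounded chunks,
// keeping each chunk valid JSON and carrying a few entities forward as context
function chunkKnowledgeGraph(
  graph: KnowledgeGraph,              // shape sketched in the JSON Structure section
  maxTokens: number,
  overlapEntities: number,
  countTokens: (s: string) => number, // tokenizer is assumed to be provided
): KnowledgeGraph[] {
  const chunks: KnowledgeGraph[] = [];
  let current: KnowledgeGraph = { entities: [], relationships: [] };

  for (const entity of graph.entities) {
    current.entities.push(entity);
    // Keep only relationships whose endpoints are both present in this chunk
    const ids = new Set(current.entities.map((e) => e.id));
    current.relationships = graph.relationships.filter(
      (r) => ids.has(r.source) && ids.has(r.target),
    );
    if (countTokens(JSON.stringify(current)) > maxTokens) {
      chunks.push(current);
      // Start the next chunk with the last few entities as overlapping context
      current = {
        entities: current.entities.slice(-overlapEntities),
        relationships: [],
      };
    }
  }
  if (current.entities.length > 0) chunks.push(current);
  return chunks;
}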
Parallel Processing Architecture
Qwello implements a worker-based parallel processing architecture for efficient document handling:
Worker Pool Implementation
// Example from WorkerPool class
class WorkerPool {
  constructor(size, totalTasks) {
    this.size = size;                 // Number of concurrent workers
    this.workers = [];                // Active worker references
    this.taskQueue = [];              // Individual page tasks waiting to be processed
    this.freeWorkers = [];            // Available worker IDs (seeded with 0..size-1 when processing starts)
    this.completedTasks = 0;          // Counter for completed tasks
    this.totalTasks = totalTasks;     // Total number of tasks
    this.results = [];                // Collection of results
    this.progress = new ProgressTracker(totalTasks);
    this.rateLimitErrors = new Map(); // Track rate limit errors by model
    this.errorCounts = new Map();     // Track error counts by type
  }

  async processQueue() {
    if (this.taskQueue.length === 0 || this.freeWorkers.length === 0) {
      return;
    }
    // Get a free worker ID and task
    const workerId = this.freeWorkers.shift();
    const task = this.taskQueue.shift();
    // Create the worker with task data
    const worker = new Worker(workerPath, { workerData: task });
    // Process results and handle errors
    worker.on('message', async (result) => {
      this.results.push(result);
      this.completedTasks++;
      this.progress.increment(result.success);
      // Clean up and process next task
      worker.terminate();
      this.freeWorkers.push(workerId);
      // Briefly delay the next task when recent rate-limit errors suggest the API needs a pause
      setTimeout(() => this.processQueue(), this.shouldDelay() ? 1000 : 0);
    });
  }
}
Key aspects of this architecture include:
Worker Pool: Multiple workers process different pages simultaneously
Task Distribution: Pages are assigned to workers based on availability
Resource Management: Worker count adapts to available system resources
Progress Tracking: Real-time monitoring of processing status
Result Collection: Processed pages are collected for merging (see the merge sketch after this list)
Error Handling: Failed tasks are retried or redirected to fallback processors
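Once all pages have been processed, the collected per-page graphs are merged into a single document-level graph. One possible merge, deduplicating entities by normalized name and remapping relationship endpoints (this merge logic is an assumption; the actual implementation is not shown here):
// Hypothetical merge step over the per-page graphs collected by the worker pool,
// using the KnowledgeGraph / KgEntity shapes sketched earlier
function mergeKnowledgeGraphs(pages: KnowledgeGraph[]): KnowledgeGraph {
  const byName = new Map<string, KgEntity>(); // canonical entities keyed by normalized name
  const idMap = new Map<string, string>();    // "<page>:<local id>" -> canonical id
  const merged: KnowledgeGraph = { entities: [], relationships: [] };

  pages.forEach((page, pageIndex) => {
    for (const entity of page.entities) {
      const key = entity.name.trim().toLowerCase();
      let canonical = byName.get(key);
      if (!canonical) {
        canonical = { ...entity, id: `e${byName.size + 1}` };
        byName.set(key, canonical);
        merged.entities.push(canonical);
      } else {
        // Merge attributes from later mentions of the same entity
        canonical.attributes = { ...canonical.attributes, ...entity.attributes };
      }
      idMap.set(`${pageIndex}:${entity.id}`, canonical.id);
    }
    for (const rel of page.relationships) {
      const source = idMap.get(`${pageIndex}:${rel.source}`);
      const target = idMap.get(`${pageIndex}:${rel.target}`);
      if (source && target) {
        merged.relationships.push({ ...rel, source, target });
      }
    }
  });

  return merged;
}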
Job Queue Integration
The PDF processing pipeline is integrated with BullMQ for reliable job processing:
// Example from PdfProcessingService
public async processPdf(
  pdfBuffer: Buffer,
  user: User,
): Promise<PdfKnowledgeGraph> {
  const pageBuffers = await this.pdfService.pdfToCompressedImages(pdfBuffer);
  const totalPages = pageBuffers.length;

  const pdfPages: PdfPage[] = [];
  for (let i = 0; i < totalPages; i++) {
    const pageBuffer = pageBuffers[i];
    const pageBase64 = pageBuffer.toString('base64');
    pdfPages.push({ page: i + 1, data: pageBase64 });
  }

  const knowledgeGraph = await this.createKnowledgeGraph(totalPages, user);

  // Split pages into chunks of 10 and enqueue one job per chunk
  const chunks = chunkArray(pdfPages, 10);
  const jobs = [];
  for (let i = 0; i < chunks.length; i++) {
    const chunk = chunks[i];
    const pdfProcessingJob = await this.pdfQueue.add('pdf', {
      pdfId: knowledgeGraph.id,
      pages: chunk,
    });
    this.logger.log(
      `Added job ${pdfProcessingJob.id} for chunk ${i + 1}/${chunks.length}`,
    );
    jobs.push(pdfProcessingJob.id);
  }

  knowledgeGraph.jobs = jobs.map((jobId) => {
    return { jobId, status: KgStatus.Processing } as PdfJob;
  });
  await this.pdfKnowledgeGraphRepository.update(knowledgeGraph);

  return knowledgeGraph;
}
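On the consuming side, a BullMQ worker picks the queued chunks back up. A minimal sketch of such a consumer (queue name, Redis connection, and processor wiring are illustrative assumptions; the real processor delegates to PdfProcessingService):
// Hypothetical BullMQ consumer for the chunked page jobs enqueued above
import { Worker } from 'bullmq';

const pdfWorker = new Worker(
  'pdf-processing', // must match the queue that pdfQueue was registered under (name assumed)
  async (job) => {
    const { pdfId, pages } = job.data;
    // Each page in the chunk goes through text extraction and knowledge graph
    // generation; the per-page results are then attached to the graph record.
    // (Actual processing is delegated to the service layer and omitted here.)
    return { pdfId, processedPages: pages.length };
  },
  {
    connection: { host: 'localhost', port: 6379 }, // Redis connection (illustrative)
    concurrency: 4,                                // chunks processed in parallel
  },
);

pdfWorker.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});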
Error Handling and Recovery
The PDF processing pipeline includes robust error handling and recovery mechanisms, built from the facilities described in the previous sections:
Retry Logic: Failed page requests are retried with exponential backoff
Model Fallback: Requests are rerouted to the fallback model when the primary model repeatedly fails
Rate Limit Handling: The worker pool tracks rate-limit errors per model and delays new tasks when needed
Job-Level Recovery: Each BullMQ job carries a status, so failed chunks can be identified and re-enqueued (see the sketch below)
Graceful Degradation: Partial results are preserved so a single failed page does not discard the rest of the document
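Chunk-level retries can also be delegated to BullMQ itself when jobs are enqueued; a sketch using its built-in attempts and backoff options (the helper and the specific values are illustrative):
// Hypothetical enqueue helper using BullMQ's built-in retry options
import { Queue } from 'bullmq';

async function enqueueChunkWithRetries(
  pdfQueue: Queue,
  pdfId: string,
  pages: Array<{ page: number; data: string }>,
) {
  return pdfQueue.add(
    'pdf',
    { pdfId, pages },
    {
      attempts: 3,                                   // retry a failed chunk up to 3 times
      backoff: { type: 'exponential', delay: 5000 }, // wait 5s, 10s, 20s between attempts
      removeOnComplete: true,                        // drop completed jobs from Redis
    },
  );
}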
This documentation covers the technical details of Qwello's PDF processing pipeline, from initial upload to final knowledge graph generation. Its parallel processing architecture and AI model integration enable efficient and accurate transformation of PDF documents into structured knowledge graphs.