Files
geotech/extract_single.py

430 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Single PDF Layer Extractor - Extract geological layers from a single PDF
"""
import click
import json
import logging
import PyPDF2
import os
from dotenv import load_dotenv
from pathlib import Path
from typing import List, Dict, Tuple
from datetime import datetime
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from openai import AzureOpenAI
load_dotenv()
# Configuration Azure Document Intelligence
AZURE_DOC_ENDPOINT = os.getenv("AZURE_DOC_ENDPOINT")
AZURE_DOC_KEY = os.getenv("AZURE_DOC_KEY")
# Configuration Azure OpenAI
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_KEY = os.getenv("AZURE_OPENAI_KEY")
AZURE_DEPLOYMENT = os.getenv("AZURE_DEPLOYMENT", "gpt-4.1")
API_VERSION = os.getenv("API_VERSION", "2024-12-01-preview")
if not all([AZURE_DOC_ENDPOINT, AZURE_DOC_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY]):
raise ValueError(
"❌ Missing Azure credentials! Please check your .env file.\n"
"Required variables: AZURE_DOC_ENDPOINT, AZURE_DOC_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY"
)
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.info("✅ Environment variables loaded successfully")
logger.info(f"📍 Document Intelligence endpoint: {AZURE_DOC_ENDPOINT}")
logger.info(f"🤖 OpenAI deployment: {AZURE_DEPLOYMENT}")
class SinglePDFExtractor:
"""Extract geological layers from a single PDF"""
def __init__(self):
"""Initialize Azure clients"""
self.doc_client = DocumentAnalysisClient(
endpoint=AZURE_DOC_ENDPOINT,
credential=AzureKeyCredential(AZURE_DOC_KEY)
)
self.openai_client = AzureOpenAI(
azure_endpoint=AZURE_OPENAI_ENDPOINT,
api_key=AZURE_OPENAI_KEY,
api_version=API_VERSION
)
def get_prompts(self) -> Tuple[str, str]:
"""Get the system and user prompts"""
system_prompt = """Tu es un expert géotechnicien spécialisé dans l'analyse de logs de forage.
Ta tâche est d'extraire UNIQUEMENT les couches géologiques avec leurs profondeurs et descriptions COMPLÈTES.
RÈGLES CRITIQUES:
1. Chaque couche DOIT avoir une profondeur de début et de fin en mètres
2. La description doit contenir TOUT le texte associé à la couche (même sur plusieurs lignes)
3. Notation décimale: 0,3 ou 0.3 = 0.3m (PAS 3m!)
4. Profondeurs typiques: 0-0.3m, 0.3-1m, 1-4m, 4-10m, etc.
5. NE PAS séparer ou classifier le matériau - tout dans la description
6. IGNORER les mots isolés qui ne sont pas dans la colonne description
RÈGLE DE CONTINUITÉ DES COUCHES:
⚠️ Une couche géologique est CONTINUE entre deux profondeurs
⚠️ Si tu vois une profondeur isolée suivie de description, cherche OBLIGATOIREMENT la profondeur de fin
⚠️ Les profondeurs de fin peuvent être:
- Sur la même ligne après la description
- Sur une ligne suivante
- Au début de la couche suivante (la fin d'une couche = début de la suivante)
⚠️ NE JAMAIS fragmenter une description continue en plusieurs couches
GESTION DES PAGES:
⚠️ Les couches peuvent continuer d'une page à l'autre
⚠️ Sur une nouvelle page, si la première profondeur n'est pas 0.0m, c'est la continuation de la page précédente
⚠️ Ne pas dupliquer les couches identiques sur plusieurs pages
SORTIE: JSON valide uniquement"""
user_prompt_template = """Extrait toutes les couches géologiques de cette page de log de forage.
CE QUI EST UNE COUCHE GÉOLOGIQUE:
✓ Descriptions de matériaux: sol, roche, sable, argile, limon, remblai, terre végétale, etc.
✓ A une plage de profondeur ET des propriétés de matériau
✓ Décrit les caractéristiques physiques: couleur, consistance, humidité, altération, granulométrie
CE QUI N'EST PAS UNE COUCHE (IGNORER):
✗ Descriptions d'images ou légendes
✗ Métadonnées du projet (coordonnées, dates, noms d'entreprise)
✗ Descriptions d'équipement ou de méthodologie
✗ Données administratives ou commentaires sur la procédure
✗ Mots isolés dans d'autres colonnes (classifications, formations géologiques, etc.)
EXCLUSIONS ABSOLUES - NE JAMAIS extraire:
✗ Annotations techniques isolées: CZ, JT, JTx5, etc.
✗ Mesures techniques sans description de matériau
✗ Échantillons ("Undisturbed sample", "Disturbed sample", "sample", "échantillon")
✗ Lignes contenant uniquement des angles, pressions ou mesures
✗ Références à des essais ou tests sans description de sol
ALGORITHME DE DÉTECTION DES PROFONDEURS:
1. Pour chaque nombre qui ressemble à une profondeur (X.X m ou X,X m):
- Si suivi de "-" ou "à" ou "bis" → profondeur de début, chercher la fin
- Si précédé de "-" ou "à" ou "bis" → profondeur de fin
- Si isolé → vérifier le contexte:
* Début de description → profondeur de début
* Fin de description → profondeur de fin
* Nouvelle ligne → début de nouvelle couche
2. TOUJOURS vérifier la cohérence: fin couche N = début couche N+1
3. Si une couche n'a pas de profondeur de fin explicite, elle va jusqu'au début de la couche suivante
MOTIFS À RECHERCHER:
- "0.00-0.30m: Terre végétale, brun foncé..."
- "0,3 à 1,0m Sable fin beige..."
- "de 1m à 4m: Argile plastique grise..."
- "1.00 - 4.00 ARGILE limoneuse..."
- "0.3 m Description... 4.0 m" (profondeurs séparées)
- Tableaux avec colonnes profondeur/description
IDENTIFICATION DES COLONNES DE TABLEAU:
⚠️ Dans un tableau, identifier VISUELLEMENT les colonnes avant d'extraire
⚠️ Colonnes à IGNORER complètement:
- Formation géologique (Quaternaire, Tertiaire, Quaternaire, etc.)
- Stratigraphie
- Classification (Colluvions, Alluvions, etc.)
- Symboles ou codes isolés
- Âge géologique
⚠️ Colonnes à UTILISER:
- Description du matériau
- Propriétés physiques (couleur, texture, consistance)
⚠️ Si doute sur une colonne, vérifier si elle contient des propriétés physiques détaillées
RÈGLE CRITIQUE DE SÉPARATION:
⚠️ Les profondeurs (0-0.3m, 0.30m, etc.) ne font JAMAIS partie de la description
⚠️ Si une ligne contient "0,30 m - Mutterboden, braun", alors:
- depth_start: 0.0, depth_end: 0.3
- description: "Mutterboden, braun" (SANS les profondeurs)
⚠️ Retirer TOUS les préfixes de profondeur du champ description
⚠️ NE PAS inclure les profondeurs répétées à la fin de la description
IMPORTANT:
- Capture TOUT le texte de description, même s'il est long ou sur plusieurs lignes
- Ne pas résumer ou reformuler - garder le texte original
- Inclure TOUTES les propriétés mentionnées (couleur, texture, humidité, inclusions, etc.)
- EXCLURE ABSOLUMENT les profondeurs du champ description
- Si le texte commence par "0,30 m - Mutterboden", la description est SEULEMENT "Mutterboden"
- Séparer clairement: profondeurs dans depth_start/depth_end, matériau et propriétés dans description
- NE PAS mélanger les colonnes d'un tableau - prendre uniquement la colonne description principale
VALIDATION OBLIGATOIRE avant de retourner le JSON:
□ Toutes les couches ont une profondeur de début ET de fin
□ Les profondeurs sont continues (pas de trous entre les couches)
□ Les profondeurs sont cohérentes (début < fin)
□ Les descriptions ne contiennent PAS les profondeurs
□ Les descriptions ne contiennent PAS de noms de colonnes de classification
□ Aucune annotation technique isolée n'est présente
□ Pas de duplication de couches identiques
Retourne ce JSON:
{{
"layers": [
{{
"depth_start": 0.0,
"depth_end": 0.3,
"description": "Texte de description SANS les profondeurs - uniquement matériau et propriétés"
}}
]
}}
EXEMPLE CONCRET:
Texte original: "0,00 - 0,30 m Mutterboden, dunkelbraun, humos"
Résultat attendu:
{{
"depth_start": 0.0,
"depth_end": 0.3,
"description": "Mutterboden, dunkelbraun, humos"
}}
PAS: "description": "0,00 - 0,30 m Mutterboden, dunkelbraun, humos"
TEXTE DE LA PAGE {page_num}:
{text}"""
return system_prompt, user_prompt_template
def extract_text_from_pdf(self, pdf_path: Path) -> List[str]:
"""Extract text from PDF with chunking"""
logger.info(f"🔍 Extracting text from {pdf_path.name}")
total_pages = self.get_page_count(pdf_path)
logger.info(f"📄 PDF has {total_pages} pages")
if total_pages == 0:
logger.error("Could not determine page count")
return []
# Extraire par chunks de 2 pages
pages_text = []
chunk_size = 2
for start_page in range(1, total_pages + 1, chunk_size):
end_page = min(start_page + chunk_size - 1, total_pages)
with open(pdf_path, "rb") as f:
poller = self.doc_client.begin_analyze_document(
model_id="prebuilt-document",
document=f,
pages=f"{start_page}-{end_page}"
)
result = poller.result()
for page in result.pages:
page_lines = [line.content for line in (page.lines or [])]
pages_text.append('\n'.join(page_lines))
return pages_text
def get_page_count(self, pdf_path: Path) -> int:
"""Get the number of pages in a PDF"""
try:
with open(pdf_path, "rb") as f:
pdf_reader = PyPDF2.PdfReader(f)
return len(pdf_reader.pages)
except:
pass
return 0
def _extract_table_text(self, table) -> str:
"""Extract text from table with better structure preservation"""
if not hasattr(table, 'cells') or not table.cells:
return ""
max_row = max(cell.row_index for cell in table.cells)
max_col = max(cell.column_index for cell in table.cells)
table_array = [["" for _ in range(max_col + 1)] for _ in range(max_row + 1)]
for cell in table.cells:
if hasattr(cell, 'content'):
table_array[cell.row_index][cell.column_index] = cell.content
lines = []
for row_idx, row in enumerate(table_array):
row_text = " | ".join(str(cell).strip() for cell in row)
if row_text.strip():
lines.append(row_text)
# Add separator after header row
if row_idx == 0 and len(lines) == 1:
lines.append("-" * len(row_text))
return "\n".join(lines)
def extract_layers_from_page(self, page_text: str, page_num: int, system_prompt: str, user_prompt_template: str) -> List[Dict]:
"""Extract layers from a single page using GPT"""
if not page_text.strip():
logger.warning(f"⚠️ Page {page_num} is empty")
return []
max_chars = 8000 # todo
user_prompt = user_prompt_template.format(
page_num=page_num,
text=page_text[:max_chars]
)
try:
logger.info(f"🤖 Processing page {page_num} with GPT-4...")
response = self.openai_client.chat.completions.create(
model=AZURE_DEPLOYMENT,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1,
max_tokens=4000, # todo
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
valid_layers = []
for layer in result.get("layers", []):
try:
depth_start = float(str(layer.get("depth_start", 0)).replace(",", "."))
depth_end = float(str(layer.get("depth_end", 0)).replace(",", "."))
description = layer.get("description", "").strip()
if (depth_start < depth_end and depth_end <= 100 and depth_start >= 0 and description):
valid_layers.append({
"page": page_num,
"depth_start": depth_start,
"depth_end": depth_end,
"description": description
})
except Exception as e:
logger.warning(f" Invalid layer data: {e}")
continue
logger.info(f" ✓ Found {len(valid_layers)} valid layers on page {page_num}")
return valid_layers
except Exception as e:
logger.error(f"❌ GPT extraction failed for page {page_num}: {e}")
return []
def merge_duplicate_layers(self, layers: List[Dict]) -> List[Dict]:
"""Merge duplicate layers across pages"""
if not layers:
return []
layers.sort(key=lambda x: (x['depth_start'], x['page']))
merged = []
current = None
for layer in layers:
if current is None:
current = layer.copy()
else:
if (abs(current['depth_start'] - layer['depth_start']) < 0.01 and
abs(current['depth_end'] - layer['depth_end']) < 0.01):
if len(layer['description']) > len(current['description']):
current['description'] = layer['description']
current['page'] = min(current['page'], layer['page'])
else:
merged.append(current)
current = layer.copy()
if current:
merged.append(current)
return merged
def extract_layers_from_pdf(self, pdf_path: Path) -> Dict:
"""Main extraction function"""
logger.info(f"\n🚀 Starting extraction for: {pdf_path.name}")
start_time = datetime.now()
pages_text = self.extract_text_from_pdf(pdf_path)
if not pages_text:
logger.error("❌ No text extracted from PDF")
return {
"metadata": {
"source_file": str(pdf_path.resolve()),
"extraction_date": datetime.now().isoformat(),
"extraction_method": "Azure Document Intelligence + GPT-4",
"pages": 0,
"layer_count": 0,
"error": "No text extracted from PDF"
},
"layers": []
}
system_prompt, user_prompt_template = self.get_prompts()
all_layers = []
for page_idx, page_text in enumerate(pages_text):
page_layers = self.extract_layers_from_page(
page_text, page_idx + 1, system_prompt, user_prompt_template
)
all_layers.extend(page_layers)
logger.info(f"\n🔄 Merging duplicate layers...")
merged_layers = self.merge_duplicate_layers(all_layers)
extraction_time = (datetime.now() - start_time).total_seconds()
logger.info(f"\n✅ Extraction completed!")
logger.info(f" Total layers found: {len(merged_layers)}")
logger.info(f" Extraction time: {extraction_time:.1f} seconds")
return {
"metadata": {
"source_file": str(pdf_path.resolve()),
"extraction_date": datetime.now().isoformat(),
"extraction_method": "Azure Document Intelligence + GPT-4",
"pages": len(pages_text),
"layer_count": len(merged_layers),
"extraction_time_seconds": round(extraction_time, 1)
},
"layers": merged_layers
}
@click.command()
@click.argument('pdf_path', type=click.Path(exists=True, path_type=Path))
@click.option('--output', '-o', type=click.Path(path_type=Path), help='Output JSON file path (default: same name as PDF)')
@click.option('--pretty', is_flag=True, help='Pretty print the JSON output')
def main(pdf_path: Path, output: Path, pretty: bool):
"""Extract geological layers from a single PDF file."""
click.echo("🪨 Geological Layer Extractor")
click.echo("=" * 40)
extractor = SinglePDFExtractor()
result = extractor.extract_layers_from_pdf(pdf_path)
if output is None:
output_dir = Path('output')
output_dir.mkdir(exist_ok=True)
output = output_dir / pdf_path.with_suffix('.json').name
with open(output, 'w', encoding='utf-8') as f:
if pretty:
json.dump(result, f, indent=2, ensure_ascii=False)
else:
json.dump(result, f, ensure_ascii=False)
click.echo(f"\n💾 Results saved to: {output}")
click.echo("\n📊 Extraction Summary:")
click.echo(f" Source: {pdf_path.name}")
click.echo(f" Pages processed: {result['metadata']['pages']}")
click.echo(f" Layers extracted: {result['metadata']['layer_count']}")
click.echo(f" Time taken: {result['metadata']['extraction_time_seconds']}s")
if result['layers']:
click.echo("\n📋 Preview of extracted layers:")
for i, layer in enumerate(result['layers'][:3], 1):
click.echo(f"\n {i}. {layer['depth_start']:.1f} - {layer['depth_end']:.1f}m")
click.echo(f" {layer['description'][:80]}{'...' if len(layer['description']) > 80 else ''}")
if len(result['layers']) > 3:
click.echo(f"\n ... and {len(result['layers']) - 3} more layers")
if __name__ == '__main__':
main()