Quick Start

Upload your first document and extract structured data in minutes.

This guide walks you through the core Ragnerock workflow: connecting to a project, uploading a document, running a workflow, and querying the extracted data.

1. Connect to Your Project

Connect to Ragnerock using a connection string with your credentials and project name:

from ragnerock import create_engine, Session, Document

# The connection string carries the credentials, the API host, and the
# project name (here: sec_filings).
connection_string = "ragnerock://your.email@company.com:password@api.ragnerock.com/sec_filings"
engine = create_engine(connection_string)

# Open a session on the engine; the operations shown in this guide all
# run inside a block like this one.
with Session(engine) as session:
    ...

2. Upload a Document

Upload a document to your project. Ragnerock automatically processes the document, extracting text, generating embeddings, and preparing it for annotation.

from ragnerock import create_engine, Session, Document
import time

engine = create_engine("ragnerock://your.email@company.com:password@api.ragnerock.com/sec_filings")

with Session(engine) as session:
    # Create a document from a local file
    doc = Document(
        file_path="/path/to/apple-10k-2024.pdf",
        name="Apple 10-K 2024",
    )
    session.create(doc)

    print(f"Uploaded document: {doc.id}")
    print(f"Status: {doc.status}")

    # Wait for processing to complete, but give up after a fixed deadline
    # so a stalled document cannot hang the script forever
    max_wait_seconds = 300
    deadline = time.monotonic() + max_wait_seconds
    while doc.status.value == "processing":
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Document {doc.id} still processing after {max_wait_seconds} seconds"
            )
        time.sleep(5)
        # Re-fetch the document so the loop sees its current status
        doc = session.get(Document, id=doc.id)

    print(f"Processing complete: {doc.status}")

3. Explore Your Documents

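List and browse documents in your project. This snippet and the ones in the following steps reuse the imports and the engine created in the previous step: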

with Session(engine) as session:
    # Print one summary line (name and status) per document in the project
    for doc in session.list(Document):
        summary = f"{doc.name} - {doc.status}"
        print(summary)

    # Look up a single document by its name; skip the details if the
    # lookup comes back empty
    doc = session.get(Document, name="Apple 10-K 2024")
    if doc:
        details = (
            f"Found: {doc.name}",
            f"Size: {doc.filesize} bytes",
            f"Created: {doc.created_at}",
        )
        print(*details, sep="\n")

4. Run a Workflow

Workflows are pre-configured annotation pipelines. Run a workflow on your documents to extract structured data:

from ragnerock import Workflow

with Session(engine) as session:
    # Get an existing workflow
    workflow = session.get(Workflow, name="Financial Metrics Extraction")

    # Get documents to process
    doc = session.get(Document, name="Apple 10-K 2024")

    # Either lookup may come back empty, so only start the job when both
    # the workflow and the document were found
    if workflow and doc:
        # Run the workflow
        job = session.run(workflow, documents=[doc])

        # Wait for completion (with timeout)
        job.wait(timeout=300)

        print(f"Job status: {job.status}")
    else:
        print("Workflow or document not found")

5. Access Annotations

After a workflow runs, you can access the extracted annotations:

from ragnerock import Annotation

with Session(engine) as session:
    doc = session.get(Document, name="Apple 10-K 2024")

    # The lookup may come back empty, so only read annotations when the
    # document was found
    if doc:
        # List all annotations for this document
        for ann in doc.list(Annotation):
            print(f"Schema: {ann.schema_id}")
            print(f"Data: {ann.data}")
            print("---")

        # Filter by operator (annotation schema)
        for ann in doc.list(Annotation, operator="financial_metrics"):
            print(f"Revenue: ${ann.data.get('revenue')}M")
            print(f"Net Income: ${ann.data.get('net_income')}M")

6. Query Your Data

Once you have annotations, query them using SQL:

with Session(engine) as session:
    # SQL over the financial_metrics table, highest revenue first
    sql = """
        SELECT document_name, revenue, net_income, gross_margin
        FROM financial_metrics
        WHERE revenue > 100000
        ORDER BY revenue DESC
    """
    result = session.query(sql)

    print(f"Found {result.row_count} companies")

    # Row-by-row access: each row is a dictionary keyed by column name
    rows = result.to_dict()
    for row in rows:
        print(f"{row['document_name']}: ${row['revenue']}M revenue")

    # Tabular access: the same result as a pandas DataFrame
    df = result.to_pandas()
    print(df.describe())

Complete Example

Here’s a complete example combining all the steps:

from ragnerock import create_engine, Session, Document, Workflow, Annotation
import time

# Connect
engine = create_engine("ragnerock://analyst@firm.com:password@api.ragnerock.com/research")

with Session(engine) as session:
    # Upload a document
    doc = Document(
        file_path="/data/reports/company-10k.pdf",
        name="Company 10-K 2024",
    )
    session.create(doc)
    print(f"Uploaded: {doc.name}")

    # Wait for processing, but give up after a fixed deadline so a stalled
    # document cannot hang the script forever
    max_wait_seconds = 600
    deadline = time.monotonic() + max_wait_seconds
    while doc.status.value == "processing":
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Document {doc.id} still processing after {max_wait_seconds} seconds"
            )
        time.sleep(5)
        # Re-fetch the document so the loop sees its current status
        doc = session.get(Document, id=doc.id)
    print(f"Processing: {doc.status}")

    # Run a workflow (skipped when the named workflow is not found)
    workflow = session.get(Workflow, name="SEC Filing Analysis")
    if workflow:
        job = session.run(workflow, documents=[doc])
        job.wait(timeout=600)
        print(f"Workflow: {job.status}")

    # View annotations
    for ann in doc.list(Annotation, operator="financial_metrics"):
        print(f"Extracted data: {ann.data}")

    # Query across all documents
    result = session.query("""
        SELECT document_name, revenue, risk_factors
        FROM financial_metrics
        ORDER BY created_at DESC
        LIMIT 10
    """)

    for row in result.to_dict():
        print(f"{row['document_name']}: ${row['revenue']}M")

Next Steps