Session

Use Sessions to interact with Ragnerock resources in Python.

The Session is your gateway to interacting with Ragnerock. It provides methods to get, list, create, and delete resources, to query annotation data with SQL, and to run workflows within a project.

Basic Usage

Use Session as a context manager:

from ragnerock import create_engine, Session

engine = create_engine("ragnerock://user@example.com:pass@api.ragnerock.com/my_project")

with Session(engine) as session:
    # All operations happen here
    docs = session.list(Document).all()

The session authenticates when you enter the with block and handles cleanup when you exit.

Session Methods

get()

Retrieve a single resource by ID or name.

from ragnerock import Document, Operator, Workflow, Annotation, Chunk, Page

# Get by ID
doc = session.get(Document, id="550e8400-e29b-41d4-a716-446655440000")

# Get by name (for resources that support it)
doc = session.get(Document, name="Apple 10-K 2024")
operator = session.get(Operator, name="financial_metrics")
workflow = session.get(Workflow, name="SEC Analysis Pipeline")

# Returns None if not found
doc = session.get(Document, id="nonexistent")
if doc is None:
    print("Document not found")

Supported resource types:

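| Resource   | Supports id= | Supports name= |
| ---------- | ------------ | -------------- |
| Document   | Yes          | Yes            |
| Operator   | Yes          | Yes            |
| Workflow   | Yes          | Yes            |
| Annotation | Yes          | No             |
| Chunk      | Yes          | No             |
| Page       | Yes          | No             |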

list()

List resources with optional filters. Returns a PaginatedIterator, which you can iterate over directly or consume with .all(), .first(), or .limit(n).

from ragnerock import Document, Annotation, Chunk, Operator, Workflow

# List all documents
for doc in session.list(Document):
    print(doc.name)

# Get all at once
all_docs = session.list(Document).all()

# Get first only
first_doc = session.list(Document).first()

# Limit results
recent = session.list(Document).limit(10).all()

Listing annotations requires a filter, such as document_id, chunk_id, or schema_id:

# By document
annotations = session.list(Annotation, document_id=doc.id).all()

# By chunk
annotations = session.list(Annotation, chunk_id=chunk.id).all()

# By schema (operator)
annotations = session.list(Annotation, schema_id=operator.id).all()

create()

Create a new resource. The object you pass in is updated in place with server-assigned fields (for example, id, created_at, and status for a Document).

from ragnerock import Document, Annotation

# Create a document
doc = Document(
    file_path="/path/to/report.pdf",
    name="Q4 Earnings Report"
)
session.create(doc)

# doc now has server-assigned fields
print(doc.id)          # UUID assigned by server
print(doc.created_at)  # Timestamp
print(doc.status)      # Processing status

# Create an annotation
annotation = Annotation(
    schema_id=operator.id,
    document_id=doc.id,
    data={"sentiment": 0.8, "topics": ["revenue", "growth"]}
)
session.create(annotation)

Supported for creation:

| Resource   | Creatable        |
| ---------- | ---------------- |
| Document   | Yes              |
| Annotation | Yes              |
| Operator   | No (use web app) |
| Workflow   | No (use web app) |

delete()

Delete a resource.

# Delete a document
doc = session.get(Document, name="Old Report")
if doc:
    session.delete(doc)

# Delete an annotation
annotation = session.get(Annotation, id="...")
if annotation:
    session.delete(annotation)

query()

Execute a SQL query on annotation data. Returns a QueryResult.

result = session.query("""
    SELECT document_name, sentiment_score, key_topics
    FROM sentiment_analysis
    WHERE sentiment_score > 0.5
    ORDER BY sentiment_score DESC
""")

# Access results
print(result.columns)      # ['document_name', 'sentiment_score', 'key_topics']
print(result.row_count)    # Number of rows
print(result.query_time_ms)  # Execution time

# Convert to formats
data = result.to_dict()    # List of dicts
df = result.to_pandas()    # pandas DataFrame

# Limit results
result = session.query("SELECT * FROM large_table", limit=100)

See SQL Queries for more details.

run()

Trigger a workflow on documents. Returns a Job handle.

from ragnerock import Workflow, Document, Job

# Get the workflow
workflow = session.get(Workflow, name="Extract Financial Metrics")

# Get documents to process
docs = session.list(Document).limit(10).all()

# Run the workflow
job = session.run(workflow, documents=docs)

# Wait for completion
job.wait(timeout=300, poll_interval=2.0)

# Check status
print(job.status)  # "SUCCEEDED" or "FAILED"
if job.status_message:
    print(job.status_message)

See Resources for details on the Job class.

Working with Resources

Resources retrieved from a session have convenience methods:

Document.list()

List related resources for a document:

doc = session.get(Document, name="Apple 10-K")

# List all annotations
for ann in doc.list(Annotation):
    print(ann.data)

# Filter by operator
for ann in doc.list(Annotation, operator="sentiment"):
    print(ann.data)

# List chunks
for chunk in doc.list(Chunk):
    print(chunk.content[:100])

# List pages
for page in doc.list(Page):
    print(f"Page {page.page_number}")

Document.status

Check document processing status:

doc = session.get(Document, name="New Upload")
print(doc.status)  # "processing", "success", "error", etc.

# Poll until ready
import time
while doc.status == "processing":
    time.sleep(5)
    doc = session.get(Document, id=doc.id)  # Refresh

Operator.list()

List annotations for an operator:

operator = session.get(Operator, name="sentiment")

# All annotations from this operator
for ann in operator.list(Annotation):
    print(ann.data)

# Filter by document
for ann in operator.list(Annotation, document=doc):
    print(ann.data)

Annotation relationships

Annotations have lazy-loaded relationships:

ann = session.get(Annotation, id="...")

# These fetch on access
parent_doc = ann.document
parent_chunk = ann.chunk
parent_page = ann.page
parent_operator = ann.operator

Complete Example

from ragnerock import create_engine, Session, Document, Annotation, Workflow

engine = create_engine("ragnerock://user@example.com:pass@api.ragnerock.com/research")

with Session(engine) as session:
    # Upload a document
    doc = Document(file_path="/data/10k.pdf", name="Company 10-K")
    session.create(doc)
    print(f"Created document: {doc.id}")

    # Wait for processing
    import time
    while doc.status == "processing":
        time.sleep(5)
        doc = session.get(Document, id=doc.id)

    # Run a workflow
    workflow = session.get(Workflow, name="Financial Analysis")
    job = session.run(workflow, documents=[doc])
    job.wait(timeout=600)

    # Query the results
    result = session.query(f"""
        SELECT revenue, net_income, risk_factors
        FROM financial_metrics
        WHERE document_id = '{doc.id}'
    """)

    for row in result.to_dict():
        print(f"Revenue: ${row['revenue']}M")
        print(f"Net Income: ${row['net_income']}M")
        print(f"Risks: {row['risk_factors']}")

Next Steps