Session
Use Sessions to interact with Ragnerock resources in Python.
The Session is your gateway to interacting with Ragnerock. It provides methods to get, list, create, delete, query, and run resources within a project.
Basic Usage
Use Session as a context manager:
from ragnerock import create_engine, Session
engine = create_engine("ragnerock://user@example.com:pass@api.ragnerock.com/my_project")
with Session(engine) as session:
# All operations happen here
docs = session.list(Document).all()
The session authenticates when you enter the with block and handles cleanup when you exit.
Session Methods
get()
Retrieve a single resource by ID or name.
from ragnerock import Document, Operator, Workflow, Annotation, Chunk, Page
# Get by ID
doc = session.get(Document, id="550e8400-e29b-41d4-a716-446655440000")
# Get by name (for resources that support it)
doc = session.get(Document, name="Apple 10-K 2024")
operator = session.get(Operator, name="financial_metrics")
workflow = session.get(Workflow, name="SEC Analysis Pipeline")
# Returns None if not found
doc = session.get(Document, id="nonexistent")
if doc is None:
print("Document not found")
Supported resource types:
| Resource | Supports id= | Supports name= |
|---|---|---|
| Document | Yes | Yes |
| Operator | Yes | Yes |
| Workflow | Yes | Yes |
| Annotation | Yes | No |
| Chunk | Yes | No |
| Page | Yes | No |
list()
List resources with optional filters. Returns a PaginatedIterator.
from ragnerock import Document, Annotation, Chunk, Operator, Workflow
# List all documents
for doc in session.list(Document):
print(doc.name)
# Get all at once
all_docs = session.list(Document).all()
# Get first only
first_doc = session.list(Document).first()
# Limit results
recent = session.list(Document).limit(10).all()
Listing annotations requires a filter, such as a document, chunk, or schema (operator) ID:
# By document
annotations = session.list(Annotation, document_id=doc.id).all()
# By chunk
annotations = session.list(Annotation, chunk_id=chunk.id).all()
# By schema (operator)
annotations = session.list(Annotation, schema_id=operator.id).all()
create()
Create a new resource. The resource is mutated in place with server-assigned fields.
from ragnerock import Document, Annotation
# Create a document
doc = Document(
file_path="/path/to/report.pdf",
name="Q4 Earnings Report"
)
session.create(doc)
# doc now has server-assigned fields
print(doc.id) # UUID assigned by server
print(doc.created_at) # Timestamp
print(doc.status) # Processing status
# Create an annotation
annotation = Annotation(
schema_id=operator.id,
document_id=doc.id,
data={"sentiment": 0.8, "topics": ["revenue", "growth"]}
)
session.create(annotation)
Resource types supported for creation:
| Resource | Creatable |
|---|---|
| Document | Yes |
| Annotation | Yes |
| Operator | No (use web app) |
| Workflow | No (use web app) |
delete()
Delete a resource.
# Delete a document
doc = session.get(Document, name="Old Report")
if doc:
session.delete(doc)
# Delete an annotation
annotation = session.get(Annotation, id="...")
if annotation:
session.delete(annotation)
query()
Execute a SQL query on annotation data. Returns a QueryResult.
result = session.query("""
SELECT document_name, sentiment_score, key_topics
FROM sentiment_analysis
WHERE sentiment_score > 0.5
ORDER BY sentiment_score DESC
""")
# Access results
print(result.columns) # ['document_name', 'sentiment_score', 'key_topics']
print(result.row_count) # Number of rows
print(result.query_time_ms) # Execution time
# Convert to formats
data = result.to_dict() # List of dicts
df = result.to_pandas() # pandas DataFrame
# Limit results
result = session.query("SELECT * FROM large_table", limit=100)
See SQL Queries for more details.
run()
Trigger a workflow on documents. Returns a Job handle.
from ragnerock import Workflow, Document, Job
# Get the workflow
workflow = session.get(Workflow, name="Extract Financial Metrics")
# Get documents to process
docs = session.list(Document).limit(10).all()
# Run the workflow
job = session.run(workflow, documents=docs)
# Wait for completion
job.wait(timeout=300, poll_interval=2.0)
# Check status
print(job.status) # "SUCCEEDED" or "FAILED"
if job.status_message:
print(job.status_message)
See Resources for details on the Job class.
Working with Resources
Resources retrieved from a session have convenience methods:
Document.list()
List related resources for a document:
doc = session.get(Document, name="Apple 10-K")
# List all annotations
for ann in doc.list(Annotation):
print(ann.data)
# Filter by operator
for ann in doc.list(Annotation, operator="sentiment"):
print(ann.data)
# List chunks
for chunk in doc.list(Chunk):
print(chunk.content[:100])
# List pages
for page in doc.list(Page):
print(f"Page {page.page_number}")
Document.status
Check document processing status:
doc = session.get(Document, name="New Upload")
print(doc.status) # "processing", "success", "error", etc.
# Poll until ready
import time
while doc.status == "processing":
time.sleep(5)
doc = session.get(Document, id=doc.id) # Refresh
Operator.list()
List annotations for an operator:
operator = session.get(Operator, name="sentiment")
# All annotations from this operator
for ann in operator.list(Annotation):
print(ann.data)
# Filter by document
for ann in operator.list(Annotation, document=doc):
print(ann.data)
Annotation relationships
Annotations have lazy-loaded relationships:
ann = session.get(Annotation, id="...")
# These fetch on access
parent_doc = ann.document
parent_chunk = ann.chunk
parent_page = ann.page
parent_operator = ann.operator
Complete Example
from ragnerock import create_engine, Session, Document, Annotation, Workflow
engine = create_engine("ragnerock://user@example.com:pass@api.ragnerock.com/research")
with Session(engine) as session:
# Upload a document
doc = Document(file_path="/data/10k.pdf", name="Company 10-K")
session.create(doc)
print(f"Created document: {doc.id}")
# Wait for processing
import time
while doc.status == "processing":
time.sleep(5)
doc = session.get(Document, id=doc.id)
# Run a workflow
workflow = session.get(Workflow, name="Financial Analysis")
job = session.run(workflow, documents=[doc])
job.wait(timeout=600)
# Query the results
result = session.query(f"""
SELECT revenue, net_income, risk_factors
FROM financial_metrics
WHERE document_id = '{doc.id}'
""")
for row in result.to_dict():
print(f"Revenue: ${row['revenue']}M")
print(f"Net Income: ${row['net_income']}M")
print(f"Risks: {row['risk_factors']}")
Next Steps
- Resources — Learn about Document, Annotation, and other resources
- SQL Queries — Query annotation data
- Pagination — Work with large result sets
- Error Handling — Handle errors gracefully