Resources
Learn about Documents, Annotations, Operators, Workflows, and other SDK resources.
The Ragnerock SDK provides several resource types that represent data in the platform. Each resource is a Pydantic model with typed fields and convenience methods.
Document
A document represents an uploaded file (PDF, Word, spreadsheet, etc.) in Ragnerock.
Fields
| Field | Type | Description |
|---|---|---|
| id | UUID \| None | Server-assigned unique identifier |
| name | str | Document name |
| project_id | UUID \| None | Parent project ID |
| group_id | UUID \| None | Optional document group |
| created_at | str \| None | Creation timestamp |
| updated_at | str \| None | Last update timestamp |
| filesize | int \| None | File size in bytes |
| file_type | int | Internal file type code |
| file_path | str \| None | Local path (for upload only) |
Creating Documents
```python
from ragnerock import Document

# Create a document from a local file path
doc = Document(
    file_path="/path/to/report.pdf",
    name="Q4 Earnings Report",
)
session.create(doc)  # `session` is an existing Session (see Session)

# After creation, server-assigned fields are populated
print(doc.id)          # UUID('...')
print(doc.created_at)  # '2024-01-15T10:30:00Z'
print(doc.filesize)    # 1048576
```
Document Status
The status property fetches the document's current processing status from the server when it is accessed:
```python
from ragnerock import Document

doc = session.get(Document, name="New Upload")
print(doc.status)  # DocumentStatus.PROCESSING

# Possible values:
# - DocumentStatus.PENDING - Queued for processing
# - DocumentStatus.PROCESSING - Currently being processed
# - DocumentStatus.SUCCESS - Processing complete
# - DocumentStatus.ERROR - Processing failed
# - DocumentStatus.UNKNOWN - Status unavailable
```
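After an upload, you often need to wait until processing finishes before you list annotations or chunks. The sketch below shows one way to do that by repeatedly reading the status. It assumes that `doc.status` returns an enum whose member names match the list above. Because it compares by `.name`, it works with the SDK's enum or with the local stand-in used here. The helper `wait_until_processed` and the stand-in `DocumentStatus` are hypothetical illustrations, not part of the SDK.

```python
import time
from enum import Enum, auto
from typing import Callable


class DocumentStatus(Enum):
    # Hypothetical stand-in with the documented member names; values are placeholders.
    PENDING = auto()
    PROCESSING = auto()
    SUCCESS = auto()
    ERROR = auto()
    UNKNOWN = auto()


# SUCCESS and ERROR are final; PENDING, PROCESSING and UNKNOWN keep polling.
TERMINAL_NAMES = {"SUCCESS", "ERROR"}


def wait_until_processed(
    get_status: Callable[[], Enum],
    timeout: float = 300.0,
    poll_interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Enum:
    """Call get_status() until it reports SUCCESS or ERROR, or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        status = get_status()  # with the SDK, each read of doc.status hits the server
        if status.name in TERMINAL_NAMES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"document still {status.name} after {timeout}s")
        sleep(poll_interval)
```

With the SDK, usage would look like `final = wait_until_processed(lambda: doc.status)`. The `sleep` and `clock` parameters exist only so the loop can be tested without real waiting.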
Document Jobs
The jobs property returns details of the document's processing jobs, one dictionary per job:
```python
for job in doc.jobs:
    print(f"Job {job['job_id']}: {job['status']}")
```
Listing Related Resources
```python
from ragnerock import Annotation, Chunk, Page

# List annotations
for ann in doc.list(Annotation):
    print(ann.data)

# Filter by operator
for ann in doc.list(Annotation, operator="sentiment"):
    print(ann.data)

# List chunks
for chunk in doc.list(Chunk):
    print(chunk.content[:100])

# List pages (page.content may be None)
for page in doc.list(Page):
    print(f"Page {page.page_number}: {len(page.content or '')} chars")
```
Annotation
An annotation contains AI-generated structured data extracted from a document, page, or chunk.
Fields
| Field | Type | Description |
|---|---|---|
| root_id | UUID \| None | Unique identifier (also accessible as id) |
| schema_id | UUID \| None | The operator (schema) that created this annotation |
| document_id | UUID \| None | Parent document |
| chunk_id | UUID \| None | Parent chunk (if chunk-level) |
| page_id | UUID \| None | Parent page (if page-level) |
| data | dict | The extracted data matching the schema |
| confidence_score | float \| None | Model confidence (0-1) |
| created_at | str \| None | Creation timestamp |
| generation_metadata | dict | Model and generation details |
Accessing Annotation Data
```python
from ragnerock import Annotation

ann = session.get(Annotation, id="...")

# The data field contains the extracted values
print(ann.data)
# {'sentiment': 0.85, 'topics': ['revenue', 'growth'], 'risk_level': 'low'}

# Access individual fields
print(ann.data['sentiment'])
print(ann.data.get('topics', []))
```
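Because `data` is a plain dict, you can aggregate it across annotations with ordinary Python. The sketch below averages one numeric key. It skips records where that key is missing or is not a number, as can happen with the `.get('topics', [])` case above. `mean_field` is a hypothetical helper, not an SDK function. For larger or repeated aggregations, the SQL Queries page is likely the better fit.

```python
from statistics import mean
from typing import Any, Iterable, Mapping, Optional


def mean_field(records: Iterable[Mapping[str, Any]], key: str) -> Optional[float]:
    """Average a numeric key across annotation data dicts.

    Records where the key is missing or not a number (bools excluded) are
    skipped; returns None when no numeric value is found.
    """
    values = [
        r[key]
        for r in records
        if isinstance(r.get(key), (int, float)) and not isinstance(r.get(key), bool)
    ]
    return mean(values) if values else None
```

With the SDK, a call might look like `mean_field((ann.data for ann in doc.list(Annotation, operator="sentiment")), "sentiment")`.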
Lazy-Loaded Relationships
Annotations have properties that fetch related resources on demand:
```python
ann = session.get(Annotation, id="...")

# These make API calls when accessed
parent_doc = ann.document     # Document object
parent_chunk = ann.chunk      # Chunk object (or None)
parent_page = ann.page        # Page object (or None)
operator = ann.operator       # Operator object
```
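Each access to `ann.document` makes an API call, so reading it inside a loop over many annotations can fetch the same document repeatedly. A sketch of one workaround follows. It groups annotations by the plain `document_id` field, which requires no call, and then fetches each distinct document once. `group_by_document` and `AnnotationStub` are hypothetical names used only for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple


@dataclass
class AnnotationStub:
    # Hypothetical stand-in exposing only the fields this sketch reads.
    document_id: Optional[str]
    data: Dict[str, Any] = field(default_factory=dict)


def group_by_document(
    annotations: Iterable[Any],
    fetch_document: Callable[[Any], Any],
) -> Dict[Any, Tuple[Any, List[Any]]]:
    """Map each document_id to (document, annotations), fetching each document once.

    Annotations whose document_id is None are skipped.
    """
    groups: Dict[Any, List[Any]] = defaultdict(list)
    for ann in annotations:
        if ann.document_id is not None:
            groups[ann.document_id].append(ann)  # field read only, no API call
    return {doc_id: (fetch_document(doc_id), anns) for doc_id, anns in groups.items()}
```

With the SDK, `fetch_document` could be `lambda doc_id: session.get(Document, id=doc_id)`. Fetching a document by `id` is an assumption here, by analogy with `session.get(Annotation, id=...)`.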
Creating Annotations
```python
from ragnerock import Annotation

# Create an annotation on a document
# (`operator`, `doc`, and `chunk` are existing Operator, Document, and Chunk objects)
ann = Annotation(
    schema_id=operator.id,
    document_id=doc.id,
    data={"sentiment": 0.9, "summary": "Positive outlook"},
    confidence_score=0.95,
)
session.create(ann)

# Create an annotation on a chunk
ann = Annotation(
    schema_id=operator.id,
    chunk_id=chunk.id,
    data={"entity": "Apple Inc.", "type": "company"},
)
session.create(ann)
```
Operator
An operator defines an AI extraction task: a prompt for the model and a JSON Schema that describes the structure of its output.
Fields
| Field | Type | Description |
|---|---|---|
| id | UUID \| None | Unique identifier |
| project_id | UUID \| None | Parent project |
| name | str | Operator name |
| description | str \| None | Human-readable description |
| jsonschema | dict | JSON Schema for output structure |
| generation_prompt | str | Instructions for the AI model |
| scope | ChunkType | Processing granularity |
| created_at | str \| None | Creation timestamp |
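To make the `jsonschema` field concrete, here is a hypothetical schema that would describe annotation data shaped like the `{'sentiment': ..., 'topics': [...], 'risk_level': ...}` example under Annotation. The key names come from that example. The constraints, the `enum` values, and the `missing_required` helper are illustrative assumptions only. The helper checks just top-level required keys. Full validation would need a real JSON Schema validator, such as the third-party `jsonschema` package's `jsonschema.validate(instance, schema)`.

```python
from typing import Any, Dict, List

# Hypothetical schema; constraints are illustrative, not taken from a real operator.
SENTIMENT_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "sentiment": {"type": "number", "minimum": 0, "maximum": 1},
        "topics": {"type": "array", "items": {"type": "string"}},
        "risk_level": {"type": "string", "enum": ["low", "medium", "high"]},
    },
    "required": ["sentiment", "risk_level"],
}


def missing_required(data: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return required top-level keys absent from `data` (not full JSON Schema validation)."""
    return [key for key in schema.get("required", []) if key not in data]
```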
Listing Operators
```python
from ragnerock import Operator

for op in session.list(Operator):
    print(f"{op.name}: {op.description}")
    print(f"  Scope: {op.scope.name}")
    print(f"  Schema: {op.jsonschema}")
```
Listing Annotations for an Operator
```python
from ragnerock import Annotation, Operator

operator = session.get(Operator, name="sentiment_analysis")

# All annotations from this operator
for ann in operator.list(Annotation):
    print(ann.data)

# Filter by document
for ann in operator.list(Annotation, document=doc):
    print(ann.data)
```
Operator Scope
The scope field determines what level of text the operator processes:
```python
from ragnerock import ChunkType

# ChunkType values:
# - ChunkType.DOCUMENT (0) - Entire document
# - ChunkType.PAGE (1) - Individual pages
# - ChunkType.PARAGRAPH (2) - Paragraphs
# - ChunkType.SENTENCE (3) - Sentences
```
Workflow
A workflow is a DAG (directed acyclic graph) of operators that process documents.
Fields
| Field | Type | Description |
|---|---|---|
| id | UUID \| None | Unique identifier |
| project_id | UUID \| None | Parent project |
| name | str | Workflow name |
| description | str \| None | Human-readable description |
| is_active | bool | Whether the workflow is enabled |
| auto_run_on_upload | bool | Run automatically on newly uploaded documents |
| operators | list[WorkflowOperator] | Nodes in the workflow DAG |
| execution_order | list[str] | Order in which operators execute |
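Because a workflow is a DAG, a valid `execution_order` is a topological order: every operator runs after the operators whose output it depends on. The sketch below derives such an order with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The dependency map and operator names are hypothetical. This page does not document how `WorkflowOperator` records its inputs, and the server may compute its order differently. For this sketch, any order that respects the dependencies is acceptable.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Order operator names so each one follows the operators it depends on.

    `dependencies` maps an operator name to the names whose output it consumes.
    Raises graphlib.CycleError (a ValueError subclass) if the graph has a cycle.
    """
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical workflow: risk_summary consumes the outputs of two other operators.
example = {
    "risk_summary": {"extract_entities", "sentiment"},
    "sentiment": set(),
    "extract_entities": set(),
}
```

`execution_order(example)` places `risk_summary` last. The relative order of the two independent operators is not guaranteed.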
Listing Workflows
```python
from ragnerock import Workflow

for wf in session.list(Workflow):
    print(f"{wf.name}: {wf.description}")
    print(f"  Active: {wf.is_active}")
    print(f"  Auto-run: {wf.auto_run_on_upload}")
    print(f"  Operators: {[op.operator_name for op in wf.operators]}")
```
Running Workflows
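```python
from ragnerock import Document, Workflow

workflow = session.get(Workflow, name="Financial Analysis")
docs = session.list(Document).limit(5).all()  # first five documents

job = session.run(workflow, documents=docs)
job.wait(timeout=600)  # block until the job finishes or the timeout expires
```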
Job
A job tracks the execution of a workflow on documents.
Fields
| Field | Type | Description |
|---|---|---|
| id | UUID \| None | Job identifier |
| document_id | UUID \| None | Document being processed |
| status_code | int | Numeric status code |
| status_message | str \| None | Human-readable status message |
| phase | str \| None | Current processing phase |
Job Status
```python
from ragnerock import JobStatus

job = session.run(workflow, documents=[doc])

# The status property returns a human-readable string
print(job.status)  # "NOT_STARTED", "IN_PROGRESS", "SUCCEEDED", or "FAILED"

# JobStatus enum values:
# - JobStatus.NOT_STARTED (1)
# - JobStatus.IN_PROGRESS (2)
# - JobStatus.SUCCEEDED (3)
# - JobStatus.FAILED (4)
```
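The numeric `status_code` field and the string returned by `status` are linked through the enum values listed above. The sketch below translates between them using a local `IntEnum` that mirrors those documented values. Whether the SDK's `JobStatus` is itself an `IntEnum` is an assumption. The helpers `status_name` and `is_finished`, and the `"UNKNOWN"` fallback, are hypothetical.

```python
from enum import IntEnum


class JobStatus(IntEnum):
    # Local mirror of the documented values; real code would import JobStatus from ragnerock.
    NOT_STARTED = 1
    IN_PROGRESS = 2
    SUCCEEDED = 3
    FAILED = 4


def status_name(status_code: int) -> str:
    """Translate a Job.status_code into its JobStatus name."""
    try:
        return JobStatus(status_code).name
    except ValueError:
        return "UNKNOWN"  # hypothetical fallback for codes not listed above


def is_finished(status_code: int) -> bool:
    """True once the job has either succeeded or failed."""
    return status_code in (JobStatus.SUCCEEDED, JobStatus.FAILED)
```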
Waiting for Completion
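```python
job = session.run(workflow, documents=[doc])

# Block until the job finishes; raises TimeoutError if it takes too long
try:
    job.wait(timeout=300, poll_interval=2.0)
except TimeoutError:
    print("Job did not complete in time")

# Check the result (after a timeout the job may still be running)
if job.status == "SUCCEEDED":
    print("Job completed successfully")
elif job.status == "FAILED":
    print(f"Job failed: {job.status_message}")
else:
    print(f"Job not finished yet: {job.status}")
```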
Chunk
A chunk is a text segment within a document (paragraph, sentence, etc.).
Fields
| Field | Type | Description |
|---|---|---|
| id | UUID \| None | Unique identifier |
| document_id | UUID \| None | Parent document |
| content | str | Text content |
| document_offset | int | Position in document |
| chunk_type | ChunkType | Granularity level |
| start_char_idx | int \| None | Start character index |
| end_char_idx | int \| None | End character index |
Working with Chunks
```python
from ragnerock import Chunk

# List chunks for a document
for chunk in doc.list(Chunk):
    print(f"[{chunk.chunk_type.name}] {chunk.content[:100]}...")

# Get a specific chunk
chunk = session.get(Chunk, id="...")
print(chunk.content)
print(f"Position: {chunk.start_char_idx}-{chunk.end_char_idx}")
```
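When you find a relevant chunk, you often want the surrounding text as well. The sketch below orders chunks by `document_offset` and returns a target chunk with its neighbors. It assumes that `document_offset` increases in reading order and that you pass chunks of a single `chunk_type`, since offsets from different granularities may interleave. `with_context` and `ChunkStub` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, List, Sequence


@dataclass
class ChunkStub:
    # Hypothetical stand-in with the Chunk fields this sketch reads.
    id: Any
    document_offset: int
    content: str


def with_context(chunks: Sequence[Any], target_id: Any, window: int = 1) -> List[Any]:
    """Return the target chunk plus up to `window` neighbors on each side, in document order."""
    ordered = sorted(chunks, key=lambda c: c.document_offset)
    for i, chunk in enumerate(ordered):
        if chunk.id == target_id:
            return ordered[max(0, i - window): i + window + 1]
    raise ValueError(f"no chunk with id {target_id!r}")
```

With the SDK, the input could be `[c for c in doc.list(Chunk) if c.chunk_type == ChunkType.PARAGRAPH]`.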
Page
A page represents a single page within a document.
Fields
| Field | Type | Description |
|---|---|---|
| id | UUID \| None | Unique identifier |
| document_id | UUID \| None | Parent document |
| page_number | int \| None | Page number (1-indexed) |
| content | str \| None | Extracted text content |
Working with Pages
```python
from ragnerock import Page

# List pages for a document (in order); content may be None
for page in doc.list(Page):
    print(f"Page {page.page_number}:")
    print((page.content or "")[:200])
    print("---")

# Get a specific page
page = session.get(Page, id="...")
print(f"Page {page.page_number}: {len(page.content or '')} characters")
```
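To rebuild a document's full text from its pages, join the page contents in page order. `doc.list(Page)` already returns pages in order. The sketch below sorts by `page_number` anyway as a safeguard, and skips pages whose `page_number` or `content` is None (both are optional fields). `full_text` and `PageStub` are hypothetical names, and the blank-line separator is an arbitrary choice.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Optional


@dataclass
class PageStub:
    # Hypothetical stand-in with the two Page fields this sketch reads.
    page_number: Optional[int]
    content: Optional[str]


def full_text(pages: Iterable[Any], separator: str = "\n\n") -> str:
    """Join page text in page_number order, skipping pages with no number or no content."""
    usable = [p for p in pages if p.page_number is not None and p.content]
    usable.sort(key=lambda p: p.page_number)
    return separator.join(p.content for p in usable)
```

With the SDK: `text = full_text(doc.list(Page))`.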
Enums
DocumentStatus
```python
from ragnerock import DocumentStatus

DocumentStatus.PENDING     # Queued for processing
DocumentStatus.PROCESSING  # Currently processing
DocumentStatus.SUCCESS     # Completed successfully
DocumentStatus.ERROR       # Processing failed
DocumentStatus.UNKNOWN     # Status unavailable
```
ChunkType
```python
from ragnerock import ChunkType

ChunkType.DOCUMENT   # 0 - Entire document
ChunkType.PAGE       # 1 - Single page
ChunkType.PARAGRAPH  # 2 - Paragraph
ChunkType.SENTENCE   # 3 - Sentence
```
JobStatus
```python
from ragnerock import JobStatus

JobStatus.NOT_STARTED  # 1 - Not yet started
JobStatus.IN_PROGRESS  # 2 - Currently running
JobStatus.SUCCEEDED    # 3 - Completed successfully
JobStatus.FAILED       # 4 - Failed
```
Next Steps
- Session — Learn about Session methods
- SQL Queries — Query annotation data
- Pagination — Work with large result sets