Usage¶
Basic Example¶
Here's a simple example of how to use pypdown:
from pypdown import run_step
from pypdown.models import Step
from pydantic import BaseModel
from pathlib import Path
class StepParams(BaseModel):
input_file: Path = "input.txt"
output_file: Path = "output.txt"
final_file: Path = "final.txt"
def process_input(input_file: Path, output_file: Path, config: StepParams):
"""Process input file and create output file."""
output_file.write_text(input_file.read_text().upper())
def finalize_output(output_file: Path, final_file: Path, config: StepParams):
"""Process output file and create final file."""
final_file.write_text(f"Processed: {output_file.read_text()}")
config = StepParams()
# Define your pipeline tasks by reference to config field names
task_refs = [
{
"src": ["input_file"],
"dst": ["output_file"],
"fn": process_input,
},
{
"src": ["output_file"],
"dst": ["final_file"],
"fn": finalize_output,
},
]
# Create a Step
step = Step(name="Example Pipeline Step", task_refs=task_refs, config=config)
# Run the step
run_step(step)
This will create a pipeline that reads from input.txt
, processes it to create output.txt
, and then processes output.txt
to create final.txt
.
Defining Tasks¶
Tasks have three components:
src
: Input file pathsdst
: Output file pathsfn
: A callback function that performs the task
Rather than supplying the paths themselves in src
and dst
, pass the config
Pydantic model's field names.
Callback Functions¶
Callback functions should take the input and output file paths as keyword arguments,
along with a config
parameter for the Pydantic model that all the parameters are set from.
Running Steps¶
Use the run_step
function to execute a pipeline step.
This function will check for the existence of input files and the non-existence of output files before running each task.
For full implementation details, please refer to the API Reference.