Workflow

Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows operate well with pipelines but can work with any callable object. Workflows are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.

Given that pipelines are callable objects, workflows enable efficient processing of pipeline data. Transformers models typically work with smaller batches of data, so workflows are well suited to feed a series of transformers pipelines.

An example of the most basic workflow:

    from txtai.workflow import Task, Workflow

    workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
    list(workflow([1, 2, 3]))

This example multiplies each input value by 2 and returns the outputs via a generator.

Example

A full-featured example is shown below. This workflow transcribes a set of audio files, translates the text into French and indexes the data.

    from txtai.embeddings import Embeddings
    from txtai.pipeline import Transcription, Translation
    from txtai.workflow import FileTask, Task, Workflow

    # Embeddings instance
    embeddings = Embeddings({
        "path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
        "content": True
    })

    # Transcription instance
    transcribe = Transcription()

    # Translation instance
    translate = Translation()

    tasks = [
        FileTask(transcribe, r"\.wav$"),
        Task(lambda x: translate(x, "fr"))
    ]

    # List of files to process
    data = [
        "US_tops_5_million.wav",
        "Canadas_last_fully.wav",
        "Beijing_mobilises.wav",
        "The_National_Park.wav",
        "Maine_man_wins_1_mil.wav",
        "Make_huge_profits.wav"
    ]

    # Workflow that translates text to French
    workflow = Workflow(tasks)

    # Index data
    embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))

    # Search
    embeddings.search("wildlife", 1)

Configuration-driven example

Workflows can be defined using Python as shown above but they can also run with YAML configuration.

    writable: true

    embeddings:
        path: sentence-transformers/paraphrase-MiniLM-L3-v2
        content: true

    # Transcribe audio to text
    transcription:

    # Translate text between languages
    translation:

    workflow:
        index:
            tasks:
                - action: transcription
                  select: "\\.wav$"
                  task: file
                - action: translation
                  args: ["fr"]
                - action: index

    from txtai.api import API

    # Create and run the workflow
    app = API("workflow.yml")
    list(app.workflow("index", [
        "US_tops_5_million.wav",
        "Canadas_last_fully.wav",
        "Beijing_mobilises.wav",
        "The_National_Park.wav",
        "Maine_man_wins_1_mil.wav",
        "Make_huge_profits.wav"
    ]))

    # Search
    app.search("wildlife")

The code above executes a workflow defined in the file workflow.yml. The API is used to run the workflow locally, and there is minimal overhead running workflows in this manner. Whether to define a workflow in Python or in YAML is a matter of preference.

See the following links for more information.

* Workflow Demo
* Workflow YAML Examples
* Workflow YAML Guide

Methods

Workflows are callable objects.
Workflows take an input of iterable data elements and output iterable data elements.

__init__(self, tasks, batch=100, workers=None)

Creates a new workflow. Workflows are lists of tasks to execute.

Parameters:

Name      Description                                            Default
tasks     list of workflow tasks                                 required
batch     how many items to process at a time, defaults to 100   100
workers   number of concurrent workers                           None

Source code in txtai/workflow/base.py

    def __init__(self, tasks, batch=100, workers=None):
        """
        Creates a new workflow. Workflows are lists of tasks to execute.

        Args:
            tasks: list of workflow tasks
            batch: how many items to process at a time, defaults to 100
            workers: number of concurrent workers
        """

        self.tasks = tasks
        self.batch = batch
        self.workers = workers

        # Set default number of executor workers to max number of actions in a task
        self.workers = max(len(task.action) for task in self.tasks) if not self.workers else self.workers

__call__(self, elements)

Executes a workflow for input elements.

Parameters:

Name       Description               Default
elements   iterable data elements    required

Returns:

transformed data elements

Source code in txtai/workflow/base.py

    def __call__(self, elements):
        """
        Executes a workflow for input elements.

        Args:
            elements: iterable data elements

        Returns:
            transformed data elements
        """

        # Create execute instance for this run
        with Execute(self.workers) as executor:
            # Run task initializers
            self.initialize()

            # Process elements in batches
            for batch in self.chunk(elements):
                yield from self.process(batch, executor)

            # Run task finalizers
            self.finalize()
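
The batching and streaming behavior described for `__call__` can be illustrated without txtai installed. The sketch below is a simplified stand-in, not txtai's implementation: `MiniWorkflow`, its `chunk` method and the `double` task are hypothetical names invented for illustration, tasks are assumed to be plain callables that take and return a list, and there is no executor, initializer or finalizer step. It only shows how a `batch` size splits the input and how a generator defers work until results are consumed.

```python
from itertools import islice


class MiniWorkflow:
    """Hypothetical, simplified stand-in for a batch-oriented workflow."""

    def __init__(self, tasks, batch=100):
        self.tasks = tasks
        self.batch = batch

    def chunk(self, elements):
        """Yields successive lists holding at most self.batch elements."""
        iterator = iter(elements)
        while True:
            batch = list(islice(iterator, self.batch))
            if not batch:
                return
            yield batch

    def __call__(self, elements):
        """Lazily yields transformed elements, one batch at a time."""
        for batch in self.chunk(elements):
            # Each task receives the full batch produced by the previous task
            for task in self.tasks:
                batch = task(batch)
            yield from batch


# Records the size of every batch passed to the task
seen = []


def double(batch):
    seen.append(len(batch))
    return [x * 2 for x in batch]


workflow = MiniWorkflow([double], batch=2)

# Consuming the generator runs every batch: five inputs split as 2, 2, 1
results = list(workflow(range(5)))
sizes_after_full_run = list(seen)
print(results)               # [0, 2, 4, 6, 8]
print(sizes_after_full_run)  # [2, 2, 1]

# Creating the generator runs nothing; next() processes only the first batch
stream = workflow([10, 20, 30])
calls_before_consuming = len(seen)
first = next(stream)
calls_after_first_item = len(seen)
```

Because the call returns a generator, wrapping it in `list(...)` (as the examples on this page do) or iterating over it is what triggers processing. A smaller `batch` value lowers the number of elements held in memory at once, at the cost of more task invocations.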