https://github.com/janbjorge/PgQueuer
janbjorge / PgQueuer (Public)

PgQueuer is a Python library leveraging PostgreSQL for efficient job queuing.

License: MIT license
320 stars, 6 forks, 128 commits

PgQueuer - Building Smoother Workflows One Queue at a Time

---------------------------------------------------------------------
Documentation: Explore the Docs (pgqueuer.readthedocs.io/en/latest/index.html)
Source Code: View on GitHub (https://github.com/janbjorge/PgQueuer)
Join the Discussion: Discord Community
---------------------------------------------------------------------

PgQueuer

PgQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PgQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.

Features

* Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
* Efficient Concurrency Handling: Utilizes PostgreSQL's FOR UPDATE SKIP LOCKED for reliable and concurrent job processing.
* Real-time Notifications: Leverages LISTEN and NOTIFY for real-time updates on job status changes.

Installation

To install PgQueuer, run the following pip command:

    pip install PgQueuer

Example Usage

Here's how you can use PgQueuer in a typical scenario, processing incoming data messages.

Write and run a consumer

Start a long-lived consumer that will begin processing jobs as soon as they are enqueued by another process. In this case we want to be a bit more careful, since we want graceful shutdowns; the PgQueuer run command sets up signal handlers to ensure this.

    from __future__ import annotations

    import asyncpg
    from PgQueuer.db import AsyncpgDriver, dsn
    from PgQueuer.models import Job
    from PgQueuer.qm import QueueManager


    async def main() -> QueueManager:
        # Connect to PostgreSQL and wrap the connection in PgQueuer's driver.
        connection = await asyncpg.connect(dsn())
        driver = AsyncpgDriver(connection)
        qm = QueueManager(driver)

        # Setup the 'fetch' entrypoint
        @qm.entrypoint("fetch")
        async def process_message(job: Job) -> None:
            print(f"Processed message: {job}")

        # The runner receives the configured QueueManager and drives it.
        return qm

Run the consumer:

    python3 -m PgQueuer run tools.consumer.main

Write and run a producer

Start a short-lived producer that will enqueue 10,000 jobs.

    from __future__ import annotations

    import asyncio
    import sys

    import asyncpg
    from PgQueuer.db import AsyncpgDriver
    from PgQueuer.queries import Queries


    async def main(N: int) -> None:
        # With no arguments, asyncpg falls back to the standard PG*
        # environment variables (PGHOST, PGUSER, ...).
        connection = await asyncpg.connect()
        driver = AsyncpgDriver(connection)
        queries = Queries(driver)

        # Enqueue N jobs in one call using three parallel lists:
        # entrypoint names, payloads, and one priority per job (0 here).
        await queries.enqueue(
            ["fetch"] * N,
            [f"this is from me: {n}".encode() for n in range(1, N + 1)],
            [0] * N,
        )


    if __name__ == "__main__":
        print(sys.argv)
        # Default to 1,000 jobs when no count is given on the command line.
        N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
        asyncio.run(main(N))

Run the producer:

    python3 tools/producer.py 10000
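The producer passes three equal-length lists to queries.enqueue. The sketch below shows one way to build such a batch without touching the database. It assumes, based only on the producer example, that the lists hold entrypoint names, byte payloads, and integer priorities in that order; build_batch is a hypothetical helper and not part of PgQueuer.

```python
from __future__ import annotations


def build_batch(
    entrypoint: str, n: int, priority: int = 0
) -> tuple[list[str], list[bytes], list[int]]:
    """Hypothetical helper: build three equal-length lists for one bulk enqueue."""
    if n < 0:
        raise ValueError("n must be non-negative")
    # One entrypoint name per job.
    entrypoints = [entrypoint] * n
    # One encoded payload per job, numbered from 1 as in the producer example.
    payloads = [f"this is from me: {i}".encode() for i in range(1, n + 1)]
    # Assumed meaning of the third list: one priority per job.
    priorities = [priority] * n
    return entrypoints, payloads, priorities


entrypoints, payloads, priorities = build_batch("fetch", 3)
print(len(entrypoints), len(payloads), len(priorities))
```

With a Queries instance created as in the producer, the result could presumably be passed along as await queries.enqueue(*build_batch("fetch", N)).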
Releases: 26 (latest v0.6.1, Aug 18, 2024)
Contributors: janbjorge (JeyBee), poksiala
Languages: Python 98.9%, Other 1.1%
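As background for the FOR UPDATE SKIP LOCKED feature listed under Features, here is a rough sketch of the general PostgreSQL pattern for claiming a job. It is not PgQueuer's actual query. The jobs table, its id, status, priority and payload columns, and the 'queued' and 'picked' status values are all assumptions made for illustration. The conn argument is expected to be an asyncpg connection, whose fetchrow method returns one row or None.

```python
from __future__ import annotations

from typing import Any

# Hypothetical schema: jobs(id, status, priority, payload).
# This is an illustration of the pattern, not PgQueuer's own table or query.
CLAIM_ONE_JOB = """
UPDATE jobs
SET status = 'picked'
WHERE id = (
    SELECT id
    FROM jobs
    WHERE status = 'queued'
    ORDER BY priority DESC, id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, payload
"""


async def claim_one_job(conn: Any) -> Any:
    """Claim the next queued job, or return None if no unlocked job exists.

    Rows already locked by another transaction are skipped instead of
    waited on, so concurrent workers do not block each other.
    """
    return await conn.fetchrow(CLAIM_ONE_JOB)
```

Inside a running event loop this would be called as job = await claim_one_job(connection), against a database that actually has such a table.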