logo
down
shadow

Database is not updated in Celery task with Flask and SQLAlchemy


Database is not updated in Celery task with Flask and SQLAlchemy

Content Index :

Database is not updated in Celery task with Flask and SQLAlchemy
Tag : python , By : DicksGarage
Date : November 27 2020, 09:01 AM

this one helps. Okay, I got it. stuff passed to process_stuff() is not attached to db.session. I have to make explicit request in process_stuff() to get the right stuff object like this:
@celery.task()
def process_stuff(stuff):
    # process stuff here

    my_stuff = Stuff.query.get(stuff.id)

    my_stuff.processed = True
    db.session.commit()

Comments
No Comments Right Now !

Boards Message :
You Must Login Or Sign Up to Add Your Comments .

Share : facebook icon twitter icon

How to use Flask-SQLAlchemy in a Celery task


Tag : python , By : Mforg
Date : March 29 2020, 07:55 AM
it should still fix some issue Update: We've since started using a better way to handle application teardown and set up on a per-task basis, based on the pattern described in the more recent flask documentation.
extensions.py
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()
from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask()

    #configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app
from flask import Flask

def create_app():
    app = Flask()

    initiliaze_extensions(app)

    return app

def initiliaze_extensions(app):
    from extensions import celery, db # DOOMED! Keep celery import at the FILE level

    db.init_app(app)
    celery.init_app(app)
celery worker -A app:celery -l info -f celery.log
from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun

@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    #you can now use the db object from extensions

@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask SQLAlchemy will automatically create new sessions for you from 
    # a scoped session factory, given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors 
    # won't propagate across tasks)
    db.session.remove()
from extensions import celery, db

def create_app():
    app = Flask()

    #configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app
from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()

pushing celery task from flask view detach SQLAlchemy instances (DetachedInstanceError)


Tag : python , By : Tornike
Date : March 29 2020, 07:55 AM
This might help you ah my mistake in running task. it should be empty_task.apply_async()
calling it directly it creates new app context with new session causing closing old one.

Correctly managing postgresql connections in celery task for Flask-SQLAlchemy and Celery


Tag : python , By : Enrique Anaya
Date : March 29 2020, 07:55 AM
wish helps you Okay. I figured it out, for every process that's using an application context, you must use a new application context. Before, in my app/__init__.py I was simply creating the application globally like so:
from flask import Flask
app = Flask(__name__)
from myapp import create_app
from celery import Celery
def make_celery(app=None):
    app = app or create_app()
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'],
    )
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

celery = make_celery()

Celery with Flask and Flask-SQLAlchemy cannot update a database record


Tag : python , By : Yohan Lee
Date : March 29 2020, 07:55 AM
I wish this help you In my question, I simplified the code substantially and when doing so I changed the reserved property to processed. My actual model looks more like this:
class Stuff(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    reserved = db.Column(db.Boolean)

Celery: unable to call task through flask, works outside of flask


Tag : development , By : user123585
Date : March 29 2020, 07:55 AM
hop of those help? Solution: The solution to have the shared_task work as expected was answered here: LINK Modifying celerz initization as:
from kombu import Queue
from celery import Celery


celery = Celery('LdapProvider',
                broker='amqp://admin:passwd@localhost:5672/dev1',
                backend='rpc')
                # include=['app.tasks.greeting_tasks'])
celery.conf.task_queues = (
    Queue("q1", routing_key="c1.q1"),
    Queue("q2", routing_key="c2.q2"),
)
celery.set_default()
[tasks]
  . app.tasks.greeting_tasks.say_hello
Related Posts Related QUESTIONS :
  • How to fix 'AttributeError: 'list' object has no attribute 'shape'' error in python with Tensorflow / Keras when loading
  • python - thread`s target is a method of an object
  • Retrieve Variable From Class
  • What is the reason for matplotlib for printing labels multiple times?
  • Why would people use ThreadPoolExecutor instead of direct function call?
  • When clear_widgets is called, it doesnt remove screens in ScreenManager
  • Python can't import function
  • Pieces doesn't stack after one loop on my connect4
  • How to change font size of all .docx document with python-docx
  • How to store a word with # in .cfg file
  • How to append dictionaries to a dictionary?
  • How can I scrape text within paragraph tag with some other tags then within the paragraph text?
  • Custom entity ruler with SpaCy did not return a match
  • Logging with two handlers - one to file and one to stderr
  • How to do pivot_table in dask with aggfunc 'min'?
  • This for loop displays only the last entry of the student record
  • How to split a string by a specific pattern in number of characters?
  • Python 3: how to scrape research results from a website using CSFR?
  • Setting the scoring parameter of RandomizedSeachCV to r2
  • How to send alert or message from view.py to template?
  • How to add qml ScatterSeries to existing qml defined ChartView?
  • Django + tox: Apps aren't loaded yet
  • My css and images arent showing in django
  • Probability mass function sum 2 dice roll?
  • Cannot call ubuntu 'ulimit' from python subprocess without using shell option
  • Dataframe Timestamp Filter for new/repeating value
  • Problem with clicking select2 dropdownlist in selenium
  • pandas dataframe masks to write values into new column
  • How to click on item in navigation bar on top of page using selenium python?
  • Add multiple EntityRuler with spaCy (ValueError: 'entity_ruler' already exists in pipeline)
  • error when replacing missing ')' using negative look ahead regex in python
  • Is there a way to remove specific strings from indexes using a for loop?
  • select multiple tags by position in beautifulSoup
  • pytest: getting AttributeError: 'CaptureFixture' object has no attribute 'readouterror' capturing stdout
  • Shipping PyGObject/GTK+ app on Windows with MingW
  • Python script to deduplicate lines in multiple files
  • How to prevent window and widgets in a pyqt5 application from changing size when the visibility of one widget is altered
  • How to draw stacked bar plot from df.groupby('feature')['label'].value_counts()
  • Python subprocess doesn't work without sleep
  • How can I adjust 'the time' in python with module Re
  • Join original np array with resulting np array in a form of dictionary? multidimensional array? etc?
  • Forcing labels on histograms in each individual graph in a figure
  • For an infinite dataset, is the data used in each epoch the same?
  • Is there a more efficent way to extend a string?
  • How to calculate each single element of a numpy array based on conditions
  • How do I change the width of Jupyter notebook's cell's left part?
  • Measure distance between lat/lon coordinates and utm coordinates
  • Installing megam for NLTK on Windows
  • filter dataframe on each value of a samn column have a specific value of another column in Panda\Python
  • Threading with pubsub throwing AssertionError: 'callableObj is not callable' in wxPython
  • Get grouped data from 2 dataframes with condition
  • How can I import all of sklearns regressors
  • How to take all elements except the first k
  • Whats wrong with my iteration list of lists from csv
  • Tensorflow Estimator API save image summary in eval mode
  • How to Pack with PyQt - how to make QFrame/Layout adapt to content
  • How do I get certain Time Range in Python
  • python doubly linked list - insertAfter node
  • Open .h5 file in Python
  • Joining a directory name with a binary file name
  • shadow
    Privacy Policy - Terms - Contact Us © scrbit.com