Flask SQLAlchemy ile Postgresql Kullanımı ve Basit İşlemler
Python için sık kullanılan database çözümlerinden bir tanesi olan SQLAlchemy ile select, update, delete gibi birçok database işlemini kolaylıkla gerçekleştirebilirsiniz. Pek çok Python kütüphanesi gibi SQLAlchemy’de Flask ile uyumlu çalışacak bir versiyona dönüştürülmüştür. Bu yazıda Flask ile yazılmış uygulamanızı Flask-SQLAlchemy kullanarak nasıl çalıştırabileceğinizi anlatacağım.
Aşağıdaki gibi bir uygulama yapımız olduğunu varsayıyorum.
flask-app
├── templates
│ ├── index.html
├── app.py
├── config.py
└── requirements.txt
İlk olarak uygulamamızda (app.py) SQLAlchemy’yi ve config dosyasını import ediyoruz.
from flask_sqlalchemy import SQLAlchemy
import config
Bu uygulama için postgresql kullandım. Database bilgilerinin config.py dosyasında durdugunu varsayıyorum.
CONFIG = {
'postgresUrl':'localhost:5432',
'postgresUser':'dbuser',
'postgresPass':'123qwe123',
'postgresDb':'dbname',
}
Uygulamamızda database bilgilerini aşağıdaki gibi oluşturuyoruz. Postgresql için Python driver’ı olan psycopg’yi kullanıyoruz.
POSTGRES_URL = config.CONFIG['postgresUrl']
POSTGRES_USER = config.CONFIG['postgresUser']
POSTGRES_PASS = config.CONFIG['postgresPass']
POSTGRES_DB = config.CONFIG['postgresDb']
DB_URL = 'postgresql+psycopg2://{user}:{pw}@{url}/{db}'.format(user=POSTGRES_USER, pw=POSTGRES_PASS, url=POSTGRES_URL, db=POSTGRES_DB)
app.config['SQLALCHEMY_DATABASE_URI'] = DB_URLdb = SQLAlchemy(app)
Daha sonra tablolarımız için class’larımızı oluşturuyoruz. Burada örnek bir users tablosu oluşturdum:
class User(db.Model):
__tablename__ = "users"
user_id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(120), unique=True)
password = db.Column(db.String(120))
email = db.Column(db.String(120), unique=True)
def __init__(self, username, password, email):
self.username = username
self.password = bcrypt.generate_password_hash(password)
self.email = email
Aşağıda bir Flask uygulaması için basit bir register fonksiyon örneğini görebilirsiniz:
@app.route('/register', methods=['POST'])
def register_post():
username = request.form['username']
password = request.form['password']
email = request.form['email']
if not db.session.query(User).filter(User.username == username).count():
if not db.session.query(User).filter(User.email == email).count():
register_url_serializer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
register_url = url_for(
'register_validation',
token=register_url_serializer.dumps(email, salt='password-reset-salt'),
username=register_url_serializer.dumps(username, salt='password-reset-salt'),
password=register_url_serializer.dumps(password, salt='password-reset-salt'),
_external=True)
subject = "App registration mail"
text = "Hello, to complete the registration please click on the link: " + register_url
message = """From: %s\nTo: %s\nSubject: %s\n\n%s
""" % (MAIL_USERNAME, ", ".join(email), subject, text)
sendMail(subject, text, message, email)
flash("Please check your email and click on the link to complete registration.")
return redirect(url_for('register'))
else:
flash("This mail address already has an account.")
return redirect(url_for('register'))
else:
if not db.session.query(User).filter(User.email == email).count():
flash("This username is taken.")
return redirect(url_for('register'))
else:
flash("You already have an account.")
return redirect(url_for('register'))
Bu örnekte bold yapılmış kısımlarda görebileceğiniz üzere öncelikle form’dan gelen bilgilerle bu username’in database’de var olup olmadığını kontrol ediyoruz. Burada önce db.session ile bir session oluşturuyoruz. Sonra db.session.query(User) ile User tablosunda bir select çalıştırıyoruz. Devamındaki .filter(User.username == username) kısmının ise database’de karşılığını where olarak düşünebilirsiniz. Burada form’dan bize gelen username bilgisi ile aslında database’de aşağıdaki komutu çalıştırıyoruz:
select * from users where username='elifcan';
En sonunda eklediğimiz .count() ile aşağıdaki komutu çalıştırıyoruz:
select count(*) from users where username='elifcan';
Yukarıdaki örnekte ilk adımda eğer bu username ile gelen count yani satır sayısı 0'dan fazla ise bu kullanıcı adı ile kullanıcı olduğu hatasını veriyor. Aynı şekilde bir alttaki adımda eğer bu mail adresi ile gelen count yani satır sayısı 0'dan fazla ise bu mail adresinin kullanımda olduğu hatasını veriyor.
Query
Burada SQLAlchemy bize çok geniş kullanım çeşitleri sunuyor. Eğer basit bir uygulama yazacaksanız all(), count(), first(), filter_by() gibi kullanımlar en çok kullanacaklarınız olacaktır. Diğer kullanımlar için Query Api dokümanına göz atabilirsiniz.
Aşağıdaki örnekte user tablosunda username ile yaptığımız aramada ilk gelen sonucu alıyoruz. Ardından da gelen satırdan username, email gibi bilgilere örnekteki gibi erişebiliyoruz:
user = User.query.filter_by(username="elifcan").first()
user_username = user.username
user_email = user.email
Aşağıdaki örnekte user tablosunda username ile yaptığımız aramada gelen tüm satırları alıyoruz. Bu bize user objelerinin olduğu bir liste döndürüyor. Bu liste içinde bir döngü ile tüm kullanıcılara ait bilgileri alabiliriz:
users = User.query.filter_by(username="elifcan").all()
for user in users:
user_username = user.username
Update
Database’de query gerçekleştirdikten sonra bir sütunu aşağıdaki gibi update edebiliriz:
user = User.query.filter_by(username="elifcan").first()
user.email = "newmail@gmail.com"
db.session.commit()
Add
Aşağıdaki örnekte form’dan gelen username, password ve email adresini kullanarak User tablosunda bir element oluşturuyoruz:
username = request.form['username']
password = request.form['password']
email = request.form['email']user = User(username, password, email)
db.session.add(user)
db.session.commit()
Delete
Aşağıdaki örnekte query ile arattığımız satırı delete ile silebiliyoruz:
user = User.query.filter_by(username="elifcan").first()
db.session.delete(user)
db.session.commit()
Ekstra
Flask, web template engine olarak Jinja’yı kullanıyor. Database’den sorguladığınız sonuçları html dosyasında göstermek için aşağıdaki örnekten faydalanabilirsiniz.
app.py:
@app.route('/users', methods=['GET'])
def get_users():
users = User.query.filter_by(username="elifcan").all()
return render_template("index.html", users=users)
index.html:
{% for user in users %}
<p>{{user.username}}</p>
{% endfor %}
Bu işlemler dışında SQLAlchemy çok daha fazla kullanım senaryosu sunuyor. Bütün bunlar için resmi dokümana göz atabilirsiniz.