
λ€μ΄κ°λ©°
μλ μ μ€λ¬΄μμ νμ΄μ¬μ μ¬μ©ν΄ λ©ν°νλ‘μΈμ± μμ μ μννλ©΄μ λ¬Έμ κ° λ°μν μ μ΄ μμμ΅λλ€.
νΉμ μμ μμ SQLAlchemy(νμ΄μ¬ ORM λΌμ΄λΈλ¬λ¦¬)μ DB 컀λ₯μ νμ μ¬μ©ν΄ λ°μ΄ν°λ₯Ό INSERTνλ λ‘μ§μ΄ μμλλ°, λ‘컬 κ°λ° νκ²½(μλμ°)μμλ μλ¬΄λ° λ¬Έμ κ° μμμ΅λλ€. κ·Έλ¬λ λμΌν μ½λλ₯Ό μ¬λ΄ κ°λ° μλ²(리λ μ€)μ λ°°ν¬νμ λ λ€μκ³Ό κ°μ μλ¬ λ‘κ·Έκ° λ°μνμ΅λλ€.
2024-12-21 17:57:00,471 [ERROR ] <root> [Process-21][Thread-140372676175552]: write_module:76: Error during bulk insert: ORA-01001: invalid cursor
ORA-03106: fatal two-task communication protocol error
DPI-1080: connection was closed by ORA-3135
μλ¬λ₯Ό μ΄ν΄λ³΄λ©΄, 컀μκ° μλͺ»λκ±°λ 컀λ₯μ μ΄ λ«νμμ μ μ μμ΅λλ€. νΉν SQLAlchemyκ° μ¬μ©νλ DB 컀λ₯μ νμ΄ λ¦¬λ μ€ νκ²½μμ μΆ©λμ μΌμΌμΌ°λ€λ μ μ΄ λ¬Έμ μ ν΅μ¬μΌλ‘ 보μμ΅λλ€.
κ·Έλ λ€λ©΄ μ κ°μ μ½λκ° OSμ λ°λΌ λ€λ₯΄κ² λμνλ κ±ΈκΉμ?
SQLAlchemy 컀λ₯μ νκ³Ό λ©ν°νλ‘μΈμ±
μ΄μ λν ννΈλ₯Ό SQLAlchemy 곡μ λ¬Έμμμ μ°Ύμλ³Ό μ μμμ΅λλ€.
https://docs.sqlalchemy.org/en/20/core/pooling.html#pooling-multiprocessing
Connection Pooling β SQLAlchemy 2.0 Documentation
Connection Pooling A connection pool is a standard technique used to maintain long running connections in memory for efficient re-use, as well as to provide management for the total number of connections an application might use simultaneously. Particularl
docs.sqlalchemy.org
Itβs critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process.
(...μ€λ΅)
The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over.
곡μ λ¬Έμμμ SQLAlchemyμ μμ§μ forkλ νλ‘μΈμ€μ κΈ°μ‘΄ λ°μ΄ν°λ² μ΄μ€μ 컀λ₯μ νμ μ°Έμ‘°νλ€κ³ μ€λͺ λμ΄ μμ΅λλ€. μ¦, μμ νλ‘μΈμ€λ₯Ό μμ±ν λ fork λ°©μμΌλ‘ λΆλͺ¨ νλ‘μΈμ€λ₯Ό 볡μ¬νκΈ° λλ¬Έμ λΆλͺ¨ νλ‘μΈμ€κ° κ°μ§κ³ μλ 컀λ₯μ νμ 곡μ νκ² λ©λλ€. μ΄λ‘ μΈν΄ μμ νλ‘μΈμ€μμ λΆλͺ¨ νλ‘μΈμ€μ 리μμ€λ₯Ό μ¬μ©νλ €λ€ μΆ©λμ΄ λ°μν©λλ€.
μ΄λ¬ν λ©ν°νλ‘μΈμ± νκ²½μμ λ¬Έμ λ₯Ό ν΄κ²°νκΈ° μν΄ λͺ κ°μ§ λ°©λ²μ μ μνκ³ μλλ° κ·Έ μ€ engine.dispose(close=False)
λ₯Ό μ¬μ©νλ κ²μ κΆμ₯νκ³ μμ΅λλ€.
μλ μμ μ½λμ κ°μ΄ μμ§ μ΄κΈ°ν μ Engine.dispose.close
μ λ§€κ°λ³μ κ°μ False
λ₯Ό μ λ¬νκ² λλ©΄, μμ νλ‘μΈμ€κ° λΆλͺ¨ νλ‘μΈμ€μ 컀λ₯μ
νμ 건λλ¦¬μ§ μκ³ μλ‘μ΄ μ»€λ₯μ
μ μμν©λλ€.
from multiprocessing import Pool
engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
def run_in_process(some_data_record):
with engine.connect() as conn:
conn.execute(text("..."))
def initializer():
"""ensure the parent proc's database connections are not touched
in the new connection pool"""
engine.dispose(close=False)
with Pool(10, initializer=initializer) as p:
p.map(run_in_process, data)
μ½λ μ€λͺ
- engine.dispose(close=False)
- μμ νλ‘μΈμ€μμ μλ‘μ΄ μ»€λ₯μ νμ μμ±νλλ‘ κ°μ ν©λλ€.
- close=Falseλ κΈ°μ‘΄ μ°κ²°μ λ«μ§ μκ³ μ 컀λ₯μ λ§ μμ±ν©λλ€.
- initializer ν¨μ
- initializerλ₯Ό ν΅ν΄ κ° μμ νλ‘μΈμ€μ μ΄κΈ°ν μ μμ§ λ¦¬μμ€λ₯Ό μ 리ν©λλ€.
- Pool μμ± μ μ΄κΈ°ν ν¨μ μ§μ
- multiprocessing.Poolμμ initializerλ₯Ό μ§μ νλ©΄ μμ νλ‘μΈμ€ μμ± μ μλμΌλ‘ νΈμΆλ©λλ€.
μλμ°μ 리λ μ€μ λ©ν°νλ‘μΈμ± μ°¨μ΄
κ·Έλ λ€λ©΄ μλμ°μμλ μ μ΄λ° λ¬Έμ κ° λ°μνμ§ μμλκ±ΈκΉμ?
κ·Έ μ΄μ λ νμ΄μ¬μμ λ©ν°νλ‘μΈμ±μ μννλ λ°©μμ΄ μ΄μ체μ μ λ°λΌ λ€λ₯΄κ² λμνκΈ° λλ¬Έμ λλ€.
- μλμ°:
spawn()
λ°©μμ μ¬μ©ν΄ μλ‘μ΄ νλ‘μΈμ€λ₯Ό λ 립μ μΌλ‘ μμ±ν©λλ€. μ΄ κ³Όμ μμ λΆλͺ¨ νλ‘μΈμ€μ 리μμ€λ₯Ό μλ‘ μ΄κΈ°νν©λλ€. - 리λ
μ€:
fork()
λ°©μμ μ¬μ©ν΄ λΆλͺ¨ νλ‘μΈμ€λ₯Ό 볡μ¬ν©λλ€. μ΄λ‘ μΈν΄ λΆλͺ¨ νλ‘μΈμ€μ λͺ¨λ 리μμ€, μ¦ SQLAlchemyμ 컀λ₯μ νλ 곡μ λ©λλ€.
μ΄ λμ μ°¨μ΄λ₯Ό κ°λ¨ν μ½λλ‘ νμΈν΄ λ³΄κ² μ΅λλ€.
import multiprocessing as mp
import random
val = random.random()
def simple_func():
print(val)
if __name__ == '__main__':
print('Before multiprocessing: ')
simple_func()
print('After multiprocessing:')
p = mp.Process(target=simple_func)
p.start()
p.join()
μλμ° μΆλ ₯ κ²°κ³Ό
Before multiprocessing:
0.7345451427940827
After multiprocessing:
0.23541224006190398
μλμ°λ μμ νλ‘μΈμ€κ° μμ ν λ
립μ μΈ μλ‘μ΄ κ³΅κ°μμ μμνλ―λ‘, random.random()
μΌλ‘ μμ±λ val
κ°μ 볡μ¬νμ§ μκ³ μμ νλ‘μΈμ€μμ μλ‘ μ΄κΈ°νλ©λλ€.
κ²°κ³Όμ μΌλ‘ λΆλͺ¨ νλ‘μΈμ€μ μμ νλ‘μΈμ€μ val
κ°μ΄ μλ‘ λ€λ¦
λλ€.
리λ μ€ μΆλ ₯ κ²°κ³Ό
Before multiprocessing:
0.19180520863603323
After multiprocessing:
0.19180520863603323
fork()λ‘ μΈν΄ λΆλͺ¨μ μμμ΄ λμΌν 리μμ€λ₯Ό 곡μ ν©λλ€. λΆλͺ¨ νλ‘μΈμ€μ λͺ¨λ λ©λͺ¨λ¦¬ μνλ₯Ό 볡μ¬νκΈ° λλ¬Έμ, λΆλͺ¨ νλ‘μΈμ€μμ μμ±λ val
κ°μ΄ μμ νλ‘μΈμ€μλ λμΌνκ² μ λ¬λ©λλ€.
κ²°κ³Όμ μΌλ‘ λΆλͺ¨ νλ‘μΈμ€μ μμ νλ‘μΈμ€μ val
κ°μ΄ λμΌν©λλ€.
SQLAlchemyμμλ λμΌν μλ¦¬λ‘ μΈν΄ λΆλͺ¨ νλ‘μΈμ€μ 컀λ₯μ νμ΄ μμ νλ‘μΈμ€μ 볡μ¬λλ©° OS κ°μ μ°¨μ΄κ° λ°μνλ κ²μ λλ€.
κ²°κ³Ό μμ½
- μλμ°μ 리λ
μ€μ λ©ν°νλ‘μΈμ± λ°©μμ λ€λ₯΄λ€.
- μλμ°λ
spawn()
λ°©μμ μ¬μ©ν΄ λΆλͺ¨ νλ‘μΈμ€μ λ 립λ μλ‘μ΄ νλ‘μΈμ€λ₯Ό μμ±νμ§λ§, 리λ μ€λfork()
λ°©μμΌλ‘ λΆλͺ¨ νλ‘μΈμ€λ₯Ό 볡μ¬ν΄ 리μμ€λ₯Ό 곡μ ν©λλ€. - μ΄ μ°¨μ΄λ‘ μΈν΄ 리λ μ€ νκ²½μμλ λΆλͺ¨ νλ‘μΈμ€μ 리μμ€κ° μμ νλ‘μΈμ€μμ μΆ©λμ μΌμΌν¬ μ μμ΅λλ€.
- μλμ°λ
- SQLAlchemyλ λ©ν°νλ‘μΈμ± νκ²½μ μν΄ dispose(close=False) λ°©μμ κΆμ₯νλ€.
engine.dispose(close=False)
λ₯Ό μ¬μ©ν΄ μμ νλ‘μΈμ€μμ μλ‘μ΄ μ»€λ₯μ νμ μμ±ν¨μΌλ‘μ¨ λ¬Έμ λ₯Ό ν΄κ²°ν μ μμμ΅λλ€.
μ°Έκ³
https://pythonforthelab.com/blog/differences-between-multiprocessing-windows-and-linux/
https://docs.sqlalchemy.org/en/20/core/pooling.html#pooling-multiprocessing
'π» κ°λ° > π₯οΈ μ΄μ체μ ' μΉ΄ν κ³ λ¦¬μ λ€λ₯Έ κΈ
νλ‘μΈμ€(Process)λ? (0) | 2022.12.12 |
---|---|
μΈν°λ½νΈλ 무μμ΄λ©° μ΄λ€ μν μ ν κΉ? (0) | 2022.11.23 |