-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove my postgres connection pools #149
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,69 +1,44 @@ | ||
from itertools import cycle | ||
from psycopg2.pool import ThreadedConnectionPool | ||
from psycopg2.extras import register_hstore, register_json | ||
import psycopg2 | ||
import threading | ||
import ujson as json | ||
|
||
|
||
class DatabaseCycleConnectionPool(object): | ||
|
||
""" | ||
Maintains a psycopg2 ThreadedConnectionPool for each of the | ||
given dbnames. When a client requests a set of connections, | ||
all of those connections will come from the same database. | ||
""" | ||
|
||
def __init__(self, min_conns_per_db, max_conns_per_db, dbnames, conn_info): | ||
self._pools = [] | ||
self._conns_to_pool = {} | ||
|
||
for dbname in dbnames: | ||
pool = ThreadedConnectionPool( | ||
min_conns_per_db, | ||
max_conns_per_db, | ||
dbname=dbname, | ||
**conn_info | ||
) | ||
self._pools.append(pool) | ||
|
||
self._pool_cycle = cycle(self._pools) | ||
self._lock = threading.Lock() | ||
|
||
def get_conns(self, n_conns): | ||
conns = [] | ||
|
||
try: | ||
with self._lock: | ||
pool_to_use = next(self._pool_cycle) | ||
for _ in range(n_conns): | ||
conn = pool_to_use.getconn() | ||
|
||
conn.set_session(readonly=True, autocommit=True) | ||
register_json(conn, loads=json.loads, globally=True) | ||
register_hstore(conn, globally=True) | ||
|
||
self._conns_to_pool[id(conn)] = pool_to_use | ||
conns.append(conn) | ||
assert len(conns) == n_conns, \ | ||
"Couldn't collect enough connections" | ||
except: | ||
if conns: | ||
self.put_conns(conns) | ||
conns = [] | ||
raise | ||
|
||
import ujson | ||
|
||
|
||
class DBAffinityConnectionsNoLimit(object): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking back at history I was expecting to see 3x as much content here, mostly based off this SHA: And then bringing some of the hstore / ujson changes in from: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We didn't use the other database pools, so I removed them. The ujson stuff is there. The globally=True thing I wanted to leave off in case it caused the connection problem. |
||
|
||
# Similar to the db affinity pool, but without keeping track of | ||
# the connections. It's the caller's responsibility to call us | ||
# back with the connection objects so that we can close them. | ||
|
||
def __init__(self, dbnames, conn_info): | ||
self.dbnames = cycle(dbnames) | ||
self.conn_info = conn_info | ||
self.conn_mapping = {} | ||
self.lock = threading.Lock() | ||
|
||
def _make_conn(self, conn_info): | ||
conn = psycopg2.connect(**conn_info) | ||
conn.set_session(readonly=True, autocommit=True) | ||
register_hstore(conn) | ||
register_json(conn, loads=ujson.loads) | ||
return conn | ||
|
||
def get_conns(self, n_conn): | ||
with self.lock: | ||
dbname = self.dbnames.next() | ||
conn_info_with_db = dict(self.conn_info, dbname=dbname) | ||
conns = [self._make_conn(conn_info_with_db) | ||
for i in range(n_conn)] | ||
return conns | ||
|
||
def put_conns(self, conns): | ||
with self._lock: | ||
for conn in conns: | ||
pool = self._conns_to_pool.pop(id(conn), None) | ||
assert pool is not None, \ | ||
"Couldn't find the pool for connection" | ||
pool.putconn(conn) | ||
for conn in conns: | ||
try: | ||
conn.close() | ||
except: | ||
pass | ||
|
||
def closeall(self): | ||
with self._lock: | ||
for pool in self._pools: | ||
pool.closeall() | ||
self._conns_to_pool.clear() | ||
raise Exception('DBAffinityConnectionsNoLimit pool does not track ' | ||
'connections') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
from psycopg2.extras import RealDictCursor | ||
from tilequeue.postgresql import DatabaseCycleConnectionPool | ||
from tilequeue.postgresql import DBAffinityConnectionsNoLimit | ||
from tilequeue.tile import calc_meters_per_pixel_dim | ||
from tilequeue.tile import coord_to_mercator_bounds | ||
from tilequeue.transform import calculate_padded_bounds | ||
|
@@ -150,15 +150,15 @@ def enqueue_queries(sql_conns, thread_pool, layer_data, zoom, unpadded_bounds): | |
|
||
class DataFetcher(object): | ||
|
||
def __init__(self, conn_info, layer_data, io_pool, n_conn, max_conn): | ||
def __init__(self, conn_info, layer_data, io_pool, n_conn): | ||
self.conn_info = dict(conn_info) | ||
self.layer_data = layer_data | ||
self.io_pool = io_pool | ||
|
||
self.dbnames = self.conn_info.pop('dbnames') | ||
self.dbnames_query_index = 0 | ||
self.sql_conn_pool = DatabaseCycleConnectionPool( | ||
n_conn, max_conn, self.dbnames, self.conn_info) | ||
self.sql_conn_pool = DBAffinityConnectionsNoLimit( | ||
self.dbnames, self.conn_info) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This section around line 160 used to read:
Where Similarly on 170 we used to say:
instead of proposed:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, that was fairly messy, so this is a cleanup of that. |
||
self.n_conn = n_conn | ||
|
||
def __call__(self, coord, layer_data=None): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM