Package rdkit :: Package Dbase :: Module DbResultSet
[hide private]
[frames | no frames]

Source Code for Module rdkit.Dbase.DbResultSet

  1  # 
  2  #  Copyright (C) 2003  Greg Landrum and Rational Discovery LLC 
  3  # 
  4  """ defines class _DbResultSet_ for lazy interactions with Db query results 
  5   
  6  **Note** 
  7   
  8    this uses the Python iterator interface, so you'll need python 2.2 or above. 
  9   
 10  """ 
 11  from __future__ import print_function 
 12  import sys 
 13  from rdkit.Dbase import DbInfo 
 14   
 15   
class DbResultBase(object):
    """Shared state and helpers for result sets that wrap a database cursor.

    The constructor stores its arguments, executes the command and caches
    the column names and types reported for the cursor.

    Arguments:
      - cursor: cursor object providing ``execute()`` (and, for the
        subclasses, ``fetchone()``)
      - conn: the connection the cursor belongs to; only stored here
      - cmd: the command passed to ``cursor.execute()``
      - removeDups: column index consulted by the subclasses for duplicate
        removal; a negative value (the default) disables it
      - transform: optional callable the subclasses apply to each row
      - extras: optional second argument for ``cursor.execute()``
    """

    def __init__(self, cursor, conn, cmd, removeDups=-1, transform=None, extras=None):
        self.cursor = cursor
        self.removeDups = removeDups
        self.transform = transform
        self.cmd = cmd
        self.conn = conn
        self.extras = extras
        # Execute once so column information can be read from the cursor,
        # then execute again afterwards.
        # NOTE(review): the second Reset() presumably exists because reading
        # the column info may consume rows -- confirm against
        # DbInfo.GetColumnInfoFromCursor.
        self.Reset()
        self._initColumnNamesAndTypes()
        self.Reset()

    def Reset(self):
        """ (Re-)executes the stored command on the cursor.

        ``extras`` is forwarded to ``execute()`` only when it is truthy.
        Any exception raised by the execution is reported on stderr (with a
        traceback) and then suppressed, so this method never raises for a
        failing command.  Subclasses extend it to reset their own state.

        """
        try:
            if not self.extras:
                self.cursor.execute(self.cmd)
            else:
                self.cursor.execute(self.cmd, self.extras)
        except Exception:
            # Deliberate best-effort: report the failure and carry on.
            # The 1-tuple keeps the formatting valid even if cmd is a tuple.
            sys.stderr.write('the command "%s" generated errors:\n' % (self.cmd, ))
            import traceback
            traceback.print_exc()

    def __iter__(self):
        """ Re-executes the command and returns the result set itself. """
        self.Reset()
        return self

    def _initColumnNamesAndTypes(self):
        """ Caches the column names and types as two parallel tuples. """
        # Materialise first: the column info is walked twice below.
        info = list(DbInfo.GetColumnInfoFromCursor(self.cursor))
        self.colNames = tuple(cName for cName, _ in info)
        self.colTypes = tuple(cType for _, cType in info)

    def GetColumnNames(self):
        """ Returns the tuple of column names. """
        return self.colNames

    def GetColumnTypes(self):
        """ Returns the tuple of column types. """
        return self.colTypes

    def GetColumnNamesAndTypes(self):
        """ Returns a tuple of (name, type) pairs, one per column. """
        return tuple(zip(self.colNames, self.colTypes))
64 65
class DbResultSet(DbResultBase):
    """ Only supports forward iteration

    Rows are pulled from the cursor one at a time with ``fetchone()``.
    When ``removeDups`` is non-negative, a row is skipped if the value in
    that column has already been returned during the current pass.
    """

    def __init__(self, *args, **kwargs):
        DbResultBase.__init__(self, *args, **kwargs)
        self.seen = []
        self._stopped = 0

    def Reset(self):
        """ Re-executes the command and starts a fresh pass over the rows. """
        self._stopped = 0
        # The query restarts from its first row, so the duplicate filter has
        # to restart too; otherwise every row of a second pass would be
        # rejected as already seen.
        self.seen = []
        DbResultBase.Reset(self)

    def next(self):
        """ Returns the next (optionally transformed, de-duplicated) row.

        Raises StopIteration once the cursor returns no more rows, and keeps
        raising it until Reset() is called.
        """
        if self._stopped:
            raise StopIteration
        while True:
            row = self.cursor.fetchone()
            if not row:
                self._stopped = 1
                raise StopIteration
            if self.transform is not None:
                row = self.transform(row)
            if self.removeDups >= 0:
                key = row[self.removeDups]
                if key in self.seen:
                    # duplicate: fetch the next row instead
                    continue
                self.seen.append(key)
            # A transform that returns None makes the row get skipped.
            if row is not None:
                return row

    __next__ = next  # PY3
98 99
class RandomAccessDbResultSet(DbResultBase):
    """ Supports random access

    Rows are fetched lazily and cached in ``self.results``; indexing only
    fetches as many rows as are needed to reach the requested index.  Once
    the cursor has been exhausted it is dropped (``self.cursor`` becomes
    None) and all access is served from the cache.
    """

    def __init__(self, *args, **kwargs):
        DbResultBase.__init__(self, *args, **kwargs)
        self.results = []
        self.seen = []
        self._pos = -1

    def Reset(self):
        """ Rewinds iteration; re-executes the command if the cursor is live.

        When the cursor has already been exhausted the cached rows are kept
        and nothing is re-executed.
        """
        self._pos = -1
        if self.cursor is not None:
            # Re-executing restarts the fetch from the first row, so rows
            # cached from a partial fetch must be dropped or they would be
            # stored a second time.
            self.results = []
            self.seen = []
            DbResultBase.Reset(self)

    def _store(self, row):
        """ Transforms *row* and caches it unless it is a duplicate. """
        if self.transform is not None:
            row = self.transform(row)
        if self.removeDups >= 0:
            key = row[self.removeDups]
            if key in self.seen:
                return
            self.seen.append(key)
        self.results.append(row)

    def _finish(self):
        """ Fetches and caches all remaining rows, then drops the cursor. """
        if self.cursor:
            row = self.cursor.fetchone()
            while row:
                self._store(row)
                row = self.cursor.fetchone()
        self.cursor = None

    def __getitem__(self, idx):
        """ Returns the row at position *idx*, fetching rows as required.

        Raises:
          - IndexError for negative indices or indices past the last row
          - ValueError if the cursor is gone and no rows were cached
        """
        if idx < 0:
            raise IndexError("negative indices not supported")
        if self.cursor is None:
            if len(self.results):
                if idx >= len(self.results):
                    raise IndexError('index %d too large (%d max)' % (idx, len(self.results)))
            else:
                raise ValueError('Invalid cursor')

        # Keep fetching until the cache reaches idx; rows rejected as
        # duplicates do not grow the cache, so the loop simply continues.
        while idx >= len(self.results):
            row = self.cursor.fetchone()
            if not row:
                self.cursor = None
                raise IndexError('index %d too large (%d max)' % (idx, len(self.results)))
            self._store(row)

        return self.results[idx]

    def __len__(self):
        """ Returns the total number of rows; forces a complete fetch. """
        if self.results is None:
            raise ValueError("len() not supported for noMemory Results Sets")
        self._finish()
        return len(self.results)

    def next(self):
        """ Returns the next cached row, raising StopIteration at the end. """
        self._pos += 1
        # len() completes the fetch, so the cache is full from here on.
        if self._pos >= len(self):
            raise StopIteration
        return self.results[self._pos]

    __next__ = next  # PY3
if __name__ == '__main__':  # pragma: nocover
    # Manual smoke test against the TEST.GDB database.
    from rdkit.Dbase.DbConnection import DbConnect

    conn = DbConnect('TEST.GDB')

    def _open(klass, cmd, **kwargs):
        """ Builds a result set of type *klass* for *cmd* on a new cursor.

        The result-set constructors require (cursor, conn, cmd) and execute
        the command themselves, so no separate execute() call is needed.
        NOTE(review): the DbConnect object is passed as ``conn``; the classes
        in this module only store that argument -- confirm this is acceptable.
        """
        return klass(conn.GetCursor(), conn, cmd, **kwargs)

    curs = conn.GetCursor()
    print('curs:', repr(curs))
    resultSet = RandomAccessDbResultSet(curs, conn, 'select * from ten_elements')
    for i in range(12):
        try:
            val = resultSet[i]
        except IndexError:
            assert i >= 10

    print('use len')
    resultSet = _open(RandomAccessDbResultSet, 'select * from ten_elements')
    for i in range(len(resultSet)):
        val = resultSet[i]

    print('use iter')
    resultSet = _open(DbResultSet, 'select * from ten_elements')
    for thing in resultSet:
        ID, val = thing

    print('dups')
    resultSet = _open(DbResultSet, 'select * from ten_elements_dups')
    r = []
    for thing in resultSet:
        r.append(thing)
    assert len(r) == 20

    resultSet = _open(DbResultSet, 'select * from ten_elements_dups', removeDups=0)
    r = []
    for thing in resultSet:
        r.append(thing)
    assert len(r) == 10

    resultSet = _open(RandomAccessDbResultSet, 'select * from ten_elements_dups', removeDups=0)
    assert len(resultSet) == 10
    assert resultSet[0] == (0, 11)

    resultSet = _open(RandomAccessDbResultSet, 'select * from ten_elements_dups', removeDups=0)
    assert resultSet[0] == (0, 11)
    assert resultSet[1] == (2, 21)
    assert resultSet[5] == (10, 61)

    resultSet = _open(RandomAccessDbResultSet, 'select * from ten_elements_dups')
    assert resultSet[0] == (0, 11)
    assert resultSet[1] == (0, 11)