#! /usr/bin/env python
# -*- coding: latin1 -*-
"""
defs - Basic definitions for adapya
===================================
The module defs.py in the adapya.base package defines basic functionality
for the adapya packages
- Abuf class (read/write buffer) provides byte buffers with read and write
access funtions
- Logging
- dummymutex for synchronizing printing and logging
"""
from __future__ import print_function # PY3
__version__ = 'v.r.l'
if __version__ == 'v.r.l':
_svndate='$Date: 2023-12-01 00:54:33 +0100 (Fri, 01 Dec 2023) $'
_svnrev='$Rev: 1072 $'
__version__ = 'Dev ' + _svnrev.strip('$') + \
' '.join(_svndate.strip('$').split()[0:3])
import ast, ctypes, logging, sys
debug = 0 # log extra information if set
if sys.hexversion < 0x03010100 and sys.platform == 'zos':
# Get helpers for string conversion between Latin1 and EBCDIC (cp037)
# for read() and write() with Abuf instances -
# for Rocket Python port: source is in Latin1 and system runs EBCDIC.
# No need for this in PY3 where strings are in unicode and
# decode() / encode() will do the job.
# todo: for other EBCDIC flavours these functions would need to be
# properly parameterized
from .conv import str2ebc, ebc2str
[docs]class ProgrammingError(Exception): pass
class DummyContext(object):
def __enter__(self):
pass
def __exit__(self,excty,excva,tt):
pass
"""adapya.base.defs.dummymutex is a dummy lock for
printing and logging in the Adabas api modules
override with a real lock object e.g. passed
as pmutex parameter to the Adabas class
E. g. with multi-processing
>>> pmutex = multiprocessing.Lock()
>>> with pmutex:
... print 'synchronized printing'
"""
dummymutex = DummyContext()
if sys.hexversion > 0x03010100:
# Python 3
def Abuf(init, encoding='utf-8', errors='strict'):
''' Create a read/write buffer similar to a bytearray.
This factory function returns a Cbuf object.
:param init: a buffer size > 0 or an initial nonempty
string or unicode string which determines the size
of the buffer.
:param encoding: standard encoding encode string to binary
string, default is 'utf-8'
Some Abuf use examples (Python2):
>>> a=Abuf(10)
>>> a.write('abc')
>>> a.write('DEFG')
>>> a.value
b'abcDEFG'
>>> a.raw
b'abcDEFG\\x00\\x00\\x00'
>>> a.tell()
7
>>> a.seek(0)
>>> a.read(3)
b'abc'
>>> a.read_text(4)
'DEFG'
>>> a[0]
b'a'
>>> a[0:3]
b'abc'
>>> a[3:]=b'1234567'
>>> a.value
b'abc1234567'
Some examples (Python3):
>>> e=Abuf(10,encoding='cp037')
>>> e.write('ABC')
>>> e.write(b'123')
>>> e.value
b'\\xc1\\xc2\\xc3123'
>>> e.raw
b'\\xc1\\xc2\\xc3123\\x00\\x00\\x00\\x00'
>>> e.seek(0)
>>> e.read_text(3)
'ABC'
>>> e.read(3)
b'123'
>>> e[0]
b'\xc1'
>>> e[0:3]
b'\xc1\xc2\xc3'
'''
size = 0
binit = b''
if isinstance(init, str):
binit = init.encode(encoding=encoding,errors=errors)
size = len(binit)
elif isinstance(init, (bytes,bytearray)):
size = len(init)
elif isinstance(init, (int, float)):
size=int(init)
else:
raise TypeError(init)
Cchar = ctypes.c_char * size
class Cbuf(Cchar):
_type_ = ctypes.c_char
_length_ = size
def __init__(self,size,encoding=encoding,errors=errors):
self.encoding = encoding
self.pos = 0
if size>0:
Cchar.__init__(self, b'\x00')
else:
raise ProgrammingError('Cbuf: Unable to allocate zero size buffer')
# add file methods to the object
def write(self, wstr):
""" Supports writing string or binary string """
p1 = self.pos
if isinstance(wstr, str):
bstr = wstr.encode(encoding=encoding,errors=errors)
else:
bstr = wstr
length = len(bstr)
if p1 + length > self._length_:
# c_char_Array size exceeded
length = self._length_ - p1
p2 = p1 + length
self[p1:p2] = bstr[0:length]
self.pos = p2
write_text = write # write_text same method as write PY3
def read(self, size):
" return bytes string from buffer from current position "
if size < 1:
return ''
p1 = self.pos
p2 = p1 + size
if p2 <= len(self):
self.pos=p2
return self[p1:p2]
else:
self.pos=len(self)
return self[p1:p2]
def read_text(self, size, encoding=encoding, errors=errors):
""" return string from buffer from current position """
if size < 1:
return ''
# determine size of one blank
ss = len(' '.encode(encoding=encoding,errors=errors))
p1 = self.pos
p2 = p1 + size*ss
if p2 <= len(self):
self.pos=p2
else:
self.pos=len(self)
return self[p1:p2].decode(encoding=encoding,errors=errors)
def seek(self, offset, where=0):
if where < 1: # from buffer start
np=offset
elif where == 1: # from current pos
np=self.pos+offset
else: # where=2 from the end
np=len(size)+offset
if np < 0:
self.pos=0
elif np < len(self):
self.pos=np
else:
self.pos=len(self)
def tell(self):
return self.pos
def buf2str(self):
"convert byte string in buffer to string and strip blanks and nul"
s = self[:].decode(encoding=encoding,errors=errors)
return s.strip(' \x00')
cc = Cbuf(size)
if isinstance(init, str):
cc.value=binit
if isinstance(init, (bytes,bytearray)):
cc.value=init
return cc
else:
# Python 2
[docs] def Abuf(init, encoding='utf-8', errors='strict'):
''' Create a read/write buffer similar to a bytearray.
This factory function returns a Cbuf object.
:param init: a buffer size > 0 or an initial nonempty
string or unicode string which determines the size
of the buffer.
Some Abuf use examples:
>>> a=Abuf(10)
>>> a.write('abc')
>>> a.write('DEFG')
>>> a.value
'abcDEFG'
>>> a.raw
'abcDEFG\\x00\\x00\\x00'
>>> a.tell()
7
>>> a.seek(0)
>>> a.read(3)
'abc'
>>> a.read_text(4)
u'DEFG'
>>> a[0]
'a'
>>> a[0:3]
'abc'
>>> a[3:]='1234567'
>>> a.value
'abc1234567'
'''
size = 0
istr=''
if isinstance(init, str):
istr=init
size = len(init)
elif isinstance(init, unicode):
istr=init.encode(encoding,errors)
size = len(init)
elif isinstance(init, (int, long)):
size=init
else:
raise TypeError(init)
Cchar = ctypes.c_char * size
class Cbuf(Cchar):
_type_ = ctypes.c_char
_length_ = size
def __init__(self,size):
self.encoding = encoding
self.pos = 0
if size > 0:
Cchar.__init__(self, '\x00')
else:
raise ProgrammingError('Cbuf: Unable to allocate zero size buffer')
if u' '.encode(encoding) == '\x40': # and sys.platform=='zos':
self.ebcdic = 1 # target is ebcdic encoding
else:
self.ebcdic = 0
# add file methods to the object
def write(self, wstr):
p1 = self.pos
if isinstance(wstr, unicode):
bstr = wstr.encode(encoding,errors)
# elif isinstance(wstr, str) and self.ebcdic:
# bstr = unicode(wstr).encode(encoding,errors)
## bstr = str2ebc(wstr)
else:
bstr = wstr
length = len(bstr)
if p1 + length > self._length_:
# c_char_Array size exceeded
length = self._length_ - p1
p2 = p1 + length
self[p1:p2] = bstr[0:length]
self.pos = p2
def write_text(self, wstr):
p1 = self.pos
if isinstance(wstr, unicode):
bstr = wstr.encode(encoding,errors)
elif isinstance(wstr, str) and self.ebcdic:
bstr = unicode(wstr).encode(encoding,errors)
## bstr = str2ebc(wstr)
else:
bstr = wstr
length = len(bstr)
if p1 + length > self._length_:
# c_char_Array size exceeded
length = self._length_ - p1
p2 = p1 + length
self[p1:p2] = bstr[0:length]
self.pos = p2
def read(self, size):
if size < 1:
return ''
p1 = self.pos
p2 = p1 + size
if p2 <= len(self):
self.pos=p2
else:
self.pos=len(self)
rs = self[p1:p2]
if self.ebcdic:
return ebc2str(rs)
else:
return rs
def read_text(self, size, encoding=encoding, errors=errors):
""" return unicode string from buffer from current position """
if size < 1:
return u''
# determine size of one blank
ss = len(u' '.encode(encoding,errors))
p1 = self.pos
p2 = p1 + size*ss
if p2 <= len(self):
self.pos=p2
else:
self.pos=len(self)
return self[p1:p2].decode(encoding,errors)
def seek(self, offset, where=0):
if where < 1: # from buffer start
np=offset
elif where == 1: # from current pos
np=self.pos+offset
else: # where=2 from the end
np=len(size)+offset
if np < 0:
self.pos=0
elif np < len(self):
self.pos=np
else:
self.pos=len(self)
def tell(self):
return self.pos
def buf2str(self):
"Convert byte string in buffer to string and strip blanks and nul"
if self.ebcdic:
s = ebc2str(self[:])
else:
s = self[:]
return s.strip(' \x00') # remove blanks NUL at end
cc = Cbuf(size)
if isinstance(init, (str, unicode)):
cc.value=istr
return cc
[docs]def evalb(s):
""" Evaluates string for single byte escape sequences
:returns: byte string
>>> evalb('ABC\xe4\xff')
b'ABCäÿ'
.. note:: In PY3 no unicode escapes can be used,
use evals() for that purpose.
"""
b2="b'%s'" % s
return ast.literal_eval(b2)
[docs]def evals(s):
""" Evaluates string for unicode escape sequences
:returns: string
>>> evals('ABC\u00e4\u00ff')
'ABCäÿ'
>>> evals('ABC\xe4\xff')
'ABCäÿ'
.. note::
In PY3 no escaped single bytes can be defined in strings
- they are mapped to unicode characters.
use evalb() for that purpose.
"""
s2="'%s'" % s
return ast.literal_eval(s2)
LOGCMD=1 #0x01
LOGBEFORE=1<<1 #0x02
LOGCB=1<<2 #0x04
LOGFB=1<<3 #0x08
LOGRB=1<<4 #0x10
LOGSB=1<<5 #0x20
LOGVB=1<<6 #0x40
LOGIB=1<<7 #0x80
LOGMB=1<<8 #0x0100 # Multifetch buffer
LOGPB=1<<9 #0x0200 # Performance buffer
LOGUB=1<<10 #0x0400
LOGAOS=1<<16 #0x10000
LOGRSP=1<<17 #0x20000
LOGBUF=1<<18 #0x40000 LOG all buffers
LOGSP=1<<19 #0x80000 LOG depending on command option (command table option)
LOGTCP=1<<20 #0x0100000 log on TCP level
logopt = LOGRSP|LOGCB # global log parameter for current log setting
logstr = '' # log options printable last set
# define a Handler which writes DEBUG messages or higher to the sys.stderr
# changed to stdout so that nosetests can capture the logged data
# and output it only on error (2011_1102)
# leaving it on sys.stderr would write the full log even for successful tests
console = logging.StreamHandler(sys.stdout) #default is sys.stderr
#console.setLevel(logging.DEBUG)
# format for simple console use
#formatter = logging.Formatter('adalog: %(message)s')
## tell the handler to use this format
#console.setFormatter(formatter)
# add the handler to the root logger
rootlog = logging.getLogger('')
if not rootlog.handlers:
rootlog.addHandler(console)
adalog = logging.getLogger('adalog')
adalog.setLevel(logging.DEBUG)
# adalog.propagate = False # avoid duplicate messages
[docs]def log(logparm=None):
""" set adapya.base interface log options
"""
global logopt,adalog,logstr
from .datamap import flag_str
if logparm == None:
pass
elif logparm != logopt:
logopt=logparm
if debug:
if logopt == 0:
logstr='off'
else:
logstr = flag_str(logopt, ((LOGCMD,'LOGCMD'),
(LOGBEFORE,'LOGBEFORE'),(LOGCB,'LOGCB'),(LOGFB,'LOGFB'),
(LOGRB,'LOGRB'),(LOGSB,'LOGSB'),(LOGVB,'LOGVB'),
(LOGIB,'LOGIB'),(LOGAOS,'LOGAOS'),(LOGRSP,'LOGRSP'),
(LOGMB,'LOGMB'),(LOGPB,'LOGPB'),(LOGBUF,'LOGBUF'),
(LOGSP,'LOGSP'),(LOGTCP,'LOGTCP'),
))
adalog.info('Adapya logging: %s\n' % logstr)
if __name__ == "__main__":
import doctest
doctest.testmod()
# Copyright 2004-ThisYear Software AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.