Source code for erppeek

#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" erppeek.py -- OpenERP command line tool

Author: Florent Xicluna
(derived from a script by Alan Bell)
"""
from __future__ import with_statement

import _ast
import atexit
import collections
import csv
import functools
import optparse
import os
from pprint import pprint
import re
import sys
import time
import traceback
import warnings
try:                    # Python 3
    import configparser
    from threading import current_thread
    from xmlrpc.client import Fault, ServerProxy
    basestring = str
    int_types = int
    _DictWriter = csv.DictWriter
except ImportError:     # Python 2
    import ConfigParser as configparser
    from threading import currentThread as current_thread
    from xmlrpclib import Fault, ServerProxy
    int_types = int, long

    class _DictWriter(csv.DictWriter):
        """Unicode CSV Writer, which encodes output to UTF-8."""

        def writeheader(self):
            # Method 'writeheader' does not exist in Python 2.6
            header = dict(zip(self.fieldnames, self.fieldnames))
            self.writerow(header)

        def _dict_to_list(self, rowdict):
            rowlst = csv.DictWriter._dict_to_list(self, rowdict)
            return [cell.encode('utf-8') if hasattr(cell, 'encode') else cell
                    for cell in rowlst]


# Release number of this module
__version__ = '1.5.3'
# Public names; Client._set_interactive also copies them into the
# interactive namespace
__all__ = ['Client', 'Model', 'Record', 'RecordList', 'Service',
           'format_exception', 'read_config', 'start_openerp_services']

# Name of the configuration file; Client._config_file looks for it in the
# current directory
CONF_FILE = 'erppeek.ini'
# NOTE(review): presumably the readline history of the interactive shell;
# it is not used in this part of the file
HIST_FILE = os.path.expanduser('~/.erppeek_history')
# NOTE(review): presumably command line defaults; not used in this part
# of the file
DEFAULT_URL = 'http://localhost:8069'
DEFAULT_DB = 'openerp'
DEFAULT_USER = 'admin'
# Indexed by the verbosity level (1, 2, 3+) in Service.__getattr__
MAXCOL = [79, 179, 9999]    # Line length in verbose mode

# Short help text listing the most common commands.
# NOTE(review): presumably printed by the interactive mode; the code which
# uses it is not in this part of the file.
USAGE = """\
Usage (some commands):
    models(name)                    # List models matching pattern
    model(name)                     # Return a Model instance
    model(name).keys()              # List field names of the model
    model(name).fields(names=None)  # Return details for the fields
    model(name).field(name)         # Return details for the field
    model(name).browse(domain)
    model(name).browse(domain, offset=0, limit=None, order=None)
                                    # Return a RecordList

    rec = model(name).get(domain)   # Get the Record matching domain
    rec.some_field                  # Return the value of this field
    rec.read(fields=None)           # Return values for the fields

    client.login(user)              # Login with another user
    client.connect(env)             # Connect to another env.
    client.modules(name)            # List modules matching pattern
    client.upgrade(module1, module2, ...)
                                    # Upgrade the modules
"""

# Module states which mean that no operation is pending on the module
STABLE_STATES = ('uninstallable', 'uninstalled', 'installed')
# Prefix operators accepted as stand-alone terms of a search domain
DOMAIN_OPERATORS = frozenset('!|&')
# Supported operators are:
#   =, !=, >, >=, <, <=, like, ilike, in, not like, not ilike, not in, child_of
#   =like, =ilike (6.0), =? (6.0)
# Parse a term like "field operator value" into its 3 components.
# Raw strings are used because '\w', '\s' and '\?' are invalid escape
# sequences in ordinary string literals.
_term_re = re.compile(
    r'([\w._]+)\s*'
    r'(=(?:like|ilike|\?)|[<>]=?|!?=(?!=)'
    r'|(?<= )(?:like|ilike|in|not like|not ilike|not in|child_of))'
    r'\s*(.*)')
# Extract the field names of a format spec like '%(street)s %(city)s';
# an escaped '%%(' is not a field
_fields_re = re.compile(r'(?:[^%]|^)%\(([^)]+)\)')

# Published object methods
# Keys are the names of the service endpoints, values are the method names
# which Client.__init__ exposes on the matching Service.
_methods = {
    'db': ['create', 'drop', 'dump', 'restore', 'rename',
           'get_progress', 'list', 'list_lang',
           'change_admin_password', 'server_version', 'migrate_databases'],
    'common': ['about', 'login', 'timezone_get', 'get_server_environment',
               'login_message', 'check_connectivity'],
    'object': ['execute', 'exec_workflow'],
    'report': ['report', 'report_get'],
    'wizard': ['execute', 'create'],
}
# Additional methods, appended by Client.__init__ when the server version
# is not 5.0 (there is no entry for the 'wizard' endpoint)
_methods_6_1 = {
    'db': ['create_database', 'db_exist'],
    'common': ['get_stats', 'list_http_services', 'version',
               'authenticate', 'get_os_time', 'get_sqlcount'],
    'object': ['execute_kw'],
    'report': ['render_report'],
}
# Hidden methods:
#  - common: get_available_updates, get_migration_scripts, set_loglevel
# Separator inserted by format_exception() between the server error and
# the local traceback
_cause_message = ("\nThe above exception was the direct cause "
                  "of the following exception:\n\n")


# Simplified ast.literal_eval which does not parse operators
def _convert(node, _consts={'None': None, 'True': True, 'False': False}):
    """Convert an AST node made of literals into the Python value.

    Supported nodes are strings, numbers, tuples, lists, dicts and the
    constants ``None``, ``True`` and ``False``.  Operators (including the
    unary minus), bytes and names are rejected.

    Raise ``ValueError`` for any other expression.
    """
    # Python 3.8+ parses every literal as a 'Constant' node, and the
    # legacy 'Str' and 'Num' classes are no longer available in '_ast'.
    constant_type = getattr(_ast, 'Constant', None)
    if constant_type is not None and isinstance(node, constant_type):
        value = node.value
        # 'bool' is a subclass of 'int'; bytes and Ellipsis are refused
        if value is None or isinstance(value, (str, int, float, complex)):
            return value
        raise ValueError('malformed or disallowed expression')
    # Python < 3.8: dedicated node types for strings and numbers
    for (type_name, attr) in (('Str', 's'), ('Num', 'n')):
        legacy_type = getattr(_ast, type_name, None)
        if legacy_type is not None and isinstance(node, legacy_type):
            return getattr(node, attr)
    if isinstance(node, _ast.Tuple):
        return tuple(map(_convert, node.elts))
    if isinstance(node, _ast.List):
        return list(map(_convert, node.elts))
    if isinstance(node, _ast.Dict):
        return dict([(_convert(k), _convert(v))
                     for (k, v) in zip(node.keys, node.values)])
    if hasattr(node, 'value') and str(node.value) in _consts:
        return node.value         # Python 3.4 to 3.7 (NameConstant)
    if isinstance(node, _ast.Name) and node.id in _consts:
        return _consts[node.id]   # Python <= 3.3
    raise ValueError('malformed or disallowed expression')


def literal_eval(expression):
    """Evaluate the source of a literal with the restricted `_convert`.

    The `expression` is compiled to an AST only; nothing is executed.
    """
    tree = compile(expression, '<unknown>', 'eval', _ast.PyCF_ONLY_AST)
    body = tree.body
    return _convert(body)


def is_list_of_dict(iterator):
    """Tell whether the first item which is not false is a dict.

    Return False when all the items are false or when there is no item.
    """
    first = next((item for item in iterator if item), None)
    return isinstance(first, dict)


def mixedcase(s, _cache={}):
    """Convert a dotted name to MixedCase.

    The results are memoized in `_cache`.

    >>> mixedcase('res.company')
    'ResCompany'
    """
    if s not in _cache:
        _cache[s] = ''.join(word.capitalize() for word in s.split('.'))
    return _cache[s]
def lowercase(s, _sub=re.compile('[A-Z]').sub,
              _repl=(lambda m: '.' + m.group(0).lower()), _cache={}):
    """Convert a MixedCase name to lowercase with dots.

    The results are memoized in `_cache`.

    >>> lowercase('ResCompany')
    'res.company'
    """
    if s in _cache:
        return _cache[s]
    # Each capital becomes '.' + lowercase letter; drop the leading dot
    dotted = _sub(_repl, s).lstrip('.')
    _cache[s] = dotted
    return dotted
def format_exception(exc_type, exc, tb, limit=None, chain=True,
                     _format_exception=traceback.format_exception):
    """Format a stack trace and the exception information.

    Replacement for ``traceback.format_exception`` which renders the
    error and the traceback received through XML-RPC in a readable way.
    When `chain` is True, the local traceback is kept after the error
    reported by the server.

    Return a list of strings.
    """
    lines = _format_exception(exc_type, exc, tb, limit=limit)
    readable_fault = (issubclass(exc_type, Fault) and
                      isinstance(exc.faultCode, basestring))
    if not readable_fault:
        return lines

    # Format readable 'Fault' errors
    code = exc.faultCode
    (kind, __, message) = code.partition('--')
    remote_tb = None
    if kind.strip() != 'warning':
        message = code
        if not message.startswith('FATAL:'):
            remote_tb = exc.faultString
    summary = '%s: %s\n' % (exc_type.__name__, message.strip())
    head = remote_tb or summary
    if not chain:
        return [head]
    lines = [head, _cause_message] + lines
    # The last line of the local traceback is replaced by the summary
    lines[-1] = summary
    return lines
def read_config(section=None):
    """Read the environment settings from the configuration file.

    The config file ``erppeek.ini`` contains a `section` for each environment.
    Each section provides parameters for the connection: ``host``, ``port``,
    ``database``, ``username`` and (optional) ``password``.  The optional
    ``scheme`` defaults to ``http``; with the scheme ``local``, the
    ``options`` are returned in place of the URL.  Default values are
    read from the ``[DEFAULT]`` section.  If the ``password`` is not in the
    configuration file, it is requested on login.

    Return a tuple ``(server, db, user, password or None)``.
    Without argument, it returns the list of configured environments.

    A ``KeyError`` is raised if a mandatory parameter is missing.
    """
    # 'SafeConfigParser' and 'readfp' are deprecated since Python 3.2 and
    # removed in Python 3.12; they are only needed for older versions.
    if sys.version_info >= (3, 2):
        p = configparser.ConfigParser()
    else:
        p = configparser.SafeConfigParser()
    read_file = getattr(p, 'read_file', None) or p.readfp
    with open(Client._config_file) as f:
        read_file(f)
    if section is None:
        return p.sections()
    env = dict(p.items(section))
    scheme = env.get('scheme', 'http')
    if scheme == 'local':
        server = (scheme, env.get('options', ''))
    else:
        server = '%s://%s:%s' % (scheme, env['host'], env['port'])
    return (server, env['database'], env['username'], env.get('password'))
def start_openerp_services(options=None, appname=None):
    """Initialize the OpenERP services.

    Import the ``openerp`` package and load the OpenERP services.
    The argument `options` receives the command line arguments
    for ``openerp``.  Example:

      ``"-c /path/to/openerp-server.conf --without-demo all"``.

    Return the ``openerp`` module.
    """
    import openerp

    if openerp.osv.osv.service:
        # The services are already loaded
        return openerp

    os.environ['TZ'] = 'UTC'
    if appname is not None:
        os.environ['PGAPPNAME'] = appname
    arguments = options.split() if options else []
    openerp.tools.config.parse_config(arguments)
    if openerp.release.version_info < (7,):
        openerp.netsvc.init_logger()
        openerp.osv.osv.start_object_proxy()
        openerp.service.web_services.start_web_services()
    else:
        openerp.service.start_internal()

    def close_all():
        # Close the connections of each loaded registry at exit
        for db in openerp.modules.registry.RegistryManager.registries:
            openerp.sql_db.close_db(db)
    atexit.register(close_all)

    return openerp
def issearchdomain(arg):
    """Check if the argument is a search domain.

    Examples:
     - ``[('name', '=', 'mushroom'), ('state', '!=', 'draft')]``
     - ``['name = mushroom', 'state != draft']``
     - ``[]``
    """
    # These ones are supported but discouraged:
    # - 'state != draft'
    # - ('state', '!=', 'draft')
    if not isinstance(arg, (list, tuple, basestring)):
        return False
    if not arg:
        return True
    first = arg[0]
    # Not a list of ids: [1, 2, 3]
    if isinstance(first, int_types):
        return False
    # Not a list of ids as str: ['1', '2', '3']
    return not (isinstance(first, basestring) and first.isdigit())
def searchargs(params, kwargs=None, context=None):
    """Compute the 'search' parameters.

    The first item of `params` is the domain.  Terms written as strings,
    like ``'name = mushroom'``, are parsed into 3-tuples.  The value is
    evaluated as a literal if possible, else it is kept as a string.

    If the domain is the only positional parameter and `kwargs` or
    `context` is provided, then the keys ``offset``, ``limit`` and
    ``order`` are popped from `kwargs` and the return value is the tuple
    ``(domain, offset, limit, order, context)``.

    Raise ``ValueError`` if a term cannot be parsed.
    """
    if not params:
        return ([],)
    domain = params[0]
    if isinstance(domain, (basestring, tuple)):
        domain = [domain]
        warnings.warn('Domain should be a list: %s' % domain)
    elif not isinstance(domain, list):
        return params
    else:
        # Work on a copy: the list of the caller is not rewritten
        domain = list(domain)
    for (idx, term) in enumerate(domain):
        if isinstance(term, basestring) and term not in DOMAIN_OPERATORS:
            m = _term_re.match(term.strip())
            if not m:
                raise ValueError('Cannot parse term %r' % term)
            (field, operator, value) = m.groups()
            try:
                value = literal_eval(value)
            except Exception:
                # Interpret the value as a string
                pass
            domain[idx] = (field, operator, value)
    if (kwargs or context) and len(params) == 1:
        if kwargs is None:
            # Only the context was provided
            kwargs = {}
        params = (domain,
                  kwargs.pop('offset', 0),
                  kwargs.pop('limit', None),
                  kwargs.pop('order', None),
                  context)
    else:
        params = (domain,) + params[1:]
    return params
class Service(object):
    """A wrapper around XML-RPC endpoints.

    The connected endpoints are exposed on the Client instance.
    The `server` argument is the URL of the server (scheme+host+port).
    If `server` is an ``openerp`` module, it is used to connect to the
    local server.  The `endpoint` argument is the name of the service
    (examples: ``"object"``, ``"db"``).  The `methods` is the list of methods
    which should be exposed on this endpoint.  Use ``dir(...)`` on the
    instance to list them.

    If `verbose` is true, each call and its result are printed.  The lines
    are truncated to a length which depends on the verbosity level.
    """

    def __init__(self, server, endpoint, methods, verbose=False):
        if isinstance(server, basestring):
            self._rpcpath = rpcpath = server + '/xmlrpc/'
            proxy = ServerProxy(rpcpath + endpoint, allow_none=True)
            self._dispatch = proxy._ServerProxy__request
            if hasattr(proxy._ServerProxy__transport, 'close'):   # >= 2.7
                self.close = proxy._ServerProxy__transport.close
        else:
            self._rpcpath = ''
            proxy = server.netsvc.ExportService.getService(endpoint)
            self._dispatch = proxy.dispatch
        self._endpoint = endpoint
        self._methods = methods
        self._verbose = verbose

    def __repr__(self):
        return "<Service '%s%s'>" % (self._rpcpath, self._endpoint)
    __str__ = __repr__

    def __dir__(self):
        return sorted(self._methods)

    def __getattr__(self, name):
        # Read '_methods' from the instance dict: if it is not set yet
        # (for example when __init__ failed), a plain attribute access
        # would call __getattr__ again and recurse without end.
        if name not in self.__dict__.get('_methods', ()):
            raise AttributeError("'Service' object has no attribute %r" % name)
        if self._verbose:
            def sanitize(args):
                # Hide the third argument, except for the 'db' endpoint
                # (for authenticated calls it is the password)
                if self._endpoint != 'db' and len(args) > 2:
                    args = list(args)
                    args[2] = '*'
                return args
            maxcol = MAXCOL[min(len(MAXCOL), self._verbose) - 1]

            def wrapper(self, *args):
                snt = ', '.join([repr(arg) for arg in sanitize(args)])
                snt = '%s.%s(%s)' % (self._endpoint, name, snt)
                if len(snt) > maxcol:
                    suffix = '... L=%s' % len(snt)
                    snt = snt[:maxcol - len(suffix)] + suffix
                print('--> ' + snt)
                res = self._dispatch(name, args)
                rcv = str(res)
                if len(rcv) > maxcol:
                    suffix = '... L=%s' % len(rcv)
                    rcv = rcv[:maxcol - len(suffix)] + suffix
                print('<-- ' + rcv)
                return res
        else:
            wrapper = lambda s, *args: s._dispatch(name, args)
        wrapper.__name__ = name
        # Return a method bound to this instance
        return wrapper.__get__(self, type(self))

    def __del__(self):
        if hasattr(self, 'close'):
            self.close()
class Client(object):
    """Connection to an OpenERP instance.

    This is the top level object.
    The `server` is the URL of the instance, like ``http://localhost:8069``.
    If `server` is an ``openerp`` module, it is used to connect to the local
    server (>= 6.1).

    The `db` is the name of the database and the `user` should exist in the
    table ``res.users``.  If the `password` is not provided, it will be
    asked on login.
    """
    # Path of the configuration file read by read_config()
    _config_file = os.path.join(os.path.curdir, CONF_FILE)

    def __init__(self, server, db=None, user=None, password=None,
                 verbose=False):
        if isinstance(server, basestring) and server[-1:] == '/':
            server = server.rstrip('/')
        self._server = server
        major_version = None

        def get_proxy(name):
            if major_version in ('5.0', None) or name == 'wizard':
                methods = _methods[name]
            else:
                # Only for OpenERP >= 6
                methods = _methods[name] + _methods_6_1[name]
            return Service(server, name, methods, verbose=verbose)
        self.server_version = ver = get_proxy('db').server_version()
        self.major_version = major_version = '.'.join(ver.split('.', 2)[:2])
        # Create the XML-RPC proxies
        self.db = get_proxy('db')
        self.common = get_proxy('common')
        self._object = get_proxy('object')
        self._report = get_proxy('report')
        # The 'wizard' endpoint exists before version 7 only.  The leading
        # number is compared as an integer, because with a comparison of
        # strings '10.0' is lower than '7.0'.
        leading_number = re.match(r'\d+', ver)
        if leading_number:
            has_wizard = int(leading_number.group()) < 7
        else:
            has_wizard = major_version < '7.0'
        self._wizard = get_proxy('wizard') if has_wizard else None
        self.reset()
        if db:
            # Try to login
            self._login(user, password=password, database=db)

    @classmethod
    def from_config(cls, environment, verbose=False):
        """Create a connection to a defined environment.

        Read the settings from the section ``[environment]`` in the
        ``erppeek.ini`` file and return a connected :class:`Client`.
        See :func:`read_config` for details of the configuration file format.
        """
        (server, db, user, password) = read_config(environment)
        if server[0] == 'local':
            # Strip the trailing 'c' or 'o' of a compiled file name
            appname = os.path.basename(__file__).rstrip('co')
            server = start_openerp_services(server[1], appname=appname)
        client = cls(server, db, user, password, verbose=verbose)
        client._environment = environment
        return client

    def reset(self):
        """Forget the user, the database and the cache of models."""
        self.user = self._environment = None
        self._db, self._models = (), {}
        self._execute = self._exec_workflow = None

    def __repr__(self):
        return "<Client '%s#%s'>" % (self._server or '', self._db)

    def login(self, user, password=None, database=None):
        """Switch `user` and (optionally) `database`.

        If the `password` is not available, it will be asked.

        Return the ``uid`` of the user.  If the login fails, an error
        is printed and the return value is None.
        """
        if database:
            dbs = self.db.list()
            if database not in dbs:
                print("Error: Database '%s' does not exist: %s" %
                      (database, dbs))
                return
        elif self._db:
            database = self._db
        else:
            print('Error: Not connected')
            return
        # Used for logging, copied from openerp.sql_db.db_connect
        current_thread().dbname = database
        (uid, password) = self._auth(database, user, password)
        if not uid:
            if not self._db:
                self._db = database
            print('Error: Invalid username or password')
            return
        if self._db != database:
            self.reset()
            self._db = database
        self.user = user

        # Authenticated endpoints
        def authenticated(method):
            return functools.partial(method, self._db, uid, password)
        self._execute = authenticated(self._object.execute)
        self._exec_workflow = authenticated(self._object.exec_workflow)
        self.report = authenticated(self._report.report)
        self.report_get = authenticated(self._report.report_get)
        if self.major_version != '5.0':
            # Only for OpenERP >= 6
            self.execute_kw = authenticated(self._object.execute_kw)
            self.render_report = authenticated(self._report.render_report)
        if self._wizard:
            self._wizard_execute = authenticated(self._wizard.execute)
            self._wizard_create = authenticated(self._wizard.create)
        return uid

    # Needed for interactive use
    connect = None
    _login = login
    # Cache of (uid, password), the keys are (server, database, user)
    _login.cache = {}

    def _check_valid(self, database, uid, password):
        """Return True if the server accepts these credentials."""
        execute = self._object.execute
        try:
            execute(database, uid, password, 'res.users', 'fields_get_keys')
            return True
        except Fault:
            return False

    def _auth(self, database, user, password):
        """Authenticate the `user`, return a tuple ``(uid, password)``.

        The ``uid`` is false if the authentication failed.
        """
        assert database
        cache_key = (self._server, database, user)
        if password:
            # If password is explicit, call the 'login' method
            uid = None
        else:
            # Read from cache
            uid, password = self._login.cache.get(cache_key) or (None, None)
            # Read from table 'res.users'
            if ((not uid and self._db == database and
                 self.access('res.users', 'write'))):
                obj = self.read('res.users',
                                [('login', '=', user)], 'id password')
                if obj:
                    uid = obj[0]['id']
                    password = obj[0]['password']
                else:
                    # Invalid user
                    uid = False
            # Ask for password
            if not password and uid is not False:
                from getpass import getpass
                password = getpass('Password for %r: ' % user)
        if uid:
            # Check if password changed
            if not self._check_valid(database, uid, password):
                if cache_key in self._login.cache:
                    del self._login.cache[cache_key]
                uid = False
        elif uid is None:
            # Do a standard 'login'
            uid = self.common.login(database, user, password)
        if uid:
            # Update the cache
            self._login.cache[cache_key] = (uid, password)
        return (uid, password)

    @classmethod
    def _set_interactive(cls, global_vars={}):
        """Install the hooks for interactive use, return `global_vars`.

        The methods ``login`` and ``connect`` of the class are replaced
        with versions which refresh the names stored in `global_vars`.
        """
        # Don't call multiple times
        del Client._set_interactive

        for name in ['__name__', '__doc__'] + __all__:
            global_vars[name] = globals()[name]

        def get_pool(db_name=None):
            """Return a model registry.

            Use get_pool(db_name).db.cursor() to grab a cursor.
            """
            client = global_vars['client']
            registry = client._server.modules.registry
            return registry.RegistryManager.get(db_name or client._db)

        def connect(self, env=None):
            """Connect to another environment and replace the globals()."""
            if env:
                # Safety measure: turn down the previous connection
                global_vars['client'].reset()
                client = self.from_config(env, verbose=self.db._verbose)
            else:
                client = self
                env = self._environment or self._db
            global_vars['client'] = client
            if hasattr(client._server, 'modules'):
                global_vars['get_pool'] = get_pool
            # Tweak prompt
            sys.ps1 = '%s >>> ' % (env,)
            sys.ps2 = '%s ... ' % (env,)
            # Logged in?
            if client.user:
                global_vars['model'] = client.model
                global_vars['models'] = client.models
                global_vars['do'] = client.execute
                print('Logged in as %r' % (client.user,))
            else:
                global_vars.update({'model': None, 'models': None, 'do': None})

        def login(self, user, password=None, database=None):
            """Switch `user` and (optionally) `database`."""
            if self._login(user, password=password, database=database):
                # Register the new globals()
                self.connect()

        # Set hooks to recreate the globals()
        cls.login = login
        cls.connect = connect

        return global_vars

    def create_database(self, passwd, database, demo=False, lang='en_US',
                        user_password='admin'):
        """Create a new database.

        The superadmin `passwd` and the `database` name are mandatory.
        By default, `demo` data are not loaded and `lang` is ``en_US``.
        Wait for the thread to finish and login if successful.
        """
        thread_id = self.db.create(passwd, database, demo, lang, user_password)
        progress = 0
        try:
            while progress < 1:
                time.sleep(3)
                progress, users = self.db.get_progress(passwd, thread_id)
            # [1.0, [{'login': 'admin', 'password': 'admin',
            #         'name': 'Administrator'}]]
            self.login(users[0]['login'], users[0]['password'],
                       database=database)
        except KeyboardInterrupt:
            print({'id': thread_id, 'progress': progress})

    def execute(self, obj, method, *params, **kwargs):
        """Wrapper around ``object.execute`` RPC method.

        Argument `method` is the name of an ``osv.osv`` method or
        a method available on this `obj`.
        Method `params` are allowed.  If needed, keyword
        arguments are collected in `kwargs`.
        """
        assert self.user, 'Not connected'
        assert isinstance(obj, basestring)
        assert isinstance(method, basestring) and method != 'browse'
        context = kwargs.pop('context', None)
        ordered = single_id = False
        if method in ('read', 'name_get'):
            assert params
            if issearchdomain(params[0]):
                # Combine search+read
                search_params = searchargs(params[:1], kwargs, context)
                ordered = len(search_params) > 3 and search_params[3]
                ids = self._execute(obj, 'search', *search_params)
            elif isinstance(params[0], list):
                ordered = kwargs.pop('order', False) and params[0]
                ids = set(params[0])
                ids.discard(False)
                if not ids and ordered:
                    return [False] * len(ordered)
                ids = sorted(ids)
            else:
                single_id = True
                ids = [params[0]] if params[0] else False
            if not ids:
                return ids
            if len(params) > 1:
                params = (ids,) + params[1:]
            elif method == 'read':
                params = (ids, kwargs.pop('fields', None))
            else:
                params = (ids,)
        elif method == 'search':
            # Accept keyword arguments for the search method
            params = searchargs(params, kwargs, context)
            context = None
        elif method == 'search_count':
            params = searchargs(params)
        elif method == 'perm_read':
            # broken with a single id (verified with 5.0 and 6.1)
            if params and isinstance(params[0], int_types):
                params = ([params[0]],) + params[1:]
        if context:
            params = params + (context,)
        # Ignore extra keyword arguments
        for item in kwargs.items():
            print('Ignoring: %s = %r' % item)
        res = self._execute(obj, method, *params)
        if ordered:
            # The results are not in the same order as the ids
            # when received from the server
            resdic = dict([(val['id'], val) for val in res])
            if not isinstance(ordered, list):
                ordered = ids
            res = [resdic.get(id_, False) for id_ in ordered]
        return res[0] if single_id else res

    def exec_workflow(self, obj, signal, obj_id):
        """Wrapper around ``object.exec_workflow`` RPC method.

        Argument `obj` is the name of the model.  The `signal`
        is sent to the object identified by its integer ``id`` `obj_id`.
        """
        assert self.user, 'Not connected'
        assert isinstance(obj, basestring) and isinstance(signal, basestring)
        return self._exec_workflow(obj, signal, obj_id)

    def wizard(self, name, datas=None, action='init', context=None):
        """Wrapper around ``wizard.create`` and ``wizard.execute``
        RPC methods.

        If only `name` is provided, a new wizard is created and its ``id`` is
        returned.  If `action` is not ``"init"``, then the action is executed.
        In this case the `name` is either an ``id`` or a string.
        If the `name` is a string, the wizard is created before the execution.
        The optional `datas` argument provides data for the action.
        The optional `context` argument is passed to the RPC method.

        Removed in OpenERP 7.
        """
        if isinstance(name, int_types):
            wiz_id = name
        else:
            wiz_id = self._wizard_create(name)
        if datas is None:
            if action == 'init' and name != wiz_id:
                return wiz_id
            datas = {}
        return self._wizard_execute(wiz_id, datas, action, context)

    def _upgrade(self, modules, button):
        """Click the `button` on the `modules`, then apply the changes."""
        # First, update the list of modules
        updated, added = self.execute('ir.module.module', 'update_list')
        if added:
            print('%s module(s) added to the list' % added)
        # Find modules
        ids = modules and self.search('ir.module.module',
                                      [('name', 'in', modules)])
        if ids:
            # Click upgrade/install/uninstall button
            self.execute('ir.module.module', button, ids)
        mods = self.read('ir.module.module',
                         [('state', 'not in', STABLE_STATES)], 'name state')
        if not mods:
            if ids:
                print('Already up-to-date: %s' %
                      self.modules([('id', 'in', ids)]))
            elif modules:
                print('Module(s) not found: %s' % ', '.join(modules))
            else:
                print('%s module(s) updated' % updated)
            return
        print('%s module(s) selected' % len(ids))
        print('%s module(s) to process:' % len(mods))
        for mod in mods:
            print(' %(state)s\t%(name)s' % mod)

        # Empty the models' cache
        self._models.clear()

        # Apply scheduled upgrades
        if self.major_version == '5.0':
            # Wizard "Apply Scheduled Upgrades"
            rv = self.wizard('module.upgrade', action='start')
            if 'config' not in [state[0] for state in rv.get('state', ())]:
                # Something bad happened
                return rv
        else:
            self.execute('base.module.upgrade', 'upgrade_module', [])

    def upgrade(self, *modules):
        """Press the button ``Upgrade``."""
        return self._upgrade(modules, button='button_upgrade')

    def install(self, *modules):
        """Press the button ``Install``."""
        return self._upgrade(modules, button='button_install')

    def uninstall(self, *modules):
        """Press the button ``Uninstall``."""
        return self._upgrade(modules, button='button_uninstall')

    def search(self, obj, *params, **kwargs):
        """Filter the records in the `domain`, return the ``ids``."""
        return self.execute(obj, 'search', *params, **kwargs)

    def count(self, obj, domain=None):
        """Count the records in the `domain`."""
        return self.execute(obj, 'search_count', domain or [])

    def read(self, obj, *params, **kwargs):
        """Wrapper for ``client.execute(obj, 'read', [...], ('a', 'b'))``.

        The first argument `obj` is the model name (example: ``"res.partner"``)

        The second argument, `domain`, accepts:
         - ``[('name', '=', 'mushroom'), ('state', '!=', 'draft')]``
         - ``['name = mushroom', 'state != draft']``
         - ``[]``
         - a list of ids ``[1, 2, 3]`` or a single id ``42``

        The third argument, `fields`, accepts:
         - a single field: ``'first_name'``
         - a tuple of fields: ``('street', 'city')``
         - a space separated string: ``'street city'``
         - a format spec: ``'%(street)s %(city)s'``

        If `fields` is omitted, all fields are read.

        If `domain` is a single id, then:
         - return a single value if a single field is requested.
         - return a string if a format spec is passed in the `fields` argument.
         - else, return a dictionary.

        If `domain` is not a single id, the returned value is a list of items.
        Each item complies with the rules of the previous paragraph.

        The optional keyword arguments `offset`, `limit` and `order` are
        used to restrict the search.  The `order` is also used to order the
        results returned.  Note: the low-level RPC method ``read`` itself does
        not preserve the order of the results.
        """
        fmt = None
        if len(params) > 1 and isinstance(params[1], basestring):
            fmt = ('%(' in params[1]) and params[1]
            if fmt:
                fields = _fields_re.findall(fmt)
            else:
                # transform: "zip city" --> ("zip", "city")
                fields = params[1].split()
                if len(fields) == 1:
                    fmt = ()    # marker
            params = (params[0], fields) + params[2:]
        res = self.execute(obj, 'read', *params, **kwargs)
        if not res:
            return res
        if fmt:
            if isinstance(res, list):
                return [(d and fmt % d) for d in res]
            return fmt % res
        if fmt == ():
            if isinstance(res, list):
                return [(d and d[fields[0]]) for d in res]
            return res[fields[0]]
        return res

    def _models_get(self, name):
        """Return the cached Model `name`, create it if needed."""
        try:
            return self._models[name]
        except KeyError:
            self._models[name] = m = Model._new(self, name)
        return m

    def models(self, name=''):
        """Return a dictionary of models.

        The argument `name` is a pattern to filter the models returned.
        If omitted, all models are returned.
        Keys are camel case names of the models.
        Values are instances of :class:`Model`.

        The return value can be used to declare the models in the global
        namespace:

        >>> globals().update(client.models('res.'))
        """
        domain = [('model', 'like', name)]
        models = self.execute('ir.model', 'read', domain, ('model',))
        names = [m['model'] for m in models]
        return dict([(mixedcase(mod), self._models_get(mod)) for mod in names])

    def model(self, name, check=True):
        """Return a :class:`Model` instance.

        The argument `name` is the name of the model.  If the optional
        argument `check` is :const:`False`, no validity check is done.

        If the model is not found, an error is printed and the return
        value is None.
        """
        try:
            return self._models[name] if check else self._models_get(name)
        except KeyError:
            models = self.models(name)
        if name in self._models:
            return self._models[name]
        if models:
            errmsg = 'Model not found. These models exist:'
        else:
            errmsg = 'Model not found: %s' % (name,)
        print('\n * '.join([errmsg] + [str(m) for m in models.values()]))

    def modules(self, name='', installed=None):
        """Return a dictionary of modules.

        The optional argument `name` is a pattern to filter the modules.
        If the boolean argument `installed` is :const:`True`, the modules
        which are "Not Installed" or "Not Installable" are omitted.  If
        the argument is :const:`False`, only these modules are returned.
        If argument `installed` is omitted, all modules are returned.
        The return value is a dictionary where module names are grouped in
        lists according to their ``state``.

        If no module matches, the return value is None.
        """
        if isinstance(name, basestring):
            domain = [('name', 'like', name)]
        elif isinstance(name, list):
            # Copy the domain: the list of the caller is not modified
            domain = list(name)
        else:
            domain = name
        if installed is not None:
            op = 'not in' if installed else 'in'
            domain.append(('state', op, ['uninstalled', 'uninstallable']))
        mods = self.read('ir.module.module', domain, 'name state')
        if mods:
            res = collections.defaultdict(list)
            for mod in mods:
                res[mod['state']].append(mod['name'])
            return dict(res)

    def keys(self, obj):
        """Wrapper for :meth:`Model.keys` method."""
        return self.model(obj).keys()

    def fields(self, obj, names=None):
        """Wrapper for :meth:`Model.fields` method."""
        return self.model(obj).fields(names=names)

    def field(self, obj, name):
        """Wrapper for :meth:`Model.field` method."""
        return self.model(obj).field(name)

    def access(self, obj, mode='read'):
        """Wrapper for :meth:`Model.access` method."""
        try:
            self._execute('ir.model.access', 'check', obj, mode)
            return True
        except (TypeError, Fault):
            # TypeError: '_execute' is None when not logged in
            return False

    def __getattr__(self, method):
        if not method.islower():
            # A name like 'ResPartner' gives the model 'res.partner'
            rv = self.model(lowercase(method))
            self.__dict__[method] = rv
            return rv
        if method.startswith('_'):
            errmsg = "'Client' object has no attribute %r" % method
            raise AttributeError(errmsg)

        # miscellaneous object methods
        def wrapper(self, obj, *params, **kwargs):
            """Wrapper for client.execute(obj, %r, *params, **kwargs)."""
            return self.execute(obj, method, *params, **kwargs)
        wrapper.__name__ = method
        wrapper.__doc__ %= method
        return wrapper.__get__(self, type(self))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.reset()
[docs]class Model(object): """The class for OpenERP models.""" def __new__(cls, client, name): return client.model(name) @classmethod def _new(cls, client, name): m = object.__new__(cls) (m.client, m._name) = (client, name) m._execute = functools.partial(client.execute, name) m.search = functools.partial(client.search, name) m.count = functools.partial(client.count, name) m.read = functools.partial(client.read, name) return m def __repr__(self): return "<Model '%s'>" % (self._name,) def _get_keys(self): obj_keys = self._execute('fields_get_keys') obj_keys.sort() return obj_keys def _get_fields(self): return self._execute('fields_get')
[docs] def keys(self): """Return the keys of the model.""" return self._keys
[docs] def fields(self, names=None): """Return a dictionary of the fields of the model. Optional argument `names` is a sequence of field names or a space separated string of these names. If omitted, all fields are returned. """ if names is None: return self._fields if isinstance(names, basestring): names = names.split() return dict([(k, v) for (k, v) in self._fields.items() if k in names])
[docs] def field(self, name): """Return the field properties for field `name`.""" return self._fields[name]
[docs] def access(self, mode="read"): """Check if the user has access to this model. Optional argument `mode` is the access mode to check. Valid values are ``read``, ``write``, ``create`` and ``unlink``. If omitted, the ``read`` mode is checked. Return a boolean. """ return self.client.access(self._name, mode)
[docs] def browse(self, domain, *params, **kwargs): """Return a :class:`Record` or a :class:`RecordList`. The argument `domain` accepts a single integer ``id``, a list of ids or a search domain. If it is a single integer, the return value is a :class:`Record`. Otherwise, the return value is a :class:`RecordList`. """ context = kwargs.pop('context', None) if isinstance(domain, int_types): assert not params and not kwargs return Record(self, domain, context=context) if issearchdomain(domain): params = searchargs((domain,) + params, kwargs, context) domain = self._execute('search', *params) # Ignore extra keyword arguments for item in kwargs.items(): print('Ignoring: %s = %r' % item) else: assert not params and not kwargs return RecordList(self, domain, context=context)
[docs] def get(self, domain, context=None): """Return a single :class:`Record`. The argument `domain` accepts a single integer ``id`` or a search domain, or an ``xml_id``. The return value is a :class:`Record` or None. If multiple records are found, a ``ValueError`` is raised. """ if isinstance(domain, int_types): # a single id return Record(self, domain, context=context) if isinstance(domain, basestring): # lookup the xml_id (module, name) = domain.split('.') data = self._imd_read( [('module', '=', module), ('name', '=', name)], 'model res_id') assert not data or data[0]['model'] == self._name ids = [res['res_id'] for res in data] else: # a search domain assert issearchdomain(domain) params = searchargs((domain,), {}, context) ids = self._execute('search', *params) if len(ids) > 1: raise ValueError('domain matches too many records (%d)' % len(ids)) return Record(self, ids[0], context=context) if ids else None
[docs] def create(self, values, context=None): """Create a :class:`Record`. The argument `values` is a dictionary of values which are used to create the record. The newly created :class:`Record` is returned. """ values = self._unbrowse_values(values) new_id = self._execute('create', values, context=context) return Record(self, new_id, context=context)
def _browse_values(self, values, context=None):
    """Wrap the values of a Record.

    The argument `values` is a dictionary of values read from a Record.
    When the field type is relational (many2one, one2many, many2many or
    reference), the value is wrapped in a Record or a RecordList which
    carries the given `context`.

    The dictionary is updated in place and returned, so it keeps the
    same keys as the `values` argument.
    """
    # Iterate over a snapshot: entries are replaced while looping.
    for (key, value) in list(values.items()):
        # 'id' is not a field; objects with an 'id' are already wrapped.
        if key == 'id' or hasattr(value, 'id'):
            continue
        field = self._fields[key]
        field_type = field['type']
        if field_type == 'many2one':
            if value:
                rel_model = self.client.model(field['relation'], False)
                values[key] = Record(rel_model, value, context=context)
        elif field_type in ('one2many', 'many2many'):
            rel_model = self.client.model(field['relation'], False)
            values[key] = RecordList(rel_model, value, context=context)
        elif value and field_type == 'reference':
            # Reference values are stored as the string "model,id".
            (res_model, res_id) = value.split(',')
            rel_model = self.client.model(res_model, False)
            # Propagate the context like the other relational branches.
            values[key] = Record(rel_model, int(res_id), context=context)
    return values


def _unbrowse_values(self, values):
    """Unwrap the id of Record and RecordList.

    Return a copy of `values` which can be sent to the server:

    * an object with an ``id`` attribute is replaced by that id, or by
      the string ``"model,id"`` when the field is a ``reference``;
    * for one2many / many2many fields, an empty value or a list of
      integer ids is converted to a single ``(6, 0, ids)`` command.

    The `values` argument itself is left untouched.
    """
    new_values = values.copy()
    for (key, value) in values.items():
        field_type = self._fields[key]['type']
        if hasattr(value, 'id'):
            if field_type == 'reference':
                new_values[key] = '%s,%s' % (value._model_name, value.id)
            else:
                # Keep the bare id(s) in `value` for the x2many check below.
                new_values[key] = value = value.id
        if field_type in ('one2many', 'many2many'):
            if not value:
                new_values[key] = [(6, 0, [])]
            elif isinstance(value[0], int_types):
                new_values[key] = [(6, 0, value)]
            # Otherwise the value is left as provided.
    return new_values
[docs] def _get_external_ids(self, ids=None): """Retrieve the External IDs of the records. Return a dictionary with keys being the fully qualified External IDs, and values the ``Record`` entries. """ search_domain = [('model', '=', self._name)] if ids is not None: search_domain.append(('res_id', 'in', ids)) existing = self._imd_read(search_domain, ['module', 'name', 'res_id']) res = {} for rec in existing: res['%(module)s.%(name)s' % rec] = self.get(rec['res_id']) return res
def __getattr__(self, attr):
    """Resolve missing attributes lazily and cache them on the instance.

    * ``_keys`` / ``_fields``: computed by ``_get_keys`` / ``_get_fields``.
    * ``_imd_<method>``: the ``<method>`` attribute of the
      ``ir.model.data`` model.
    * any other private name: ``AttributeError``.
    * any public name: a bound wrapper which calls
      ``self._execute(attr, ...)``.

    Every resolved value is stored in the instance ``__dict__``, so this
    hook runs only once per attribute name.
    """
    cache = self.__dict__

    if attr in ('_keys', '_fields'):
        getter = getattr(self, '_get' + attr)
        value = getter()
        cache[attr] = value
        return value

    if attr.startswith('_imd_'):
        imd_model = self.client.model('ir.model.data')
        imd_method = getattr(imd_model, attr[5:])
        cache[attr] = imd_method
        return imd_method

    if attr.startswith('_'):
        raise AttributeError("'Model' object has no attribute %r" % attr)

    def wrapper(self, *params, **kwargs):
        """Wrapper for client.execute(%r, %r, *params, **kwargs)."""
        return self._execute(attr, *params, **kwargs)

    wrapper.__name__ = attr
    wrapper.__doc__ = wrapper.__doc__ % (self._name, attr)
    # Bind the function to this instance, like a regular method.
    bound = wrapper.__get__(self, type(self))
    cache[attr] = bound
    return bound
class RecordList(object):
    """A sequence of OpenERP :class:`Record`.

    It has a similar API as the :class:`Record` class, but for a list of
    records.  The attributes of the ``RecordList`` are read-only, and they
    return list of attribute values in the same order.  The ``many2one``,
    ``one2many`` and ``many2many`` attributes are wrapped in ``RecordList``
    and list of ``RecordList`` objects.  Use the method ``RecordList.write``
    to assign a single value to all the selected records.
    """

    def __init__(self, res_model, ids, context=None):
        """Build the list from `ids`.

        Each item of `ids` is either an integer id or an ``(id, name)``
        pair.  The original items are kept in ``_idnames`` and the bare
        ids in ``id``.  The `ids` argument is not modified, so any
        iterable is accepted.
        """
        idnames = list(ids)
        plain_ids = []
        for id_ in idnames:
            if isinstance(id_, (list, tuple)):
                id_ = id_[0]
            assert isinstance(id_, int_types), repr(id_)
            plain_ids.append(id_)
        # Bypass the __setattr__ method
        self.__dict__.update({
            'id': plain_ids,
            '_model_name': res_model._name,
            '_model': res_model,
            '_idnames': idnames,
            '_context': context,
            '_execute': res_model._execute,
        })

    def __repr__(self):
        # Long lists are summarized by their length.
        if len(self.id) > 16:
            ids = 'length=%d' % len(self.id)
        else:
            ids = self.id
        return "<RecordList '%s,%s'>" % (self._model_name, ids)

    def __dir__(self):
        return ['__getitem__', 'read', 'write', 'unlink', '_context',
                '_idnames', '_model', '_model_name',
                '_external_id'] + self._model._keys

    def __len__(self):
        return len(self.id)

    def read(self, fields=None, context=None):
        """Wrapper for :meth:`Record.read` method.

        The context of the ``RecordList`` is used when `context` is not
        given.  Relational values are wrapped in ``Record`` / ``RecordList``
        objects which carry the same context.  An empty ``RecordList``
        does not query the server.
        """
        if context is None and self._context:
            context = self._context

        client = self._model.client
        if self.id:
            values = client.read(self._model_name, self.id, fields,
                                 order=True, context=context)
            if is_list_of_dict(values):
                browse_values = self._model._browse_values
                # Falsy entries are returned unchanged.
                return [v and browse_values(v, context=context)
                        for v in values]
        else:
            values = []

        if isinstance(fields, basestring):
            # A single field name was requested: wrap relational values.
            field = self._model._fields.get(fields)
            if field:
                if field['type'] == 'many2one':
                    rel_model = client.model(field['relation'], False)
                    return RecordList(rel_model, values, context=context)
                if field['type'] in ('one2many', 'many2many'):
                    rel_model = client.model(field['relation'], False)
                    return [RecordList(rel_model, v, context=context)
                            for v in values]
                if field['type'] == 'reference':
                    records = []
                    for value in values:
                        if value:
                            # Reference values are stored as "model,id".
                            (res_model, res_id) = value.split(',')
                            rel_model = client.model(res_model, False)
                            value = Record(rel_model, int(res_id),
                                           context=context)
                        records.append(value)
                    return records
        return values

    def write(self, values, context=None):
        """Write the `values` in the :class:`RecordList`.

        Return True without calling the server if the list is empty.
        """
        if not self.id:
            return True
        if context is None and self._context:
            context = self._context
        values = self._model._unbrowse_values(values)
        rv = self._execute('write', self.id, values, context=context)
        return rv

    @property
    def _external_id(self):
        """Retrieve the External IDs of the :class:`RecordList`.

        Return the fully qualified External IDs with default value False
        if there's none.  If multiple IDs exist for a record, only one
        of them is returned (randomly).
        """
        external_ids = self._model._get_external_ids(self.id)
        xml_ids = dict((r.id, xml_id)
                       for (xml_id, r) in external_ids.items())
        return [xml_ids.get(res_id, False) for res_id in self.id]

    def __getitem__(self, key):
        """Return a ``Record`` for an index, or a ``RecordList`` for a slice.

        An entry which is False is returned as False.
        """
        idname = self._idnames[key]
        if idname is False:
            return False
        cls = RecordList if isinstance(key, slice) else Record
        return cls(self._model, idname, context=self._context)

    def __getattr__(self, attr):
        """Read a field for all records, or build a remote method wrapper.

        A field name returns the list of values.  Any other public name
        returns a bound wrapper, cached on the instance, which executes
        the method on the server for all the ids.
        """
        context = self._context
        if attr in self._model._keys:
            return self.read(attr, context=context)
        if attr.startswith('_'):
            errmsg = "'RecordList' object has no attribute %r" % attr
            raise AttributeError(errmsg)

        def wrapper(self, *params, **kwargs):
            """Wrapper for client.execute(%r, %r, [...], *params, **kwargs)."""
            if context:
                kwargs.setdefault('context', context)
            return self._execute(attr, self.id, *params, **kwargs)
        wrapper.__name__ = attr
        wrapper.__doc__ %= (self._model_name, attr)
        self.__dict__[attr] = mobj = wrapper.__get__(self, type(self))
        return mobj

    def __setattr__(self, attr, value):
        """Always raise ``AttributeError``: the attributes are read-only."""
        if attr in self._model._keys or attr == 'id':
            msg = "attribute %r is read-only; use 'RecordList.write' instead."
        else:
            msg = "has no attribute %r"
        raise AttributeError("'RecordList' object %s" % msg % attr)
class Record(object):
    """A class for all OpenERP records.

    It maps any OpenERP object.
    The fields can be accessed through attributes.  The changes are
    immediately sent to the server.
    The ``many2one``, ``one2many`` and ``many2many`` attributes are wrapped
    in ``Record`` and ``RecordList`` objects.  These attributes support
    writing too.
    The attributes are evaluated lazily, and they are cached in the record.
    The Record's cache is invalidated if any attribute is changed.

    Two records compare equal when they have the same id and the same
    model object; equal records have the same hash.
    """

    def __init__(self, res_model, res_id, context=None):
        """Build the record from an id, or from an ``(id, name)`` pair."""
        if isinstance(res_id, (list, tuple)):
            (res_id, res_name) = res_id
            self.__dict__['_name'] = res_name
        assert isinstance(res_id, int_types), repr(res_id)
        # Bypass the __setattr__ method
        self.__dict__.update({
            'id': res_id,
            '_model_name': res_model._name,
            '_model': res_model,
            '_context': context,
            '_cached_keys': set(),
            '_execute': res_model._execute,
        })

    def __repr__(self):
        return "<Record '%s,%d'>" % (self._model_name, self.id)

    def __str__(self):
        return self._name

    def _get_name(self):
        """Fetch the display name with ``name_get`` and cache it.

        Fall back to ``"model,id"`` if the call fails.
        """
        try:
            (id_name,) = self._execute('name_get', [self.id])
            name = '%s' % (id_name[1],)
        except Exception:
            name = '%s,%d' % (self._model_name, self.id)
        self.__dict__['_name'] = name
        return name

    @property
    def _keys(self):
        return self._model._keys

    @property
    def _fields(self):
        return self._model._fields

    def refresh(self):
        """Force refreshing the record's data."""
        # The id is never dropped from the instance.
        self._cached_keys.discard('id')
        for key in self._cached_keys:
            delattr(self, key)
        self._cached_keys.clear()

    def _update(self, values):
        """Wrap `values`, cache them on the instance and return them."""
        new_values = self._model._browse_values(values, context=self._context)
        self.__dict__.update(new_values)
        self._cached_keys.update(new_values)
        return new_values

    def read(self, fields=None, context=None):
        """Read the `fields` of the :class:`Record`.

        The argument `fields` accepts different kinds of values.
        See :meth:`Client.read` for details.
        """
        if context is None and self._context:
            context = self._context
        rv = self._model.read(self.id, fields, context=context)
        if isinstance(rv, dict):
            return self._update(rv)
        elif isinstance(fields, basestring) and '%(' not in fields:
            # A single field name (not a format string): cache its value.
            return self._update({fields: rv})[fields]
        return rv

    def perm_read(self, context=None):
        """Read the metadata of the :class:`Record`.

        Return a dictionary of values.
        See :meth:`Client.perm_read` for details.
        """
        rv = self._execute('perm_read', [self.id], context=context)
        return rv[0] if rv else None

    def write(self, values, context=None):
        """Write the `values` in the :class:`Record`.

        The cached attributes are discarded afterwards.
        """
        if context is None and self._context:
            context = self._context
        values = self._model._unbrowse_values(values)
        rv = self._execute('write', [self.id], values, context=context)
        self.refresh()
        return rv

    def copy(self, default=None, context=None):
        """Copy a record and return the new :class:`Record`.

        The optional argument `default` is a mapping which overrides some
        values of the new record.  The new record carries the context
        which was used for the copy.
        """
        if context is None and self._context:
            context = self._context
        if default:
            default = self._model._unbrowse_values(default)
        new_id = self._execute('copy', self.id, default, context=context)
        return Record(self._model, new_id, context=context)

    def _send(self, signal):
        """Trigger workflow `signal` for this :class:`Record`."""
        exec_workflow = self._model.client.exec_workflow
        rv = exec_workflow(self._model_name, signal, self.id)
        self.refresh()
        return rv

    @property
    def _external_id(self):
        """Retrieve the External ID of the :class:`Record`.

        Return the fully qualified External ID of the :class:`Record`,
        with default value False if there's none.  If multiple IDs exist,
        only one of them is returned (randomly).
        """
        xml_ids = self._model._get_external_ids([self.id])
        return list(xml_ids)[0] if xml_ids else False

    def _set_external_id(self, xml_id):
        """Set the External ID of this record.

        Raise ``ValueError`` if this record already has an External ID,
        or if `xml_id` is already used by another entry.
        """
        (mod, name) = xml_id.split('.')
        obj = self._model_name
        domain = ['|', '&', ('model', '=', obj), ('res_id', '=', self.id),
                  '&', ('module', '=', mod), ('name', '=', name)]
        if self._model._imd_search(domain):
            raise ValueError('ID %r collides with another entry' % xml_id)
        vals = {'model': obj, 'res_id': self.id, 'module': mod, 'name': name}
        self._model._imd_create(vals)

    def __dir__(self):
        return ['read', 'write', 'copy', 'unlink', '_send', 'refresh',
                '_context', '_model', '_model_name', '_name', '_external_id',
                '_keys', '_fields'] + self._model._keys

    def __getattr__(self, attr):
        """Read a field lazily, or build a remote method wrapper.

        A field name is read from the server.  ``_name`` is resolved by
        :meth:`_get_name`.  Any other public name returns a bound wrapper,
        cached on the instance, which executes the method on the server
        for this record and then discards the cached attributes.
        """
        context = self._context
        if attr in self._model._keys:
            return self.read(attr, context=context)
        if attr == '_name':
            return self._get_name()
        if attr.startswith('_'):
            raise AttributeError("'Record' object has no attribute %r" % attr)

        def wrapper(self, *params, **kwargs):
            """Wrapper for client.execute(%r, %r, %d, *params, **kwargs)."""
            if context:
                kwargs.setdefault('context', context)
            res = self._execute(attr, [self.id], *params, **kwargs)
            self.refresh()
            # Unwrap the result when it is a list of a single item.
            if isinstance(res, list) and len(res) == 1:
                return res[0]
            return res
        wrapper.__name__ = attr
        wrapper.__doc__ %= (self._model_name, attr, self.id)
        self.__dict__[attr] = mobj = wrapper.__get__(self, type(self))
        return mobj

    def __setattr__(self, attr, value):
        """Write a single field on the server.

        Assigning ``_external_id`` sets the External ID.  Raise
        ``AttributeError`` for ``id`` (read-only) and for unknown names.
        """
        if attr == '_external_id':
            return self._set_external_id(value)
        # Check 'id' first: it is read-only whether or not it is a key.
        if attr == 'id':
            raise AttributeError("'Record' object attribute 'id' is read-only")
        if attr not in self._model._keys:
            raise AttributeError("'Record' object has no attribute %r" % attr)
        self.write({attr: value})

    def __eq__(self, other):
        return (isinstance(other, Record) and
                self.id == other.id and self._model is other._model)

    def __ne__(self, other):
        # Python 2 does not derive '!=' from '__eq__'.
        return not self.__eq__(other)

    def __hash__(self):
        # Consistent with __eq__: same model object implies same model name.
        return hash((self._model_name, self.id))
# NOTE(review): the layout below is reconstructed from a flattened listing;
# confirm the nesting against the upstream module.
def _interact(global_vars, use_pprint=True, usage=USAGE):
    """Run an interactive console which executes code in `global_vars`.

    Before the console starts:

    * if `use_pprint` is true, ``sys.displayhook`` is replaced by a hook
      which pretty-prints the value and stores it in ``builtins._``;
    * a ``usage`` object is added to the builtins; both calling it and its
      ``repr`` give the `usage` text;
    * readline completion and the history file ``HIST_FILE`` are set up,
      on a best-effort basis.
    """
    import code
    try:
        import builtins
        _exec = getattr(builtins, 'exec')
    except ImportError:
        # Python 2: 'exec' is a statement, hidden here inside a string
        def _exec(code, g):
            exec('exec code in g')
        import __builtin__ as builtins

    if use_pprint:
        def displayhook(value, _printer=pprint, _builtins=builtins):
            # Pretty-format the output
            if value is None:
                return
            _printer(value)
            _builtins._ = value
        sys.displayhook = displayhook

    class Usage(object):
        def __call__(self):
            print(usage)
        __repr__ = lambda s: usage
    builtins.usage = Usage()

    try:
        import readline as rl
        import rlcompleter
        rl.parse_and_bind('tab: complete')
        # IOError if file missing, or broken Apple readline
        rl.read_history_file(HIST_FILE)
    except Exception:
        # Any failure above also skips the history setup in the 'else' part
        pass
    else:
        if rl.get_history_length() < 0:
            rl.set_history_length(int(os.environ.get('HISTSIZE', 500)))
        # better append instead of replace?
        atexit.register(rl.write_history_file, HIST_FILE)

    class Console(code.InteractiveConsole):
        def runcode(self, code):
            # Execute in `global_vars`; only SystemExit leaves the console
            try:
                _exec(code, global_vars)
            except SystemExit:
                raise
            except:
                # Print readable 'Fault' errors
                # Work around http://bugs.python.org/issue12643
                (exc_type, exc, tb) = sys.exc_info()
                msg = ''.join(format_exception(exc_type, exc, tb, chain=False))
                print(msg.strip())

    warnings.simplefilter('always', UserWarning)
    sys.exc_clear() if hasattr(sys, 'exc_clear') else None  # Python 2.x
    # Key UP to avoid an empty line
    Console().interact('\033[A')


def main():
    """Command line entry point.

    Parse the options, then depending on them: list the sections of the
    configuration file and return, print the records of the model given
    with ``-m`` as CSV on the standard output, and/or enter the
    interactive console.
    """
    description = ('Inspect data on OpenERP objects. Use interactively '
                   'or query a model (-m) and pass search terms or '
                   'ids as positional parameters after the options.')
    parser = optparse.OptionParser(
        usage='%prog [options] [search_term_or_id [search_term_or_id ...]]',
        version=__version__,
        description=description)
    parser.add_option(
        '-l', '--list', action='store_true', dest='list_env',
        help='list sections of the configuration')
    parser.add_option(
        '--env',
        help='read connection settings from the given section')
    parser.add_option(
        '-c', '--config', default=CONF_FILE,
        help='specify alternate config file (default: %r)' % CONF_FILE)
    parser.add_option(
        '--server', default=DEFAULT_URL,
        help='full URL to the XML-RPC server (default: %s)' % DEFAULT_URL)
    parser.add_option('-d', '--db', default=DEFAULT_DB, help='database')
    parser.add_option('-u', '--user', default=DEFAULT_USER, help='username')
    parser.add_option(
        '-p', '--password', default=None,
        help='password, or it will be requested on login')
    parser.add_option(
        '-m', '--model',
        help='the type of object to find')
    parser.add_option(
        '-f', '--fields', action='append',
        help='restrict the output to certain fields (multiple allowed)')
    parser.add_option(
        '-i', '--interact', action='store_true',
        help='use interactively; default when no model is queried')
    parser.add_option(
        '-v', '--verbose', default=0, action='count',
        help='verbose')

    # The positional arguments are the search terms or ids
    (args, domain) = parser.parse_args()

    Client._config_file = os.path.join(os.path.curdir, args.config)
    if args.list_env:
        print('Available settings: ' + ' '.join(read_config()))
        return

    # Interactive mode is the default when no model is queried
    if (args.interact or not args.model):
        global_vars = Client._set_interactive()
        print(USAGE)

    if args.env:
        client = Client.from_config(args.env, verbose=args.verbose)
    else:
        client = Client(args.server, args.db, args.user, args.password,
                        verbose=args.verbose)

    if args.model and domain and client.user:
        # The language is taken from the LANG environment variable
        context = {'lang': (os.environ.get('LANG') or 'en_US').split('.')[0]}
        data = client.execute(args.model, 'read', domain, args.fields, context)
        if not args.fields:
            # No field requested: output 'id' then the keys of the first row
            args.fields = ['id']
            if data:
                args.fields.extend([fld for fld in data[0] if fld != 'id'])
        writer = _DictWriter(sys.stdout, args.fields, "", "ignore",
                             quoting=csv.QUOTE_NONNUMERIC)
        writer.writeheader()
        writer.writerows(data)

    # NOTE(review): presumably 'connect' is only set in interactive mode,
    # where 'global_vars' is defined -- confirm in Client._set_interactive
    if client.connect is not None:
        # Set the globals()
        client.connect()
        # Enter interactive mode
        _interact(global_vars)


if __name__ == '__main__':
    main()