#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" erppeek.py -- OpenERP command line tool
Author: Florent Xicluna
(derived from a script by Alan Bell)
"""
from __future__ import with_statement
import _ast
import atexit
import collections
import csv
import functools
import optparse
import os
from pprint import pprint
import re
import sys
import time
import traceback
import warnings
# Compatibility layer: bind the same names on Python 3 and on Python 2.
try:    # Python 3
    import configparser
    from threading import current_thread
    from xmlrpc.client import Fault, ServerProxy
    # 'basestring' is not a builtin here; the Python 2 branch below does not
    # assign it.
    basestring = str
    int_types = int
    _DictWriter = csv.DictWriter
except ImportError:     # Python 2
    import ConfigParser as configparser
    from threading import currentThread as current_thread
    from xmlrpclib import Fault, ServerProxy
    int_types = int, long

    class _DictWriter(csv.DictWriter):
        """Unicode CSV Writer, which encodes output to UTF-8."""

        def writeheader(self):
            # Method 'writeheader' does not exist in Python 2.6
            # The header row maps each field name to itself.
            header = dict(zip(self.fieldnames, self.fieldnames))
            self.writerow(header)

        def _dict_to_list(self, rowdict):
            # Encode every cell which has an 'encode' method; other cells
            # are passed through unchanged.
            rowlst = csv.DictWriter._dict_to_list(self, rowdict)
            return [cell.encode('utf-8') if hasattr(cell, 'encode') else cell
                    for cell in rowlst]
__version__ = '1.5.3'
# Public names of the module
__all__ = ['Client', 'Model', 'Record', 'RecordList', 'Service',
'format_exception', 'read_config', 'start_openerp_services']
# Name of the configuration file; Client._config_file looks for it in the
# current directory.
CONF_FILE = 'erppeek.ini'
# NOTE(review): HIST_FILE and the DEFAULT_* values are not referenced in the
# visible part of the file -- presumably used by the command line interface.
HIST_FILE = os.path.expanduser('~/.erppeek_history')
DEFAULT_URL = 'http://localhost:8069'
DEFAULT_DB = 'openerp'
DEFAULT_USER = 'admin'
MAXCOL = [79, 179, 9999] # Line length in verbose mode
USAGE = """\
Usage (some commands):
models(name) # List models matching pattern
model(name) # Return a Model instance
model(name).keys() # List field names of the model
model(name).fields(names=None) # Return details for the fields
model(name).field(name) # Return details for the field
model(name).browse(domain)
model(name).browse(domain, offset=0, limit=None, order=None)
# Return a RecordList
rec = model(name).get(domain) # Get the Record matching domain
rec.some_field # Return the value of this field
rec.read(fields=None) # Return values for the fields
client.login(user) # Login with another user
client.connect(env) # Connect to another env.
client.modules(name) # List modules matching pattern
client.upgrade(module1, module2, ...)
# Upgrade the modules
"""
# Client._upgrade() lists the modules whose state is NOT one of these
STABLE_STATES = ('uninstallable', 'uninstalled', 'installed')
# Single-character domain operators; searchargs() leaves these terms as is
DOMAIN_OPERATORS = frozenset('!|&')
# Supported operators are:
# =, !=, >, >=, <, <=, like, ilike, in, not like, not ilike, not in, child_of
# =like, =ilike (6.0), =? (6.0)
#
# Split a domain term written as a string into (field, operator, value).
# Raw strings are used because '\w', '\s' and '\?' are not valid escape
# sequences in a plain string literal.
_term_re = re.compile(
    r'([\w._]+)\s*'
    r'(=(?:like|ilike|\?)|[<>]=?|!?=(?!=)'
    r'|(?<= )(?:like|ilike|in|not like|not ilike|not in|child_of))'
    r'\s*(.*)')
# Extract the field names from a format spec such as '%(street)s %(city)s'
_fields_re = re.compile(r'(?:[^%]|^)%\(([^)]+)\)')
# Published object methods
# Keys are endpoint names; values are the method names which a Service
# exposes for this endpoint (see Client.__init__).
_methods = {
    'db': ['create', 'drop', 'dump', 'restore', 'rename',
           'get_progress', 'list', 'list_lang',
           'change_admin_password', 'server_version', 'migrate_databases'],
    'common': ['about', 'login', 'timezone_get', 'get_server_environment',
               'login_message', 'check_connectivity'],
    'object': ['execute', 'exec_workflow'],
    'report': ['report', 'report_get'],
    'wizard': ['execute', 'create'],
}
# Additional methods, appended by Client.__init__ when the server version is
# not '5.0'. There is no 'wizard' entry.
_methods_6_1 = {
    'db': ['create_database', 'db_exist'],
    'common': ['get_stats', 'list_http_services', 'version',
               'authenticate', 'get_os_time', 'get_sqlcount'],
    'object': ['execute_kw'],
    'report': ['render_report'],
}
# Hidden methods:
# - common: get_available_updates, get_migration_scripts, set_loglevel
# NOTE(review): not referenced in the visible part of the file -- presumably
# used when formatting chained exceptions.
_cause_message = ("\nThe above exception was the direct cause "
                  "of the following exception:\n\n")
# Simplified ast.literal_eval which does not parse operators
def _convert(node, _consts={'None': None, 'True': True, 'False': False}):
    """Convert an AST node to the Python literal it represents.

    Supported nodes are strings, numbers, tuples, lists, dicts and the
    constants ``None``, ``True`` and ``False``.
    Raise ``ValueError`` for anything else (operators, names, calls, ...).
    The `_consts` argument is a private lookup table, not meant to be passed.
    """
    # The node classes depend on the Python version: they are looked up
    # with getattr() because the legacy ones are missing on recent versions.
    constant_type = getattr(_ast, 'Constant', None)
    if constant_type is not None and isinstance(node, constant_type):
        # Python 3.8+: all the literals are parsed as 'Constant' nodes
        value = node.value
        if value is None or isinstance(value,
                                       (bool, int, float, complex, str)):
            return value
        # bytes and Ellipsis were never accepted by this function
        raise ValueError('malformed or disallowed expression')
    str_type = getattr(_ast, 'Str', None)
    if str_type is not None and isinstance(node, str_type):
        return node.s
    num_type = getattr(_ast, 'Num', None)
    if num_type is not None and isinstance(node, num_type):
        return node.n
    if isinstance(node, _ast.Tuple):
        return tuple(map(_convert, node.elts))
    if isinstance(node, _ast.List):
        return list(map(_convert, node.elts))
    if isinstance(node, _ast.Dict):
        return dict([(_convert(k), _convert(v))
                     for (k, v) in zip(node.keys, node.values)])
    if hasattr(node, 'value') and str(node.value) in _consts:
        return node.value   # Python 3.4 to 3.7 ('NameConstant' node)
    if isinstance(node, _ast.Name) and node.id in _consts:
        return _consts[node.id]     # Python <= 3.3
    raise ValueError('malformed or disallowed expression')


def literal_eval(expression):
    """Evaluate a string which contains a Python literal.

    The `expression` is compiled to an AST, never executed.
    Raise ``ValueError`` if the expression is not a supported literal, or
    ``SyntaxError`` if it cannot be compiled.
    """
    node = compile(expression, '<unknown>', 'eval', _ast.PyCF_ONLY_AST)
    return _convert(node.body)
def is_list_of_dict(iterator):
    """Tell whether the first true item of `iterator` is a dict.

    Return False when all the items are false, or when there is no item.
    """
    first_true = next((item for item in iterator if item), None)
    return isinstance(first_true, dict)
def mixedcase(s, _cache={}):
    """Convert a dotted name to MixedCase.

    Each part between dots is capitalized, then the parts are joined.
    The `_cache` argument memoizes the results; it is not meant to be passed.

    >>> mixedcase('res.company')
    'ResCompany'
    """
    if s not in _cache:
        _cache[s] = ''.join(word.capitalize() for word in s.split('.'))
    return _cache[s]
def lowercase(s, _sub=re.compile('[A-Z]').sub,
              _repl=(lambda m: '.' + m.group(0).lower()), _cache={}):
    """Convert a MixedCase name to lowercase words separated with dots.

    Each uppercase ASCII letter is replaced with a dot and its lowercase
    form; leading dots are removed from the result.
    The arguments `_sub`, `_repl` and `_cache` are private helpers bound at
    definition time; they are not meant to be passed.

    >>> lowercase('ResCompany')
    'res.company'
    """
    if s not in _cache:
        dotted = _sub(_repl, s)
        _cache[s] = dotted.lstrip('.')
    return _cache[s]
def read_config(section=None):
    """Read the environment settings from the configuration file.

    The config file ``erppeek.ini`` contains a `section` for each environment.
    Each section provides parameters for the connection: ``scheme``
    (optional, ``http`` if omitted), ``host``, ``port``, ``database``,
    ``username`` and (optional) ``password``.  Default values are
    read from the ``[DEFAULT]`` section.  If the ``password`` is not in the
    configuration file, it is requested on login.
    If the ``scheme`` is ``local``, the parameters ``host`` and ``port`` are
    not read and the parameter ``options`` (optional) is read instead.

    Return a tuple ``(server, db, user, password or None)``.  The `server`
    is a URL, or the tuple ``('local', options)`` for the ``local`` scheme.
    Without argument, it returns the list of configured environments.
    Raise ``KeyError`` if a mandatory parameter is missing.
    """
    # 'SafeConfigParser' and 'readfp' were removed in Python 3.12; since
    # Python 3.2 they are aliases of 'ConfigParser' and 'read_file'.
    if sys.version_info >= (3, 2):
        p = configparser.ConfigParser()
    else:
        p = configparser.SafeConfigParser()
    with open(Client._config_file) as f:
        read_file = getattr(p, 'read_file', None) or p.readfp
        read_file(f)
    if section is None:
        return p.sections()
    env = dict(p.items(section))
    scheme = env.get('scheme', 'http')
    if scheme == 'local':
        server = (scheme, env.get('options', ''))
    else:
        server = '%s://%s:%s' % (scheme, env['host'], env['port'])
    return (server, env['database'], env['username'], env.get('password'))
def start_openerp_services(options=None, appname=None):
    """Initialize the OpenERP services.

    Import the ``openerp`` package and load the OpenERP services.
    The argument `options` receives the command line arguments for ``openerp``.
    Example: ``"-c /path/to/openerp-server.conf --without-demo all"``.
    If `appname` is set, it is exported as ``PGAPPNAME`` in the environment.
    Nothing is initialized if ``openerp.osv.osv.service`` is already set.

    Return the openerp module.
    """
    import openerp

    if openerp.osv.osv.service:
        # Already started
        return openerp

    os.environ['TZ'] = 'UTC'
    if appname is not None:
        os.environ['PGAPPNAME'] = appname
    arguments = options.split() if options else []
    openerp.tools.config.parse_config(arguments)
    if openerp.release.version_info < (7,):
        openerp.netsvc.init_logger()
        openerp.osv.osv.start_object_proxy()
        openerp.service.web_services.start_web_services()
    else:
        openerp.service.start_internal()

    def close_all():
        # Close the connections to each database of the registry
        for db in openerp.modules.registry.RegistryManager.registries:
            openerp.sql_db.close_db(db)
    atexit.register(close_all)

    return openerp
def issearchdomain(arg):
    """Check if the argument is a search domain.

    Examples:
        - ``[('name', '=', 'mushroom'), ('state', '!=', 'draft')]``
        - ``['name = mushroom', 'state != draft']``
        - ``[]``

    A list, tuple or string is a search domain, unless its first item is
    an integer or a string of digits (then it is a list of ids).
    """
    # These ones are supported but discouraged:
    # - 'state != draft'
    # - ('state', '!=', 'draft')
    if not isinstance(arg, (list, tuple, basestring)):
        return False
    if not arg:
        # An empty domain matches everything
        return True
    first = arg[0]
    if isinstance(first, int_types):
        # Not a list of ids: [1, 2, 3]
        return False
    # Not a list of ids as str: ['1', '2', '3']
    return not (isinstance(first, basestring) and first.isdigit())
def searchargs(params, kwargs=None, context=None):
    """Compute the 'search' parameters.

    The argument `params` is the tuple of positional arguments; its first
    item is the domain.  Each term of the domain written as a string
    (example: ``'name = mushroom'``) is converted to a tuple
    ``(field, operator, value)``.  The value is parsed as a Python literal
    if possible, else it is kept as a string.

    If the domain is the only item of `params` and either `kwargs` or
    `context` is set, the keys ``offset``, ``limit`` and ``order`` are
    removed from `kwargs` and the result is
    ``(domain, offset, limit, order, context)``.

    Return the tuple of parameters.  The domain passed by the caller is
    not modified.  Raise ``ValueError`` if a term cannot be parsed.
    """
    if not params:
        return ([],)
    domain = params[0]
    if isinstance(domain, (basestring, tuple)):
        domain = [domain]
        warnings.warn('Domain should be a list: %s' % domain)
    elif not isinstance(domain, list):
        return params
    else:
        # Work on a copy, to keep the list of the caller unchanged
        domain = list(domain)
    for (idx, term) in enumerate(domain):
        if isinstance(term, basestring) and term not in DOMAIN_OPERATORS:
            m = _term_re.match(term.strip())
            if not m:
                raise ValueError('Cannot parse term %r' % term)
            (field, operator, value) = m.groups()
            try:
                value = literal_eval(value)
            except Exception:
                # Interpret the value as a string
                pass
            domain[idx] = (field, operator, value)
    if (kwargs or context) and len(params) == 1:
        if kwargs is None:
            # Only the context is provided
            kwargs = {}
        params = (domain,
                  kwargs.pop('offset', 0),
                  kwargs.pop('limit', None),
                  kwargs.pop('order', None),
                  context)
    else:
        params = (domain,) + params[1:]
    return params
class Service(object):
    """A wrapper around XML-RPC endpoints.

    The connected endpoints are exposed on the Client instance.
    The `server` argument is the URL of the server (scheme+host+port).
    If `server` is an ``openerp`` module, it is used to connect to the
    local server.  The `endpoint` argument is the name of the service
    (examples: ``"object"``, ``"db"``).  The `methods` is the list of methods
    which should be exposed on this endpoint.  Use ``dir(...)`` on the
    instance to list them.
    If `verbose` is set, each call and its result are printed.
    """

    def __init__(self, server, endpoint, methods, verbose=False):
        if isinstance(server, basestring):
            self._rpcpath = rpcpath = server + '/xmlrpc/'
            proxy = ServerProxy(rpcpath + endpoint, allow_none=True)
            # Name-mangled private attributes of the ServerProxy
            self._dispatch = proxy._ServerProxy__request
            if hasattr(proxy._ServerProxy__transport, 'close'):   # >= 2.7
                self.close = proxy._ServerProxy__transport.close
        else:
            self._rpcpath = ''
            proxy = server.netsvc.ExportService.getService(endpoint)
            self._dispatch = proxy.dispatch
        self._endpoint = endpoint
        self._methods = methods
        self._verbose = verbose

    def __repr__(self):
        return "<Service '%s%s'>" % (self._rpcpath, self._endpoint)
    __str__ = __repr__

    def __dir__(self):
        return sorted(self._methods)

    def __getattr__(self, name):
        """Return a bound wrapper which calls the remote method `name`.

        Raise ``AttributeError`` if `name` is not an exposed method.
        """
        # Private names are never remote methods.  Reject them before
        # reading 'self._methods': on an instance which is not fully
        # initialized, this lookup would call __getattr__ again, endlessly.
        if name.startswith('_') or name not in self._methods:
            raise AttributeError("'Service' object has no attribute %r" % name)
        if self._verbose:
            def sanitize(args):
                # Hide the third argument (the password), except for 'db'
                if self._endpoint != 'db' and len(args) > 2:
                    args = list(args)
                    args[2] = '*'
                return args
            # Higher verbosity selects a longer line, up to the last item
            maxcol = MAXCOL[min(len(MAXCOL), self._verbose) - 1]

            def wrapper(self, *args):
                snt = ', '.join([repr(arg) for arg in sanitize(args)])
                snt = '%s.%s(%s)' % (self._endpoint, name, snt)
                if len(snt) > maxcol:
                    suffix = '... L=%s' % len(snt)
                    snt = snt[:maxcol - len(suffix)] + suffix
                print('--> ' + snt)
                res = self._dispatch(name, args)
                rcv = str(res)
                if len(rcv) > maxcol:
                    suffix = '... L=%s' % len(rcv)
                    rcv = rcv[:maxcol - len(suffix)] + suffix
                print('<-- ' + rcv)
                return res
        else:
            wrapper = lambda s, *args: s._dispatch(name, args)
        wrapper.__name__ = name
        return wrapper.__get__(self, type(self))

    def __del__(self):
        # The 'close' attribute is only set for some XML-RPC transports
        if hasattr(self, 'close'):
            self.close()
class Client(object):
    """Connection to an OpenERP instance.

    This is the top level object.
    The `server` is the URL of the instance, like ``http://localhost:8069``.
    If `server` is an ``openerp`` module, it is used to connect to the local
    server (>= 6.1).

    The `db` is the name of the database and the `user` should exist in the
    table ``res.users``.  If the `password` is not provided, it will be
    asked on login.
    """
    # Path of the configuration file, in the current directory
    _config_file = os.path.join(os.path.curdir, CONF_FILE)

    def __init__(self, server, db=None, user=None, password=None,
                 verbose=False):
        if isinstance(server, basestring) and server[-1:] == '/':
            server = server.rstrip('/')
        self._server = server
        major_version = None

        def get_proxy(name):
            # 'major_version' is read when the proxy is created: it is still
            # None for the first call, which retrieves the server version.
            if major_version in ('5.0', None) or name == 'wizard':
                methods = _methods[name]
            else:
                # Only for OpenERP >= 6
                methods = _methods[name] + _methods_6_1[name]
            return Service(server, name, methods, verbose=verbose)
        self.server_version = ver = get_proxy('db').server_version()
        self.major_version = major_version = '.'.join(ver.split('.', 2)[:2])
        # Create the XML-RPC proxies
        self.db = get_proxy('db')
        self.common = get_proxy('common')
        self._object = get_proxy('object')
        self._report = get_proxy('report')
        # The wizard endpoint was removed in OpenERP 7.  Compare numbers,
        # not strings: '10.0' sorts before '7.0'.
        leading = re.match(r'\d+', major_version)
        has_wizard = leading is not None and int(leading.group()) < 7
        self._wizard = get_proxy('wizard') if has_wizard else None
        self.reset()
        if db:
            # Try to login
            self._login(user, password=password, database=db)

    @classmethod
    def from_config(cls, environment, verbose=False):
        """Create a connection to a defined environment.

        Read the settings from the section ``[environment]`` in the
        ``erppeek.ini`` file and return a connected :class:`Client`.
        See :func:`read_config` for details of the configuration file format.
        """
        (server, db, user, password) = read_config(environment)
        if server[0] == 'local':
            # Strip the suffix of a compiled file: '.pyc' or '.pyo'
            appname = os.path.basename(__file__).rstrip('co')
            server = start_openerp_services(server[1], appname=appname)
        client = cls(server, db, user, password, verbose=verbose)
        client._environment = environment
        return client

    def reset(self):
        """Forget the user, the database, the models and the endpoints."""
        self.user = self._environment = None
        self._db, self._models = (), {}
        self._execute = self._exec_workflow = None

    def __repr__(self):
        return "<Client '%s#%s'>" % (self._server or '', self._db)

    def login(self, user, password=None, database=None):
        """Switch `user` and (optionally) `database`.

        If the `password` is not available, it will be asked.
        Return the ``uid`` of the user.  If the login fails, an error is
        printed and the return value is None.
        """
        if database:
            dbs = self.db.list()
            if database not in dbs:
                print("Error: Database '%s' does not exist: %s" %
                      (database, dbs))
                return
        elif self._db:
            database = self._db
        else:
            print('Error: Not connected')
            return
        # Used for logging, copied from openerp.sql_db.db_connect
        current_thread().dbname = database
        (uid, password) = self._auth(database, user, password)
        if not uid:
            if not self._db:
                self._db = database
            print('Error: Invalid username or password')
            return
        if self._db != database:
            self.reset()
            self._db = database
        self.user = user

        # Authenticated endpoints
        def authenticated(method):
            return functools.partial(method, self._db, uid, password)
        self._execute = authenticated(self._object.execute)
        self._exec_workflow = authenticated(self._object.exec_workflow)
        self.report = authenticated(self._report.report)
        self.report_get = authenticated(self._report.report_get)
        if self.major_version != '5.0':
            # Only for OpenERP >= 6
            self.execute_kw = authenticated(self._object.execute_kw)
            self.render_report = authenticated(self._report.render_report)
        if self._wizard:
            self._wizard_execute = authenticated(self._wizard.execute)
            self._wizard_create = authenticated(self._wizard.create)
        return uid

    # Needed for interactive use
    connect = None
    _login = login
    # Cache of credentials: {(server, database, user): (uid, password)}
    _login.cache = {}

    def _check_valid(self, database, uid, password):
        """Return True if the server accepts a call with these credentials."""
        execute = self._object.execute
        try:
            execute(database, uid, password, 'res.users', 'fields_get_keys')
            return True
        except Fault:
            return False

    def _auth(self, database, user, password):
        """Authenticate the `user` and return a tuple ``(uid, password)``.

        If the `password` is not set, it is looked up in the cache, then in
        the table ``res.users`` (if the current user has write access on the
        same database), else it is asked on the terminal.
        The ``uid`` is false if the authentication fails.
        """
        assert database
        cache_key = (self._server, database, user)
        if password:
            # If password is explicit, call the 'login' method
            uid = None
        else:
            # Read from cache
            uid, password = self._login.cache.get(cache_key) or (None, None)
            # Read from table 'res.users'
            if ((not uid and self._db == database and
                 self.access('res.users', 'write'))):
                obj = self.read('res.users',
                                [('login', '=', user)], 'id password')
                if obj:
                    uid = obj[0]['id']
                    password = obj[0]['password']
                else:
                    # Invalid user
                    uid = False
            # Ask for password
            if not password and uid is not False:
                from getpass import getpass
                password = getpass('Password for %r: ' % user)
        if uid:
            # Check if password changed
            if not self._check_valid(database, uid, password):
                if cache_key in self._login.cache:
                    del self._login.cache[cache_key]
                uid = False
        elif uid is None:
            # Do a standard 'login'
            uid = self.common.login(database, user, password)
        if uid:
            # Update the cache
            self._login.cache[cache_key] = (uid, password)
        return (uid, password)

    @classmethod
    def _set_interactive(cls, global_vars={}):
        """Install the hooks for the interactive mode.

        The methods ``login`` and ``connect`` of the class are replaced with
        versions which refresh the dictionary `global_vars`.  This
        classmethod removes itself from the class: it is called once.
        Return the dictionary `global_vars`.
        """
        # Don't call multiple times
        del Client._set_interactive

        for name in ['__name__', '__doc__'] + __all__:
            global_vars[name] = globals()[name]

        def get_pool(db_name=None):
            """Return a model registry.

            Use get_pool(db_name).db.cursor() to grab a cursor.
            """
            client = global_vars['client']
            registry = client._server.modules.registry
            return registry.RegistryManager.get(db_name or client._db)

        def connect(self, env=None):
            """Connect to another environment and replace the globals()."""
            if env:
                # Safety measure: turn down the previous connection
                global_vars['client'].reset()
                client = self.from_config(env, verbose=self.db._verbose)
            else:
                client = self
                env = self._environment or self._db
            global_vars['client'] = client
            if hasattr(client._server, 'modules'):
                global_vars['get_pool'] = get_pool
            # Tweak prompt
            sys.ps1 = '%s >>> ' % (env,)
            sys.ps2 = '%s ... ' % (env,)
            # Logged in?
            if client.user:
                global_vars['model'] = client.model
                global_vars['models'] = client.models
                global_vars['do'] = client.execute
                print('Logged in as %r' % (client.user,))
            else:
                global_vars.update({'model': None, 'models': None, 'do': None})

        def login(self, user, password=None, database=None):
            """Switch `user` and (optionally) `database`."""
            if self._login(user, password=password, database=database):
                # Register the new globals()
                self.connect()

        # Set hooks to recreate the globals()
        cls.login = login
        cls.connect = connect

        return global_vars

    def create_database(self, passwd, database, demo=False, lang='en_US',
                        user_password='admin'):
        """Create a new database.

        The superadmin `passwd` and the `database` name are mandatory.
        By default, `demo` data are not loaded and `lang` is ``en_US``.
        Wait for the thread to finish and login if successful.
        If interrupted with Ctrl-C, the thread id and the progress are
        printed.
        """
        thread_id = self.db.create(passwd, database, demo, lang, user_password)
        progress = 0
        try:
            while progress < 1:
                time.sleep(3)
                progress, users = self.db.get_progress(passwd, thread_id)
            # [1.0, [{'login': 'admin', 'password': 'admin',
            #         'name': 'Administrator'}]]
            self.login(users[0]['login'], users[0]['password'],
                       database=database)
        except KeyboardInterrupt:
            print({'id': thread_id, 'progress': progress})

    def execute(self, obj, method, *params, **kwargs):
        """Wrapper around ``object.execute`` RPC method.

        Argument `method` is the name of an ``osv.osv`` method or
        a method available on this `obj`.
        Method `params` are allowed.  If needed, keyword
        arguments are collected in `kwargs`.
        The keyword arguments which are not consumed are ignored, with a
        message printed for each of them.
        """
        assert self.user, 'Not connected'
        assert isinstance(obj, basestring)
        assert isinstance(method, basestring) and method != 'browse'
        context = kwargs.pop('context', None)
        # 'ordered' is false, or the order spec, or the list of ids which
        # gives the order of the result
        ordered = single_id = False
        if method in ('read', 'name_get'):
            assert params
            if issearchdomain(params[0]):
                # Combine search+read
                search_params = searchargs(params[:1], kwargs, context)
                ordered = len(search_params) > 3 and search_params[3]
                ids = self._execute(obj, 'search', *search_params)
            elif isinstance(params[0], list):
                ordered = kwargs.pop('order', False) and params[0]
                ids = set(params[0])
                ids.discard(False)
                if not ids and ordered:
                    return [False] * len(ordered)
                ids = sorted(ids)
            else:
                single_id = True
                ids = [params[0]] if params[0] else False
            if not ids:
                return ids
            if len(params) > 1:
                params = (ids,) + params[1:]
            elif method == 'read':
                params = (ids, kwargs.pop('fields', None))
            else:
                params = (ids,)
        elif method == 'search':
            # Accept keyword arguments for the search method
            params = searchargs(params, kwargs, context)
            context = None
        elif method == 'search_count':
            params = searchargs(params)
        elif method == 'perm_read':
            # broken with a single id (verified with 5.0 and 6.1)
            if params and isinstance(params[0], int_types):
                params = ([params[0]],) + params[1:]
        if context:
            params = params + (context,)
        # Ignore extra keyword arguments
        for item in kwargs.items():
            print('Ignoring: %s = %r' % item)
        res = self._execute(obj, method, *params)
        if ordered:
            # The results are not in the same order as the ids
            # when received from the server
            resdic = dict([(val['id'], val) for val in res])
            if not isinstance(ordered, list):
                ordered = ids
            res = [resdic.get(id_, False) for id_ in ordered]
        return res[0] if single_id else res

    def exec_workflow(self, obj, signal, obj_id):
        """Wrapper around ``object.exec_workflow`` RPC method.

        Argument `obj` is the name of the model.  The `signal`
        is sent to the object identified by its integer ``id`` `obj_id`.
        """
        assert self.user, 'Not connected'
        assert isinstance(obj, basestring) and isinstance(signal, basestring)
        return self._exec_workflow(obj, signal, obj_id)

    def wizard(self, name, datas=None, action='init', context=None):
        """Wrapper around ``wizard.create`` and ``wizard.execute``
        RPC methods.

        If only `name` is provided, a new wizard is created and its ``id`` is
        returned.  If `action` is not ``"init"``, then the action is executed.
        In this case the `name` is either an ``id`` or a string.
        If the `name` is a string, the wizard is created before the execution.
        The optional `datas` argument provides data for the action.
        The optional `context` argument is passed to the RPC method.

        Removed in OpenERP 7.
        """
        if isinstance(name, int_types):
            wiz_id = name
        else:
            wiz_id = self._wizard_create(name)
        if datas is None:
            if action == 'init' and name != wiz_id:
                return wiz_id
            datas = {}
        return self._wizard_execute(wiz_id, datas, action, context)

    def _upgrade(self, modules, button):
        """Click the `button` for the `modules` and apply the changes."""
        # First, update the list of modules
        updated, added = self.execute('ir.module.module', 'update_list')
        if added:
            print('%s module(s) added to the list' % added)
        # Find modules
        ids = modules and self.search('ir.module.module',
                                      [('name', 'in', modules)])
        if ids:
            # Click upgrade/install/uninstall button
            self.execute('ir.module.module', button, ids)
        mods = self.read('ir.module.module',
                         [('state', 'not in', STABLE_STATES)], 'name state')
        if not mods:
            if ids:
                print('Already up-to-date: %s' %
                      self.modules([('id', 'in', ids)]))
            elif modules:
                print('Module(s) not found: %s' % ', '.join(modules))
            else:
                print('%s module(s) updated' % updated)
            return
        print('%s module(s) selected' % len(ids))
        print('%s module(s) to process:' % len(mods))
        for mod in mods:
            print(' %(state)s\t%(name)s' % mod)
        # Empty the models' cache
        self._models.clear()
        # Apply scheduled upgrades
        if self.major_version == '5.0':
            # Wizard "Apply Scheduled Upgrades"
            rv = self.wizard('module.upgrade', action='start')
            if 'config' not in [state[0] for state in rv.get('state', ())]:
                # Something bad happened
                return rv
        else:
            self.execute('base.module.upgrade', 'upgrade_module', [])

    def upgrade(self, *modules):
        """Press the button ``Upgrade``."""
        return self._upgrade(modules, button='button_upgrade')

    def install(self, *modules):
        """Press the button ``Install``."""
        return self._upgrade(modules, button='button_install')

    def uninstall(self, *modules):
        """Press the button ``Uninstall``."""
        return self._upgrade(modules, button='button_uninstall')

    def search(self, obj, *params, **kwargs):
        """Filter the records in the `domain`, return the ``ids``."""
        return self.execute(obj, 'search', *params, **kwargs)

    def count(self, obj, domain=None):
        """Count the records in the `domain`."""
        return self.execute(obj, 'search_count', domain or [])

    def read(self, obj, *params, **kwargs):
        """Wrapper for ``client.execute(obj, 'read', [...], ('a', 'b'))``.

        The first argument `obj` is the model name (example: ``"res.partner"``)

        The second argument, `domain`, accepts:
         - ``[('name', '=', 'mushroom'), ('state', '!=', 'draft')]``
         - ``['name = mushroom', 'state != draft']``
         - ``[]``
         - a list of ids ``[1, 2, 3]`` or a single id ``42``

        The third argument, `fields`, accepts:
         - a single field: ``'first_name'``
         - a tuple of fields: ``('street', 'city')``
         - a space separated string: ``'street city'``
         - a format spec: ``'%(street)s %(city)s'``

        If `fields` is omitted, all fields are read.

        If `domain` is a single id, then:
         - return a single value if a single field is requested.
         - return a string if a format spec is passed in the `fields` argument.
         - else, return a dictionary.

        If `domain` is not a single id, the returned value is a list of items.
        Each item complies with the rules of the previous paragraph.

        The optional keyword arguments `offset`, `limit` and `order` are
        used to restrict the search.  The `order` is also used to order the
        results returned.  Note: the low-level RPC method ``read`` itself does
        not preserve the order of the results.
        """
        # 'fmt' is a format spec, or the marker () for a single field
        fmt = None
        if len(params) > 1 and isinstance(params[1], basestring):
            fmt = ('%(' in params[1]) and params[1]
            if fmt:
                fields = _fields_re.findall(fmt)
            else:
                # transform: "zip city" --> ("zip", "city")
                fields = params[1].split()
                if len(fields) == 1:
                    fmt = ()    # marker
            params = (params[0], fields) + params[2:]
        res = self.execute(obj, 'read', *params, **kwargs)
        if not res:
            return res
        if fmt:
            if isinstance(res, list):
                return [(d and fmt % d) for d in res]
            return fmt % res
        if fmt == ():
            if isinstance(res, list):
                return [(d and d[fields[0]]) for d in res]
            return res[fields[0]]
        return res

    def _models_get(self, name):
        """Return the cached Model `name`, create it if needed."""
        try:
            return self._models[name]
        except KeyError:
            self._models[name] = m = Model._new(self, name)
        return m

    def models(self, name=''):
        """Return a dictionary of models.

        The argument `name` is a pattern to filter the models returned.
        If omitted, all models are returned.
        Keys are camel case names of the models.
        Values are instances of :class:`Model`.

        The return value can be used to declare the models in the global
        namespace:

        >>> globals().update(client.models('res.'))
        """
        domain = [('model', 'like', name)]
        models = self.execute('ir.model', 'read', domain, ('model',))
        names = [m['model'] for m in models]
        return dict([(mixedcase(mod), self._models_get(mod)) for mod in names])

    def model(self, name, check=True):
        """Return a :class:`Model` instance.

        The argument `name` is the name of the model.  If the optional
        argument `check` is :const:`False`, no validity check is done.
        If the model is not found, an error is printed and the return
        value is None.
        """
        try:
            return self._models[name] if check else self._models_get(name)
        except KeyError:
            # Not in the cache: load the models which match this name
            models = self.models(name)
        if name in self._models:
            return self._models[name]
        if models:
            errmsg = 'Model not found. These models exist:'
        else:
            errmsg = 'Model not found: %s' % (name,)
        print('\n * '.join([errmsg] + [str(m) for m in models.values()]))

    def modules(self, name='', installed=None):
        """Return a dictionary of modules.

        The optional argument `name` is a pattern to filter the modules.
        It can also be a search domain, which is not modified.
        If the boolean argument `installed` is :const:`True`, the modules
        which are "Not Installed" or "Not Installable" are omitted.  If
        the argument is :const:`False`, only these modules are returned.
        If argument `installed` is omitted, all modules are returned.
        The return value is a dictionary where module names are grouped in
        lists according to their ``state``.  It is None if no module is
        found.
        """
        if isinstance(name, basestring):
            domain = [('name', 'like', name)]
        else:
            # Copy the domain, because a term may be appended below
            domain = list(name)
        if installed is not None:
            op = 'not in' if installed else 'in'
            domain.append(('state', op, ['uninstalled', 'uninstallable']))
        mods = self.read('ir.module.module', domain, 'name state')
        if mods:
            res = collections.defaultdict(list)
            for mod in mods:
                res[mod['state']].append(mod['name'])
            return dict(res)

    def keys(self, obj):
        """Wrapper for :meth:`Model.keys` method."""
        return self.model(obj).keys()

    def fields(self, obj, names=None):
        """Wrapper for :meth:`Model.fields` method."""
        return self.model(obj).fields(names=names)

    def field(self, obj, name):
        """Wrapper for :meth:`Model.field` method."""
        return self.model(obj).field(name)

    def access(self, obj, mode='read'):
        """Wrapper for :meth:`Model.access` method."""
        try:
            self._execute('ir.model.access', 'check', obj, mode)
            return True
        except (TypeError, Fault):
            # TypeError is raised if '_execute' is None (not logged in)
            return False

    def __getattr__(self, method):
        """Return a Model for a MixedCase name, else an RPC method wrapper."""
        # Private names are neither models nor remote methods: reject them
        # first, before any lookup on the server.
        if method.startswith('_'):
            errmsg = "'Client' object has no attribute %r" % method
            raise AttributeError(errmsg)
        if not method.islower():
            rv = self.model(lowercase(method))
            self.__dict__[method] = rv
            return rv

        # miscellaneous object methods
        def wrapper(self, obj, *params, **kwargs):
            """Wrapper for client.execute(obj, %r, *params, **kwargs)."""
            return self.execute(obj, method, *params, **kwargs)
        wrapper.__name__ = method
        if wrapper.__doc__:
            # The docstring is None when docstrings are stripped (-OO)
            wrapper.__doc__ %= method
        return wrapper.__get__(self, type(self))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.reset()
class Model(object):
    """The class for OpenERP models."""

    def __new__(cls, client, name):
        # Instances are created and cached by the client
        return client.model(name)

    @classmethod
    def _new(cls, client, name):
        """Create the Model `name`, bound to the `client`, without check."""
        m = object.__new__(cls)
        (m.client, m._name) = (client, name)
        m._execute = functools.partial(client.execute, name)
        m.search = functools.partial(client.search, name)
        m.count = functools.partial(client.count, name)
        m.read = functools.partial(client.read, name)
        return m

    def __repr__(self):
        return "<Model '%s'>" % (self._name,)

    def _get_keys(self):
        """Retrieve the sorted list of field names from the server."""
        obj_keys = self._execute('fields_get_keys')
        obj_keys.sort()
        return obj_keys

    def _get_fields(self):
        """Retrieve the definition of the fields from the server."""
        return self._execute('fields_get')

    def keys(self):
        """Return the keys of the model."""
        return self._keys

    def fields(self, names=None):
        """Return a dictionary of the fields of the model.

        Optional argument `names` is a sequence of field names or
        a space separated string of these names.
        If omitted, all fields are returned.
        """
        if names is None:
            return self._fields
        if isinstance(names, basestring):
            names = names.split()
        return dict([(k, v) for (k, v) in self._fields.items() if k in names])

    def field(self, name):
        """Return the field properties for field `name`."""
        return self._fields[name]

    def access(self, mode="read"):
        """Check if the user has access to this model.

        Optional argument `mode` is the access mode to check.  Valid values
        are ``read``, ``write``, ``create`` and ``unlink``. If omitted,
        the ``read`` mode is checked.  Return a boolean.
        """
        return self.client.access(self._name, mode)

    def browse(self, domain, *params, **kwargs):
        """Return a :class:`Record` or a :class:`RecordList`.

        The argument `domain` accepts a single integer ``id``, a list of ids
        or a search domain.
        If it is a single integer, the return value is a :class:`Record`.
        Otherwise, the return value is a :class:`RecordList`.
        """
        context = kwargs.pop('context', None)
        if isinstance(domain, int_types):
            assert not params and not kwargs
            return Record(self, domain, context=context)
        if issearchdomain(domain):
            params = searchargs((domain,) + params, kwargs, context)
            domain = self._execute('search', *params)
            # Ignore extra keyword arguments
            for item in kwargs.items():
                print('Ignoring: %s = %r' % item)
        else:
            assert not params and not kwargs
        return RecordList(self, domain, context=context)

    def get(self, domain, context=None):
        """Return a single :class:`Record`.

        The argument `domain` accepts a single integer ``id`` or a
        search domain, or an ``xml_id``.  The return value is a
        :class:`Record` or None.  If multiple records are found,
        a ``ValueError`` is raised.
        """
        if isinstance(domain, int_types):   # a single id
            return Record(self, domain, context=context)
        if isinstance(domain, basestring):  # lookup the xml_id
            # Split on the first dot only: the part after the module may
            # contain dots.
            (module, name) = domain.split('.', 1)
            data = self._imd_read(
                [('module', '=', module), ('name', '=', name)], 'model res_id')
            assert not data or data[0]['model'] == self._name
            ids = [res['res_id'] for res in data]
        else:   # a search domain
            assert issearchdomain(domain)
            params = searchargs((domain,), {}, context)
            ids = self._execute('search', *params)
        if len(ids) > 1:
            raise ValueError('domain matches too many records (%d)' % len(ids))
        return Record(self, ids[0], context=context) if ids else None

    def create(self, values, context=None):
        """Create a :class:`Record`.

        The argument `values` is a dictionary of values which are used to
        create the record.  The newly created :class:`Record` is returned.
        """
        values = self._unbrowse_values(values)
        new_id = self._execute('create', values, context=context)
        return Record(self, new_id, context=context)

    def _browse_values(self, values, context=None):
        """Wrap the values of a Record.

        The argument `values` is a dictionary of values read from a Record.
        When the field type is relational (many2one, one2many or many2many),
        the value is wrapped in a Record or a RecordList.  A non-empty
        ``reference`` value is wrapped in a Record too.
        The dictionary is updated in place.
        Return a dictionary with the same keys as the `values` argument.
        """
        for (key, value) in values.items():
            if key == 'id' or hasattr(value, 'id'):
                # Not a field, or already wrapped
                continue
            field = self._fields[key]
            field_type = field['type']
            if field_type == 'many2one':
                if value:
                    rel_model = self.client.model(field['relation'], False)
                    values[key] = Record(rel_model, value, context=context)
            elif field_type in ('one2many', 'many2many'):
                rel_model = self.client.model(field['relation'], False)
                values[key] = RecordList(rel_model, value, context=context)
            elif value and field_type == 'reference':
                # The value is formatted as "model_name,id"
                (res_model, res_id) = value.split(',')
                rel_model = self.client.model(res_model, False)
                # Pass the context, as for the other relational fields
                values[key] = Record(rel_model, int(res_id), context=context)
        return values

    def _unbrowse_values(self, values):
        """Unwrap the id of Record and RecordList.

        Return a new dictionary; the argument `values` is not modified.
        A list of ids for a one2many or many2many field is converted to
        the command ``[(6, 0, ids)]``.
        """
        new_values = values.copy()
        for (key, value) in values.items():
            field_type = self._fields[key]['type']
            if hasattr(value, 'id'):
                if field_type == 'reference':
                    new_values[key] = '%s,%s' % (value._model_name, value.id)
                else:
                    new_values[key] = value = value.id
            if field_type in ('one2many', 'many2many'):
                if not value:
                    new_values[key] = [(6, 0, [])]
                elif isinstance(value[0], int_types):
                    new_values[key] = [(6, 0, value)]
        return new_values

    def _get_external_ids(self, ids=None):
        """Retrieve the External IDs of the records.

        Return a dictionary with keys being the fully qualified
        External IDs, and values the ``Record`` entries.
        If `ids` is omitted, all the External IDs of the model are returned.
        """
        search_domain = [('model', '=', self._name)]
        if ids is not None:
            search_domain.append(('res_id', 'in', ids))
        existing = self._imd_read(search_domain, ['module', 'name', 'res_id'])
        res = {}
        for rec in existing:
            res['%(module)s.%(name)s' % rec] = self.get(rec['res_id'])
        return res

    def __getattr__(self, attr):
        """Resolve the lazy attributes and the remote methods.

        The result is stored on the instance, so the next access to the
        attribute does not call this method again.
        """
        if attr in ('_keys', '_fields'):
            # Loaded from the server on first access
            self.__dict__[attr] = rv = getattr(self, '_get' + attr)()
            return rv
        if attr.startswith('_imd_'):
            # Shortcut to a method of the model 'ir.model.data'
            imd = self.client.model('ir.model.data')
            self.__dict__[attr] = imd_method = getattr(imd, attr[5:])
            return imd_method
        if attr.startswith('_'):
            raise AttributeError("'Model' object has no attribute %r" % attr)

        def wrapper(self, *params, **kwargs):
            """Wrapper for client.execute(%r, %r, *params, **kwargs)."""
            return self._execute(attr, *params, **kwargs)
        wrapper.__name__ = attr
        if wrapper.__doc__:
            # The docstring is None when docstrings are stripped (-OO)
            wrapper.__doc__ %= (self._name, attr)
        self.__dict__[attr] = mobj = wrapper.__get__(self, type(self))
        return mobj
class RecordList(object):
    """A sequence of OpenERP :class:`Record`.

    It has a similar API as the :class:`Record` class, but for a list of
    records.  The attributes of the ``RecordList`` are read-only, and they
    return list of attribute values in the same order.  The ``many2one``,
    ``one2many`` and ``many2many`` attributes are wrapped in ``RecordList``
    and list of ``RecordList`` objects.  Use the method ``RecordList.write``
    to assign a single value to all the selected records.
    """

    def __init__(self, res_model, ids, context=None):
        """Initialize the list for the model `res_model`.

        The argument `ids` is an iterable of ids; each item is either an
        integer or a list/tuple whose first item is the id (for example
        an ``(id, name)`` pair).  The argument is never modified: the
        attribute ``id`` is a new list of plain ids, and ``_idnames`` is a
        new list of the original items.
        """
        idnames = list(ids)
        plain_ids = []
        for id_ in idnames:
            if isinstance(id_, (list, tuple)):
                id_ = id_[0]
            assert isinstance(id_, int_types), repr(id_)
            plain_ids.append(id_)
        # Bypass the __setattr__ method
        self.__dict__.update({
            'id': plain_ids,
            '_model_name': res_model._name,
            '_model': res_model,
            '_idnames': idnames,
            '_context': context,
            '_execute': res_model._execute,
        })

    def __repr__(self):
        # Long lists are summarized with their length
        if len(self.id) > 16:
            ids = 'length=%d' % len(self.id)
        else:
            ids = self.id
        return "<RecordList '%s,%s'>" % (self._model_name, ids)

    def __dir__(self):
        return ['__getitem__', 'read', 'write', 'unlink', '_context',
                '_idnames', '_model', '_model_name',
                '_external_id'] + self._model._keys

    def __len__(self):
        return len(self.id)

    def read(self, fields=None, context=None):
        """Wrapper for :meth:`Record.read` method.

        The list's own context is used when `context` is not given.
        When the server returns a list of dictionaries, each dictionary
        is wrapped with ``Model._browse_values``.  When `fields` is the
        name of a relational field, the values are wrapped in
        ``RecordList`` (many2one), in a list of ``RecordList`` (one2many,
        many2many) or in a list of ``Record`` (reference).  The wrapped
        objects carry the same context.  Other values are returned as is.
        An empty ``RecordList`` does not call the server.
        """
        if context is None and self._context:
            context = self._context

        client = self._model.client
        if self.id:
            values = client.read(self._model_name, self.id,
                                 fields, order=True, context=context)
            if is_list_of_dict(values):
                browse_values = self._model._browse_values
                # Falsy entries are returned untouched
                return [v and browse_values(v, context=context)
                        for v in values]
        else:
            values = []

        if isinstance(fields, basestring):
            field = self._model._fields.get(fields)
            if field:
                if field['type'] == 'many2one':
                    rel_model = client.model(field['relation'], False)
                    return RecordList(rel_model, values, context=context)
                if field['type'] in ('one2many', 'many2many'):
                    rel_model = client.model(field['relation'], False)
                    return [RecordList(rel_model, v, context=context)
                            for v in values]
                if field['type'] == 'reference':
                    records = []
                    for value in values:
                        if value:
                            # The value is split as '<model name>,<id>'
                            (res_model, res_id) = value.split(',')
                            rel_model = client.model(res_model, False)
                            value = Record(rel_model, int(res_id),
                                           context=context)
                        records.append(value)
                    return records
        return values

    def write(self, values, context=None):
        """Write the `values` in the :class:`RecordList`.

        Return True without calling the server if the list is empty.
        """
        if not self.id:
            return True
        if context is None and self._context:
            context = self._context
        values = self._model._unbrowse_values(values)
        rv = self._execute('write', self.id, values, context=context)
        return rv

    def unlink(self, context=None):
        """Wrapper for :meth:`Record.unlink` method.

        Return True without calling the server if the list is empty.
        """
        if not self.id:
            return True
        if context is None and self._context:
            context = self._context
        rv = self._execute('unlink', self.id, context=context)
        return rv

    @property
    def _external_id(self):
        """Retrieve the External IDs of the :class:`RecordList`.

        Return the fully qualified External IDs with default value
        False if there's none.  If multiple IDs exist for a record,
        only one of them is returned (randomly).
        """
        xml_ids = dict([(r.id, xml_id) for (xml_id, r) in
                        self._model._get_external_ids(self.id).items()])
        return [xml_ids.get(res_id, False) for res_id in self.id]

    def __getitem__(self, key):
        idname = self._idnames[key]
        if idname is False:
            return False
        # A slice gives a RecordList, an index gives a single Record
        cls = RecordList if isinstance(key, slice) else Record
        return cls(self._model, idname, context=self._context)

    def __getattr__(self, attr):
        context = self._context
        if attr in self._model._keys:
            return self.read(attr, context=context)
        if attr.startswith('_'):
            errmsg = "'RecordList' object has no attribute %r" % attr
            raise AttributeError(errmsg)

        def wrapper(self, *params, **kwargs):
            """Wrapper for client.execute(%r, %r, [...], *params, **kwargs)."""
            if context:
                kwargs.setdefault('context', context)
            return self._execute(attr, self.id, *params, **kwargs)
        wrapper.__name__ = attr
        wrapper.__doc__ %= (self._model_name, attr)
        # Cache the bound method on the instance
        self.__dict__[attr] = mobj = wrapper.__get__(self, type(self))
        return mobj

    def __setattr__(self, attr, value):
        if attr in self._model._keys or attr == 'id':
            msg = "attribute %r is read-only; use 'RecordList.write' instead."
        else:
            msg = "has no attribute %r"
        raise AttributeError("'RecordList' object %s" % msg % attr)
class Record(object):
    """A class for all OpenERP records.

    It maps any OpenERP object.
    The fields can be accessed through attributes.  The changes are
    immediately sent to the server.
    The ``many2one``, ``one2many`` and ``many2many`` attributes are wrapped
    in ``Record`` and ``RecordList`` objects.  These attributes support
    writing too.
    The attributes are evaluated lazily, and they are cached in the record.
    The Record's cache is invalidated if any attribute is changed.

    Two records are equal when they share the same model object and the
    same id; equal records have the same hash.
    """

    def __init__(self, res_model, res_id, context=None):
        """Initialize the record `res_id` of the model `res_model`.

        The argument `res_id` is an integer, or an ``(id, name)`` pair
        whose name is stored as the ``_name`` of the record.
        """
        if isinstance(res_id, (list, tuple)):
            (res_id, res_name) = res_id
            self.__dict__['_name'] = res_name
        assert isinstance(res_id, int_types), repr(res_id)
        # Bypass the __setattr__ method
        self.__dict__.update({
            'id': res_id,
            '_model_name': res_model._name,
            '_model': res_model,
            '_context': context,
            '_cached_keys': set(),
            '_execute': res_model._execute,
        })

    def __repr__(self):
        return "<Record '%s,%d'>" % (self._model_name, self.id)

    def __str__(self):
        return self._name

    def _get_name(self):
        """Fetch, store and return the ``_name`` of the record.

        The name comes from the ``name_get`` call.  If this call fails
        for any reason, the name falls back to ``'<model>,<id>'``.
        """
        try:
            (id_name,) = self._execute('name_get', [self.id])
            name = '%s' % (id_name[1],)
        except Exception:
            name = '%s,%d' % (self._model_name, self.id)
        self.__dict__['_name'] = name
        return name

    @property
    def _keys(self):
        return self._model._keys

    @property
    def _fields(self):
        return self._model._fields

    def refresh(self):
        """Force refreshing the record's data."""
        # The 'id' is not part of the cache: it must never be deleted
        self._cached_keys.discard('id')
        for key in self._cached_keys:
            delattr(self, key)
        self._cached_keys.clear()

    def _update(self, values):
        """Wrap the `values`, store them in the cache and return them."""
        new_values = self._model._browse_values(values, context=self._context)
        self.__dict__.update(new_values)
        self._cached_keys.update(new_values)
        return new_values

    def read(self, fields=None, context=None):
        """Read the `fields` of the :class:`Record`.

        The argument `fields` accepts different kinds of values.
        See :meth:`Client.read` for details.
        """
        if context is None and self._context:
            context = self._context
        rv = self._model.read(self.id, fields, context=context)
        if isinstance(rv, dict):
            return self._update(rv)
        elif isinstance(fields, basestring) and '%(' not in fields:
            # A single field name: cache the value under this name
            return self._update({fields: rv})[fields]
        return rv

    def perm_read(self, context=None):
        """Read the metadata of the :class:`Record`.

        Return a dictionary of values, or None if the server returns
        nothing.  The record's own context is used when `context` is not
        given.
        See :meth:`Client.perm_read` for details.
        """
        if context is None and self._context:
            context = self._context
        rv = self._execute('perm_read', [self.id], context=context)
        return rv[0] if rv else None

    def write(self, values, context=None):
        """Write the `values` in the :class:`Record`."""
        if context is None and self._context:
            context = self._context
        values = self._model._unbrowse_values(values)
        rv = self._execute('write', [self.id], values, context=context)
        self.refresh()
        return rv

    def unlink(self, context=None):
        """Delete the current :class:`Record` from the database."""
        if context is None and self._context:
            context = self._context
        rv = self._execute('unlink', [self.id], context=context)
        self.refresh()
        return rv

    def copy(self, default=None, context=None):
        """Copy a record and return the new :class:`Record`.

        The optional argument `default` is a mapping which overrides some
        values of the new record.  The new :class:`Record` carries the
        context which is used for the call.
        """
        if context is None and self._context:
            context = self._context
        if default:
            default = self._model._unbrowse_values(default)
        new_id = self._execute('copy', self.id, default, context=context)
        return Record(self._model, new_id, context=context)

    def _send(self, signal):
        """Trigger workflow `signal` for this :class:`Record`."""
        exec_workflow = self._model.client.exec_workflow
        rv = exec_workflow(self._model_name, signal, self.id)
        self.refresh()
        return rv

    @property
    def _external_id(self):
        """Retrieve the External ID of the :class:`Record`.

        Return the fully qualified External ID of the :class:`Record`,
        with default value False if there's none.  If multiple IDs
        exist, only one of them is returned (randomly).
        """
        xml_ids = self._model._get_external_ids([self.id])
        return list(xml_ids)[0] if xml_ids else False

    def _set_external_id(self, xml_id):
        """Set the External ID of this record.

        The argument `xml_id` is formatted as ``module.name``.
        Raise ``ValueError`` if this record already has an External ID,
        or if `xml_id` is already used by another entry.
        """
        (mod, name) = xml_id.split('.')
        obj = self._model_name
        domain = ['|', '&', ('model', '=', obj), ('res_id', '=', self.id),
                  '&', ('module', '=', mod), ('name', '=', name)]
        if self._model._imd_search(domain):
            raise ValueError('ID %r collides with another entry' % xml_id)
        vals = {'model': obj, 'res_id': self.id, 'module': mod, 'name': name}
        self._model._imd_create(vals)

    def __dir__(self):
        return ['read', 'write', 'copy', 'unlink', '_send', 'refresh',
                '_context', '_model', '_model_name', '_name', '_external_id',
                '_keys', '_fields'] + self._model._keys

    def __getattr__(self, attr):
        context = self._context
        if attr in self._model._keys:
            return self.read(attr, context=context)
        if attr == '_name':
            return self._get_name()
        if attr.startswith('_'):
            raise AttributeError("'Record' object has no attribute %r" % attr)

        def wrapper(self, *params, **kwargs):
            """Wrapper for client.execute(%r, %r, %d, *params, **kwargs)."""
            if context:
                kwargs.setdefault('context', context)
            res = self._execute(attr, [self.id], *params, **kwargs)
            self.refresh()
            # The call is made for a single id: unwrap a single result
            if isinstance(res, list) and len(res) == 1:
                return res[0]
            return res
        wrapper.__name__ = attr
        wrapper.__doc__ %= (self._model_name, attr, self.id)
        # Cache the bound method on the instance
        self.__dict__[attr] = mobj = wrapper.__get__(self, type(self))
        return mobj

    def __setattr__(self, attr, value):
        if attr == '_external_id':
            return self._set_external_id(value)
        # Checked first: 'id' is read-only even when it is not listed
        # among the keys of the model.
        if attr == 'id':
            raise AttributeError("'Record' object attribute 'id' is read-only")
        if attr not in self._model._keys:
            raise AttributeError("'Record' object has no attribute %r" % attr)
        self.write({attr: value})

    def __eq__(self, other):
        return (isinstance(other, Record) and
                self.id == other.id and self._model is other._model)

    def __ne__(self, other):
        # Python 2 does not derive '!=' from __eq__
        return not self.__eq__(other)

    def __hash__(self):
        # Equal records share the same model, hence the same model name
        return hash((self._model_name, self.id))
def _interact(global_vars, use_pprint=True, usage=USAGE):
    """Start an interactive console which runs the code in `global_vars`.

    When `use_pprint` is true, ``sys.displayhook`` is replaced with a hook
    which pretty-prints the results.  The `usage` text is installed as the
    builtin ``usage``: it is shown when the object is called or displayed.
    The function returns when the console session ends.
    """
    import code
    # Pick an 'exec' function and the builtins module, for Python 3 or 2
    try:
        import builtins
        _exec = getattr(builtins, 'exec')
    except ImportError:
        # Python 2: 'exec' is a statement, it is run through a string.
        # NOTE(review): presumably to keep this file parsable by Python 3.
        def _exec(code, g):
            exec('exec code in g')
        import __builtin__ as builtins

    if use_pprint:
        def displayhook(value, _printer=pprint, _builtins=builtins):
            # Pretty-format the output
            if value is None:
                return
            _printer(value)
            # Keep the last result available as '_'
            _builtins._ = value
        sys.displayhook = displayhook

    class Usage(object):
        # Show the usage text when called, and when displayed
        def __call__(self):
            print(usage)
        __repr__ = lambda s: usage
    builtins.usage = Usage()

    # Optional line editing: any failure disables the history silently
    try:
        import readline as rl
        import rlcompleter
        rl.parse_and_bind('tab: complete')
        # IOError if file missing, or broken Apple readline
        rl.read_history_file(HIST_FILE)
    except Exception:
        pass
    else:
        # A negative length means that no limit is set
        if rl.get_history_length() < 0:
            rl.set_history_length(int(os.environ.get('HISTSIZE', 500)))
        # better append instead of replace?
        atexit.register(rl.write_history_file, HIST_FILE)

    class Console(code.InteractiveConsole):
        def runcode(self, code):
            # Run the compiled code; print the errors but let SystemExit
            # terminate the session
            try:
                _exec(code, global_vars)
            except SystemExit:
                raise
            except:
                # Print readable 'Fault' errors
                # Work around http://bugs.python.org/issue12643
                (exc_type, exc, tb) = sys.exc_info()
                msg = ''.join(format_exception(exc_type, exc, tb, chain=False))
                print(msg.strip())

    # Show the UserWarning messages every time
    warnings.simplefilter('always', UserWarning)
    sys.exc_clear() if hasattr(sys, 'exc_clear') else None      # Python 2.x
    # Key UP to avoid an empty line
    Console().interact('\033[A')
def main():
    """Command line entry point.

    Parse the options, then depending on them:

    - list the sections of the configuration file and return (``--list``);
    - read the records of a model, selected with the positional
      arguments, and write them to the standard output in CSV format;
    - start the interactive console (``--interact``, or no ``--model``).
    """
    parser = optparse.OptionParser(
        usage='%prog [options] [search_term_or_id [search_term_or_id ...]]',
        version=__version__,
        description=('Inspect data on OpenERP objects. Use interactively '
                     'or query a model (-m) and pass search terms or '
                     'ids as positional parameters after the options.'))
    # The options are registered in this order
    option_table = (
        (('-l', '--list'),
         dict(action='store_true', dest='list_env',
              help='list sections of the configuration')),
        (('--env',),
         dict(help='read connection settings from the given section')),
        (('-c', '--config'),
         dict(default=CONF_FILE,
              help='specify alternate config file (default: %r)' % CONF_FILE)),
        (('--server',),
         dict(default=DEFAULT_URL,
              help=('full URL to the XML-RPC server (default: %s)' %
                    DEFAULT_URL))),
        (('-d', '--db'), dict(default=DEFAULT_DB, help='database')),
        (('-u', '--user'), dict(default=DEFAULT_USER, help='username')),
        (('-p', '--password'),
         dict(default=None,
              help='password, or it will be requested on login')),
        (('-m', '--model'), dict(help='the type of object to find')),
        (('-f', '--fields'),
         dict(action='append',
              help='restrict the output to certain fields (multiple allowed)')),
        (('-i', '--interact'),
         dict(action='store_true',
              help='use interactively; default when no model is queried')),
        (('-v', '--verbose'),
         dict(default=0, action='count', help='verbose')),
    )
    for (flags, settings) in option_table:
        parser.add_option(*flags, **settings)

    (opts, terms) = parser.parse_args()

    Client._config_file = os.path.join(os.path.curdir, opts.config)
    if opts.list_env:
        print('Available settings: ' + ' '.join(read_config()))
        return

    interactive = opts.interact or not opts.model
    if interactive:
        global_vars = Client._set_interactive()
        print(USAGE)

    if not opts.env:
        client = Client(opts.server, opts.db, opts.user, opts.password,
                        verbose=opts.verbose)
    else:
        client = Client.from_config(opts.env, verbose=opts.verbose)

    if opts.model and terms and client.user:
        # The language is the LANG variable without its encoding suffix
        lang = (os.environ.get('LANG') or 'en_US').split('.')[0]
        fields = opts.fields
        data = client.execute(opts.model, 'read', terms, fields,
                              {'lang': lang})
        if not fields:
            # No field requested: 'id' first, then the fields of the
            # first row
            fields = ['id']
            if data:
                fields += [fld for fld in data[0] if fld != 'id']
        writer = _DictWriter(sys.stdout, fields, "", "ignore",
                             quoting=csv.QUOTE_NONNUMERIC)
        writer.writeheader()
        writer.writerows(data)

    if client.connect is None:
        return
    # Set the globals()
    client.connect()
    # Enter interactive mode
    _interact(global_vars)
# Run the command line interface when this file is executed as a script
if __name__ == '__main__':
    main()