import base64
import csv
import json
import logging
import os
import secrets
from io import BytesIO, StringIO

import xlwt
from psycopg2 import sql, DatabaseError
from werkzeug import utils

from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError, Warning

_logger = logging.getLogger(__name__)


class opendons_ensemble(models.Model):
    """Marketing ensemble: a partner selection that belongs to a segment.

    Each ensemble stores a partner domain (``mailing_domain``) as a string.
    The parent segment's own domain is rebuilt from the domains of all its
    ensembles every time an ensemble is created or written.
    """

    _name = 'opendons.ensemble'
    _description = 'operation marketing ensemble : an ensemble is a part of contacts selected for an segment '
    _inherit = ['mail.thread']

    name = fields.Char(string='Name', required=True, translate=True, track_visibility='always')
    logical_operator = fields.Selection(
        [('union', 'union'), ('inter', 'intersection')], 'Type', default='union')
    # Partner domain, stored as the string form of a domain list.
    mailing_domain = fields.Char(string='partners selection')
    partner_count = fields.Integer(
        string="partners count", compute='_count_partner_ensemble', readonly=True)
    sequence = fields.Integer(string="sequence", default=10)
    segment_id = fields.Many2one(
        'opendons.segment', 'Segment', index=True, readonly=True,
        track_visibility='onchange', ondelete='cascade')
    request_ids = fields.One2many(
        'opendons.request', 'ensemble_id', string='Request', required=True,
        track_visibility='onchange')
    request_count = fields.Integer(
        compute='_compute_request_count', string="# of request", readonly=True)
    csv_export = fields.Binary('csv export', filters='.csv', readonly=True)
    document_fname = fields.Char()

    @api.depends('mailing_domain')
    def _count_partner_ensemble(self):
        """Compute ``partner_count`` as the number of partners matching the domain."""
        partner_model = self.env['res.partner']
        for rec in self:
            if rec.mailing_domain:
                # SECURITY NOTE(review): eval() executes the stored string as
                # arbitrary Python. Any user able to write ``mailing_domain``
                # can run code on the server. Consider a restricted evaluator
                # (e.g. ast.literal_eval or Odoo's safe_eval) once it is
                # confirmed that stored domains only contain literals.
                rec.partner_count = partner_model.search_count(eval(rec.mailing_domain))
            else:
                rec.partner_count = 0

    def csv_export_ensemble(self):
        """Build a ``name,email`` CSV of partners and store it on the record.

        The CSV is generated in memory (no temporary file on disk, so
        concurrent exports cannot clobber each other), encoded as UTF-8,
        base64-encoded and written to ``csv_export``; ``document_fname`` is
        set to ``contacts.csv``.
        """
        buffer = StringIO()
        writer = csv.writer(
            buffer, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL,
            lineterminator='\n')
        # Header row.
        writer.writerow(['name', 'email'])
        # NOTE(review): every partner in the database is exported, not only
        # those matching this ensemble's mailing_domain - confirm this is the
        # intended scope.
        partners = self.env['res.partner'].sudo().search([])
        for partner in partners:
            # Empty Char fields read as False; export them as empty cells
            # rather than the literal text "False".
            writer.writerow([partner.name or '', partner.email or ''])
        # b64encode replaces base64.encodestring, which was removed in
        # Python 3.9.
        self.csv_export = base64.b64encode(buffer.getvalue().encode('utf-8'))
        self.document_fname = 'contacts.csv'
        return

    @api.depends('request_ids')
    def _compute_request_count(self):
        """Compute ``request_count`` as the number of linked requests."""
        for ensemble in self:
            try:
                ensemble.request_count = len(ensemble.request_ids)
            except Exception:
                # Best effort: the current user may not have access rights on
                # the requests. Keep the fallback but leave a trace.
                _logger.debug(
                    "Could not count requests of ensemble %s", ensemble.id,
                    exc_info=True)
                ensemble.request_count = 0

    def update_segment_domain(self, segment):
        """Return the domain string combining all ensembles of ``segment``.

        :param segment: ``opendons.segment`` record whose ``ensemble_ids`` and
            ``logical_operator`` are read.
        :return: ``''`` when the segment has no ensemble; the single
            ensemble's ``mailing_domain`` (possibly ``False``) when it has
            exactly one; otherwise a string ``"[...]"`` in which the ensemble
            domains are joined in prefix notation with ``"|"`` (union) or
            ``"&"`` (intersection).
        """
        ensembles = segment.ensemble_ids
        if not ensembles:
            return ''
        if len(ensembles) == 1:
            return ensembles.mailing_domain

        # Any value other than 'inter' (including an unset operator) falls
        # back to a union; previously an unset operator crashed with an
        # unbound local variable.
        str_operator = '"&",' if segment.logical_operator == 'inter' else '"|",'

        # Strip the surrounding brackets of each domain; skip ensembles with
        # no domain or an empty one so no dangling comma or operator is
        # produced.
        inner_domains = []
        for ens in ensembles:
            if not ens.mailing_domain:
                continue
            inner = ens.mailing_domain.strip()[1:-1].strip()
            if inner:
                inner_domains.append(inner)
        if not inner_domains:
            return "[]"

        # NOTE(review): one operator is emitted per ensemble, which is only
        # correct when each ensemble domain is a single term (or already
        # carries its own explicit operators) - confirm against the domains
        # produced by opendons.request.
        combined = inner_domains[0]
        for inner in inner_domains[1:]:
            combined = str_operator + combined + "," + inner
        return "[" + combined + "]"

    def _refresh_parent_segments(self):
        """Recompute ensemble count and domain on the segments of ``self``."""
        today = fields.Date.context_today(self)
        for segment in self.mapped('segment_id'):
            segment.write({
                'ensemble_count': len(segment.ensemble_ids),
                'mailing_domain': self.update_segment_domain(segment),
            })
            # NOTE(review): presumably meant to mark the parent operation as
            # modified; confirm that writing '__last_update' has the intended
            # effect on the targeted Odoo version.
            segment.operation_id.write({'__last_update': today})

    @api.model
    def create(self, vals):
        """Create the ensemble, then refresh its parent segment."""
        res = super(opendons_ensemble, self).create(vals)
        res._refresh_parent_segments()
        return res

    def write(self, vals):
        """Write ``vals``, then refresh ensemble and segment domains.

        When ``logical_operator`` is part of ``vals``, each ensemble's domain
        is recomputed through ``opendons.request.update_ensemble_domain`` and
        stored before the parent segments are refreshed, so the segment
        domain is built from up-to-date ensemble domains.

        :return: the result of the ORM ``write``.
        """
        res = super(opendons_ensemble, self).write(vals)
        if 'logical_operator' in vals:
            request_model = self.env['opendons.request']
            for ensemble in self:
                mailing_domain = request_model.update_ensemble_domain(ensemble)
                if mailing_domain:
                    # Go through super() so the segment refresh below runs
                    # only once, after all domains are stored.
                    super(opendons_ensemble, ensemble).write(
                        {'mailing_domain': mailing_domain})
        self._refresh_parent_segments()
        return res

    def copy(self, default_values=None):
        """Duplicate this ensemble together with its requests.

        :param default_values: optional dict of values overriding the copied
            ones on the new ensemble.
        :return: the newly created ensemble record.
        """
        # NOTE(review): the standard ORM signature is copy(default=None);
        # callers passing ``default=`` by keyword will not reach this
        # parameter.
        self.ensure_one()
        vals = {
            'name': 'copy of ' + self.name,
            'segment_id': self.segment_id.id,
            'mailing_domain': self.mailing_domain,
        }
        vals.update(default_values or {})
        # super().create() bypasses this model's create() override; the
        # parent segment is refreshed by the write() calls below.
        dup_ensemble = super(opendons_ensemble, self).create(vals)

        # Duplicate the ensemble's requests.
        request_model = self.env['opendons.request']
        for request in self.request_ids:
            dup_request = request_model.create({
                'ensemble_id': dup_ensemble.id,
                'name': request.name,
                'mailing_domain': request.mailing_domain,
            })
            dup_ensemble.write({'request_ids': [(4, dup_request.id)]})
        return dup_ensemble