Campaign Strategies: Technical Development Tutorial
This comprehensive technical guide provides developers with everything needed to create campaign strategies in Python. Strategies are the core business logic components that control how campaigns operate, manage contact flow, and determine communication sequences.
Prerequisites
- Python 3.12 knowledge and experience
- Understanding of async/await patterns
- Familiarity with dataclasses and type hints
- Basic understanding of campaign concepts
- Access to Flametree campaign interface
Strategy Types and Execution Flow
Campaign strategies execute at specific points in the campaign lifecycle:
Contact Import → Data Load Strategy
↓
Communication Start → Agent Conversation
↓ (after each agent message)
Intermediate Analysis Strategy (optional)
↓
Communication End → Aggregation Strategy
Data Load Strategy
Execution Trigger: When new contacts are imported into the campaign
Purpose:
- Initialize contact data and parameters
- Plan initial communications
- Handle duplicate contact merging
- Set initial contact stages and statuses
Function Signature:
async def run(
contact: Contact,
existing_contact: Contact | None,
logger,
**kwargs
) -> Contact:
Aggregation Strategy
Execution Trigger: After each communication session completes
Purpose:
- Process conversation results and session outcomes
- Determine next actions based on conversation results
- Schedule follow-up communications
- Update contact status and stage
Function Signature:
async def run(
contact: Contact,
session_results: dict | None,
logger,
**kwargs
) -> Contact:
Contact Answer Intermediate Analysis Strategy
Execution Trigger: After each agent response during active conversations
Purpose:
- Monitor ongoing conversations in real-time
- Update contact status during long conversations
- Extend communication timeouts based on engagement
- Make intermediate decisions without waiting for session end
Function Signature:
async def on_agent_answer(
contact: Contact,
message: Message,
**kwargs
) -> Contact | None:
Strategy File Structure
Self-Contained Implementation
Critical Requirement: Strategy files must be completely self-contained. Due to technical implementation constraints, you cannot:
- Import custom modules or split code across multiple files
- Use external dependencies beyond Python 3.12 built-in libraries
- Reference external Python files or packages
Allowed Imports: Only Python 3.12 standard library modules:
import datetime
import zoneinfo
import json
import asyncio
import logging
from dataclasses import dataclass
from typing import Literal, Optional, Dict, List
# ... other standard library modules
Multi-Strategy File Organization
For better code organization and reusability, you can implement multiple strategies in a single file:
"""
Campaign Strategy File - Complete Implementation
Contains all strategies for the campaign with shared utilities
"""
import datetime
import zoneinfo
from dataclasses import dataclass
from typing import Literal, Optional, Dict, Tuple
from flametree.api.marketing.types import (
Contact, Communication, CommunicationStatus, CommunicationChannel,
ContactStatus, StatusCommunication, Message, MasterRecord, Attachment
)
# ---- Shared Constants and Configuration ---- #
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
COMMUNICATION_HOURS = (9, 23)
MAX_RETRIES = 3
# ---- Shared Utility Classes (see modules below) ---- #
class OutcomeResolver:
# Implementation here...
pass
class CallScheduler:
# Implementation here...
pass
# ---- Strategy Functions ---- #
async def data_load_strategy(contact: Contact, existing_contact: Contact | None, logger, **kwargs) -> Contact:
"""Data Load Strategy Implementation"""
# Implementation logic here
return contact
async def aggregation_strategy(contact: Contact, session_results: dict | None, logger, **kwargs) -> Contact:
"""Aggregation Strategy Implementation"""
# Implementation logic here
return contact
async def intermediate_analysis_strategy(contact: Contact, message: Message, **kwargs) -> Contact | None:
"""Intermediate Analysis Strategy Implementation"""
# Implementation logic here
return contact
Strategy Assignment: When uploading the strategy file, specify the exact function name that corresponds to each strategy type in the campaign interface.
Core Data Structures
Contact Object
The Contact object is the primary data structure strategies work with:
@dataclass
class Contact:
# System identification fields (can be None)
id: str | None
external_id: str | None # Person identifier in external system
name: str | None
email: str | None
phone: str | None
object_id: str | None # Campaign subject identifier (e.g., contract ID)
# Campaign management fields
current_stage: str | None
status: ContactStatus # Success, Positive, Uncertain, Negative, Error
communications: list[Communication]
# Data storage
external_data: dict[str, Any] | None # Imported parameters (read-only)
internal_data: dict[str, Any] | None # Strategy-managed data
campaign_results: dict[str, Any] | None # Campaign results data
global_heap: dict[str, Any] | None # Campaign-wide shared data
# Master record reference
master_record: MasterRecord | None
Communication Object
Represents individual communication instances:
@dataclass
class Communication:
id: str | None
name: str # Human-readable communication name
stage_name: str # Stage this communication belongs to
start_date: datetime
finish_date: datetime
status: StatusCommunication # Current status with optional message
parameters: dict[str, Any] | None # Channel-specific parameters
flagged: bool = False # True for currently active communication
session_id: str | None # Associated session identifier
session_status: SessionStatus | None # Final session status
result: dict[str, Any] | None # Communication outcome data
def interrupt(self, message: str | None):
"""Cancel planned or stop active communication"""
self.status = StatusCommunication(
status=CommunicationStatus.INTERRUPTED,
message=message
)
Contact Status Enum
class ContactStatus(Enum):
Success = "Success" # Campaign goal achieved
Positive = "Positive" # Progress towards campaign goal
Uncertain = "Uncertain" # Default, no clear outcome yet
Negative = "Negative" # Campaign unsuccessful
Error = "Error" # Technical error occurred
Communication Status Enum
class CommunicationStatus(Enum):
PLANNED = "PLANNED" # Communication scheduled but not started
STARTED = "STARTED" # Communication is currently active
COMPLETED = "COMPLETED" # Communication finished successfully
INTERRUPTED = "INTERRUPTED" # Communication was cancelled
ERROR = "ERROR" # Communication failed due to error
Communication Channels
class CommunicationChannel(Enum):
telegram = "telegram" # Telegram messaging
whatsapp = "whatsapp" # WhatsApp messaging
chatwoot = "chatwoot" # Chatwoot CRM integration
intercom = "intercom" # Intercom platform
twilio = "twilio" # Twilio voice calls
email = "email" # Email communication
facebook = "facebook" # Facebook Messenger
human = "human" # Human agent handling
email_agent = "email_agent" # AI email agent
message_bird = "message_bird" # MessageBird WhatsApp
sip = "sip" # SIP voice calls
birdapiservice = "birdapiservice" # Bird API service
none = "none" # No specific channel
push = "push" # Push notifications
Message Object
Represents individual messages within conversations:
@dataclass
class Message:
id: str # Unique message identifier
number: int # Message sequence number in conversation
text: str # Message content
session_results: dict # Session results at time of message
meta: Optional[dict] # Additional metadata
attachments: list[Attachment] # Message attachments
👀 Note:
session_results
contains the conversation results collected up to that message point.
Attachment Object
Represents file attachments in messages:
@dataclass
class Attachment:
id: str # Unique attachment identifier
content_type: str # MIME type of the file
file_name: str # Original filename
file_path: str # Path to stored file
Master Record Object
Contains core contact information:
@dataclass
class MasterRecord:
id: str | None # Master record identifier
external_id: str | None # External system identifier
name: str | None # Contact name
email: str | None # Email address
phone: str | None # Phone number
Status Communication Object
Wraps communication status with optional message:
@dataclass
class StatusCommunication:
status: CommunicationStatus # Current status
message: str | None = None # Optional status message
Working with Nullable Fields
Many Contact fields can be None
, requiring careful handling in strategies:
def safe_contact_access(contact: Contact) -> dict:
"""Example of safely accessing potentially None fields"""
# Always check external_data before accessing
template_id = None
if contact.external_data:
template_id = contact.external_data.get("templateId")
# Initialize internal_data if None
if contact.internal_data is None:
contact.internal_data = {}
# Initialize global_heap if None
if contact.global_heap is None:
contact.global_heap = {}
# Safe access to campaign_results
campaign_results = contact.campaign_results or {}
return {
"template_id": template_id,
"has_external_data": contact.external_data is not None,
"internal_data_count": len(contact.internal_data),
"global_heap_count": len(contact.global_heap),
"campaign_results_count": len(campaign_results)
}
Reusable Strategy Modules
Outcome Resolution Module
Manages contact status transitions and stage mapping:
NUMBER_ROTATION_LENGTH = 4
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
@dataclass(frozen=True, slots=True)
class OutcomeCase:
"""Maps outcomes to stages and statuses"""
stage_name: str
contact_status: ContactStatus
apply_rotation: bool = False
class OutcomeResolver:
"""Centralized outcome mapping and stage management"""
# Define outcome mappings
NOT_INTERESTED = OutcomeCase("Not interested", ContactStatus.Negative)
FOLLOW_UP = OutcomeCase("In process", ContactStatus.Positive)
REMINDER = OutcomeCase("Reminder scheduled", ContactStatus.Positive, apply_rotation=True)
TRANSFER_TO_AGENT = OutcomeCase("Transferred to human agent", ContactStatus.Uncertain)
FINISHED = OutcomeCase("Finished", ContactStatus.Success)
PROMISE_TO_FINISH = OutcomeCase("Promise to finish", ContactStatus.Success)
NO_RESPONSE = OutcomeCase("No response", ContactStatus.Uncertain)
# Internal stages
_INITIAL_STAGE = OutcomeCase("Outbound initial", ContactStatus.Uncertain, apply_rotation=True)
_FIRST_MSG_SENT = OutcomeCase("Awaiting for response", ContactStatus.Uncertain)
@classmethod
def get_entry(cls, key: str, default: OutcomeCase | None = None) -> OutcomeCase:
"""Get outcome case by key with fallback"""
if default is None:
default = cls.NO_RESPONSE
return getattr(cls, key, default)
@classmethod
def get_actual_status(cls, contact: Contact, session_results: dict,
force_outcome_status: str | None = None) -> Tuple[str, OutcomeCase]:
"""Determine contact status and stage from session results"""
if force_outcome_status is None:
outcome_status = session_results.get("OutcomeStatus", "NO_RESPONSE")
else:
outcome_status = force_outcome_status
outcome_case = cls.get_entry(outcome_status)
# Apply number rotation if needed
stage_name = outcome_case.stage_name
if outcome_case.apply_rotation:
stage_name = cls._apply_number_rotation(stage_name, contact)
modified_outcome_case = OutcomeCase(
stage_name=stage_name,
contact_status=outcome_case.contact_status,
apply_rotation=False
)
return outcome_status, modified_outcome_case
return outcome_status, outcome_case
@classmethod
def _apply_number_rotation(cls, stage_name: str, contact: Contact) -> str:
"""Apply number rotation to stage name"""
rotation_id: int | None = contact.internal_data.get("rotation_number_id")
if rotation_id is None:
rotation_id = cls._produce_new_number_rotation(contact)
contact.internal_data["rotation_number_id"] = rotation_id
return f"{stage_name} - {rotation_id}"
@staticmethod
def _produce_new_number_rotation(contact: Contact, method: Literal["counter", "random", "by_day"] = "counter") -> int:
"""Generate new rotation number using specified method"""
match method:
case "random":
rotation_id = (
int(datetime.datetime.now(tz=TIMEZONE).timestamp())
% NUMBER_ROTATION_LENGTH
)
case "counter":
rotation_id = contact.global_heap.get("latest_registered_rotation_number_id", -1)
rotation_id = (rotation_id + 1) % NUMBER_ROTATION_LENGTH
contact.global_heap["latest_registered_rotation_number_id"] = rotation_id
case "by_day":
rotation_id = (
datetime.datetime.now(tz=TIMEZONE).date().toordinal()
% NUMBER_ROTATION_LENGTH
)
case _:
raise ValueError(f"Unknown rotation method: {method}")
return rotation_id
Communication Scheduling Module
Manages time-based communication scheduling with rate limiting:
# Configuration constants
COMMUNICATION_HOURS_INTERVAL = (9, 23) # Allowed hours for communication
COMMUNICATION_COUNT_PER_INTERVAL = 7 # Max communications per time bucket
COMMUNICATION_INTERVAL_MINUTES = 59 # Bucket duration in minutes
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
class CallScheduler:
"""Advanced communication scheduler with rate limiting and time windows"""
@classmethod
async def register_call(cls, storage: Dict) -> Tuple[Optional[datetime.datetime], Dict]:
"""
Reserve the next available communication slot
Returns (scheduled_time, updated_storage) or (None, storage) if no slots available
"""
# Initialize scheduler data
schedule_data = storage.setdefault("schedule_data", {})
bucket_counts = schedule_data.setdefault("bucket_counts", {})
now = datetime.datetime.now(tz=TIMEZONE)
today = now.date()
# Clean old bucket data
cls._clean_old_buckets(bucket_counts, today, TIMEZONE)
# Try to find slot today (not earlier than current time)
slot_time = cls._find_slot_for_date(bucket_counts, today, from_time=now, timezone=TIMEZONE)
# If not found today, try tomorrow
if slot_time is None:
tomorrow = today + datetime.timedelta(days=1)
slot_time = cls._find_slot_for_date(bucket_counts, tomorrow, timezone=TIMEZONE)
if slot_time is None:
return None, storage
# Reserve the slot
bucket_start = cls._get_bucket_start(slot_time, TIMEZONE)
bucket_key = str(int(bucket_start.timestamp()))
bucket_counts[bucket_key] = bucket_counts.get(bucket_key, 0) + 1
# Update last used bucket
schedule_data["last_bucket"] = int(bucket_start.timestamp())
return slot_time, storage
@classmethod
def _clean_old_buckets(cls, bucket_counts: Dict[str, int], current_date: datetime.date, timezone):
"""Remove expired bucket data"""
day_start = datetime.datetime.combine(current_date, datetime.time.min, tzinfo=timezone)
cutoff_timestamp = int(day_start.timestamp())
keys_to_remove = [key for key in bucket_counts.keys() if int(key) < cutoff_timestamp]
for key in keys_to_remove:
del bucket_counts[key]
@classmethod
def _find_slot_for_date(cls, bucket_counts: Dict[str, int], date: datetime.date,
from_time: Optional[datetime.datetime] = None, timezone=None) -> Optional[datetime.datetime]:
"""Find first available slot for specified date"""
start_hour, end_hour = COMMUNICATION_HOURS_INTERVAL
day_start = datetime.datetime(date.year, date.month, date.day, start_hour, 0, tzinfo=timezone)
day_end = datetime.datetime(date.year, date.month, date.day, end_hour, 0, tzinfo=timezone)
search_start = max(day_start, from_time) if from_time else day_start
bucket_start = cls._align_to_bucket_boundary(search_start, day_start)
bucket_duration = datetime.timedelta(minutes=COMMUNICATION_INTERVAL_MINUTES)
while bucket_start + bucket_duration <= day_end:
bucket_key = str(int(bucket_start.timestamp()))
current_count = bucket_counts.get(bucket_key, 0)
if current_count < COMMUNICATION_COUNT_PER_INTERVAL:
slot_offset = (bucket_duration / COMMUNICATION_COUNT_PER_INTERVAL) * current_count
slot_time = bucket_start + slot_offset
if from_time is None or slot_time >= from_time:
return slot_time
bucket_start += bucket_duration
return None
@classmethod
def _align_to_bucket_boundary(cls, target_time: datetime.datetime, day_start: datetime.datetime) -> datetime.datetime:
"""Align time to nearest bucket boundary"""
minutes_from_start = (target_time - day_start).total_seconds() / 60
bucket_index = int(minutes_from_start // COMMUNICATION_INTERVAL_MINUTES)
if minutes_from_start % COMMUNICATION_INTERVAL_MINUTES > 0:
bucket_index += 1
return day_start + datetime.timedelta(minutes=bucket_index * COMMUNICATION_INTERVAL_MINUTES)
@classmethod
def _get_bucket_start(cls, slot_time: datetime.datetime, timezone) -> datetime.datetime:
"""Get bucket start time for given slot"""
day_start = datetime.datetime.combine(
slot_time.date(),
datetime.time(COMMUNICATION_HOURS_INTERVAL[0]),
tzinfo=timezone
)
minutes_from_start = (slot_time - day_start).total_seconds() / 60
bucket_index = int(minutes_from_start // COMMUNICATION_INTERVAL_MINUTES)
return day_start + datetime.timedelta(minutes=bucket_index * COMMUNICATION_INTERVAL_MINUTES)
@classmethod
def adjust_time_to_business_hours(cls, planned_time: datetime.datetime) -> datetime.datetime:
"""Adjust planned time to fall within business hours"""
start_hour, end_hour = COMMUNICATION_HOURS_INTERVAL
hour = planned_time.hour
# If within allowed interval
if start_hour <= hour < end_hour:
return planned_time
# If before allowed interval on same day
if hour < start_hour:
return planned_time.replace(hour=start_hour, minute=0, second=0, microsecond=0)
# If after end of allowed interval, shift to next day
next_day = planned_time + datetime.timedelta(days=1)
return next_day.replace(hour=start_hour, minute=0, second=0, microsecond=0)
Communication Creation Helper
Standardizes communication creation with proper parameters:
EMPTY_DATA_PLACEHOLDER = "unknown"
async def create_whatsapp_communication(
contact: Contact,
stage_name: str,
name: str,
start_at: datetime.datetime,
duration_min: int = 1440, # 24 hours default
extra_params: Dict | None = None,
language: str = "en"
) -> Communication:
"""Create a WhatsApp communication with standardized parameters"""
# Build environment information
env_info = contact.external_data.copy() if contact.external_data else {}
env_info.update({
"Client Name": contact.name or EMPTY_DATA_PLACEHOLDER,
"Client Email": contact.email or EMPTY_DATA_PLACEHOLDER,
"Client Phone": contact.phone or EMPTY_DATA_PLACEHOLDER,
})
if extra_params:
env_info.update(extra_params)
parameters = {
"env_info": env_info,
"session_info": env_info,
"language": language,
}
communication = Communication(
id=None,
name=name,
stage_name=stage_name,
start_date=start_at,
finish_date=start_at + datetime.timedelta(minutes=duration_min),
status=StatusCommunication(status=CommunicationStatus.PLANNED, message=None),
parameters=parameters,
session_id=None,
session_status=None,
result=None,
flagged=False
)
return communication
Retry Logic Module
Manages communication retry attempts with configurable limits:
class RetryManager:
"""Manages retry attempts for failed communications"""
def __init__(self, max_retries: int = 3, retry_delays: Dict[str, int] = None):
self.max_retries = max_retries
self.retry_delays = retry_delays or {
"NO_RESPONSE": 1440, # 24 hours
"PROMISE_TO_FINISH": 1440, # 24 hours
"DEFAULT": 720 # 12 hours
}
def should_retry(self, contact: Contact, outcome_status: str) -> bool:
"""Check if contact should be retried based on attempt count"""
attempts = contact.internal_data.get("_communication_attempts", 0)
return attempts < self.max_retries
def get_retry_delay(self, outcome_status: str) -> int:
"""Get retry delay in minutes for given outcome"""
return self.retry_delays.get(outcome_status, self.retry_delays["DEFAULT"])
def increment_attempts(self, contact: Contact) -> int:
"""Increment and return attempt count"""
attempts = contact.internal_data.get("_communication_attempts", 0) + 1
contact.internal_data["_communication_attempts"] = attempts
return attempts
def reset_attempts(self, contact: Contact):
"""Reset attempt counter"""
contact.internal_data["_communication_attempts"] = 0
Complete Strategy Implementation Examples
Data Load Strategy Example
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
TARGET_TEMPLATE_IDS = ("DocumentVerificationIsWaiting", "AppLastStep")
INITIAL_STAGE_NAME = "Outbound initial"
FINISH_STAGE_NAME = "Finished"
SKIP_STAGES = {"Finished", "Reminder scheduled", "Outbound initial"}
async def data_load_strategy(contact: Contact, existing_contact: Contact | None, logger, **kwargs) -> Contact:
"""
Initialize contacts and schedule first communication
Handles duplicate contacts and validates target criteria
"""
now = datetime.datetime.now(tz=TIMEZONE)
# Initialize data structures if None
if contact.internal_data is None:
contact.internal_data = {}
if contact.global_heap is None:
contact.global_heap = {}
# Validate contact has required template ID
template_id = contact.external_data.get("templateId") if contact.external_data else None
if template_id is None:
logger.error(f"Contact {contact.id} missing templateId")
contact.current_stage = FINISH_STAGE_NAME
return contact
# Check if contact meets target criteria
if template_id not in TARGET_TEMPLATE_IDS:
logger.info(f"Contact {contact.id} with template {template_id} not in target")
contact.current_stage = FINISH_STAGE_NAME
contact.status = ContactStatus.Success
return contact
# Skip already processed contacts
if (existing_contact and
existing_contact.current_stage in SKIP_STAGES):
logger.info(f"Contact {contact.id} already processed")
return contact
# Cancel any existing planned communications
if existing_contact:
for comm in contact.communications:
if comm.status.status == CommunicationStatus.PLANNED:
comm.interrupt("Interrupted by new load")
# Schedule initial communication using CallScheduler
start_time, contact.global_heap = await CallScheduler.register_call(contact.global_heap)
if start_time is None:
logger.warning(f"No available slots for contact {contact.id}")
return contact
# Create first communication
_, outcome_case = OutcomeResolver.get_actual_status(
contact, {}, force_outcome_status="_INITIAL_STAGE"
)
first_comm = await create_whatsapp_communication(
contact=contact,
stage_name=outcome_case.stage_name,
name="Initial WhatsApp Message",
start_at=start_time,
language="es"
)
contact.communications.append(first_comm)
contact.current_stage = outcome_case.stage_name
return contact
Aggregation Strategy Example
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
async def aggregation_strategy(contact: Contact, session_results: dict | None, logger, **kwargs) -> Contact:
"""
Process completed sessions and determine next actions
Handles various conversation outcomes and schedules follow-ups
"""
retry_manager = RetryManager(max_retries=3)
now = datetime.datetime.now(tz=TIMEZONE)
actual_session_results = session_results or {}
# Initialize data structures if None
if contact.internal_data is None:
contact.internal_data = {}
# Mark current communication as completed
for comm in contact.communications:
if comm.flagged:
comm.status.status = CommunicationStatus.COMPLETED
logger.info(f"Communication completed with status: {comm.session_status}")
# Determine outcome and update contact
outcome_status, outcome_case = OutcomeResolver.get_actual_status(
contact, actual_session_results
)
contact.status = outcome_case.contact_status
contact.current_stage = outcome_case.stage_name
# Handle specific outcomes
match outcome_status:
case "REMINDER":
# Schedule reminder for specific date
reminder_date_str = actual_session_results.get("FollowUpDate")
try:
reminder_dt = datetime.datetime.fromisoformat(reminder_date_str)
if reminder_dt.tzinfo is None:
reminder_dt = reminder_dt.replace(tzinfo=TIMEZONE)
except Exception as e:
logger.error(f"Invalid reminder date: {reminder_date_str} - {e}")
reminder_dt = now + datetime.timedelta(days=1)
reminder_comm = await create_whatsapp_communication(
contact=contact,
stage_name="Reminder scheduled",
name="Scheduled Reminder",
start_at=reminder_dt,
language="es"
)
contact.communications.append(reminder_comm)
case "NO_RESPONSE" | "PROMISE_TO_FINISH":
# Handle retry logic
if retry_manager.should_retry(contact, outcome_status):
attempts = retry_manager.increment_attempts(contact)
delay_minutes = retry_manager.get_retry_delay(outcome_status)
retry_time = CallScheduler.adjust_time_to_business_hours(
now + datetime.timedelta(minutes=delay_minutes)
)
retry_comm = await create_whatsapp_communication(
contact=contact,
stage_name="Outbound initial",
name=f"Follow-up #{attempts}",
start_at=retry_time,
language="es"
)
contact.communications.append(retry_comm)
else:
logger.info(f"Max retries exceeded for contact {contact.id}")
case "TRANSFER_TO_AGENT":
# Log transfer, no additional action needed
logger.info(f"Contact {contact.id} transferred to human agent")
case "FINISHED":
# Mark as successfully completed
retry_manager.reset_attempts(contact)
logger.info(f"Contact {contact.id} successfully completed campaign")
return contact
Intermediate Analysis Strategy Example
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
CHAT_DURATION_MIN = 1440 # 24 hours
SHORT_CHAT_DURATION_MIN = 180 # 3 hours for quick cases
async def intermediate_analysis_strategy(contact: Contact, message: Message, **kwargs) -> Contact | None:
"""
Monitor conversations in real-time and update contact status
Extends conversation timeouts and tracks engagement
"""
now = datetime.datetime.now(tz=TIMEZONE)
session_results = message.session_results or {}
# Initialize internal_data if None
if contact.internal_data is None:
contact.internal_data = {}
# Update timestamp for latest activity
contact.internal_data["_last_message_timestamp_int"] = str(int(now.timestamp()))
# Handle first message sent
if message.number == 1:
_, outcome_case = OutcomeResolver.get_actual_status(
contact, session_results, force_outcome_status="_FIRST_MSG_SENT"
)
contact.status = outcome_case.contact_status
contact.current_stage = outcome_case.stage_name
# Handle subsequent messages (user engagement)
elif message.number >= 2:
outcome_status, outcome_case = OutcomeResolver.get_actual_status(
contact, session_results
)
contact.status = outcome_case.contact_status
contact.current_stage = outcome_case.stage_name
# Extend communication timeout when user is engaged
for communication in contact.communications:
if communication.flagged:
# Use shorter duration for certain outcomes
if outcome_case.stage_name in ("SCHEDULE_REMINDER",):
new_finish = now + datetime.timedelta(minutes=SHORT_CHAT_DURATION_MIN)
else:
new_finish = now + datetime.timedelta(minutes=CHAT_DURATION_MIN)
communication.finish_date = new_finish
return contact
Working with Campaign and Communication Results
Strategies can access and update both campaign results and individual communication results:
def process_campaign_results(contact: Contact, session_results: dict) -> Contact:
"""Example of working with campaign and communication results"""
# Initialize campaign_results if None
if contact.campaign_results is None:
contact.campaign_results = {}
# Extract data from session results
customer_name = session_results.get("CustomerName")
payment_promise = session_results.get("PromiseToPayDate")
debt_verified = session_results.get("DebtVerified", False)
# Update campaign results (visible in Contact interface)
if customer_name:
contact.campaign_results["verified_name"] = customer_name
if payment_promise:
contact.campaign_results["promised_payment_date"] = payment_promise
if debt_verified:
contact.campaign_results["debt_acknowledged"] = True
# Store results in active communication
for comm in contact.communications:
if comm.flagged:
if comm.result is None:
comm.result = {}
comm.result.update({
"outcome_status": session_results.get("OutcomeStatus", "UNKNOWN"),
"conversation_length": session_results.get("MessageCount", 0),
"resolution_achieved": session_results.get("ResolutionAchieved", False)
})
return contact
Advanced Strategy Patterns
Dynamic Campaign Parameters
def get_dynamic_parameters(contact: Contact, base_params: dict) -> dict:
"""Generate dynamic parameters based on contact data"""
params = base_params.copy()
# Safe access to external data
external_data = contact.external_data or {}
debt_amount = external_data.get("amount", "0")
due_date = external_data.get("due_date", "")
params["env_info"].update({
"Personalized_Amount": f"${debt_amount}",
"Days_Overdue": calculate_days_overdue(due_date),
"Customer_Segment": determine_customer_segment(contact),
})
return params
Testing and Debugging
Logging Best Practices
async def example_strategy_with_logging(contact: Contact, session_results: dict | None, logger, **kwargs) -> Contact:
"""Example showing comprehensive logging practices"""
# Log strategy entry
logger.info(f"STRATEGY: Processing contact {contact.id} in stage {contact.current_stage}")
try:
# Log important data points
logger.debug(f"Session results: {session_results}")
logger.debug(f"Contact external data: {contact.external_data}")
# Log business logic decisions
outcome_status = session_results.get("OutcomeStatus", "NO_RESPONSE") if session_results else "NO_RESPONSE"
logger.info(f"STRATEGY: Outcome status determined as: {outcome_status}")
# Process outcome...
# Log final state
logger.info(f"STRATEGY: Contact {contact.id} moved to stage {contact.current_stage} with status {contact.status}")
return contact
except Exception as e:
logger.error(f"STRATEGY: Error processing contact {contact.id}: {str(e)}")
# Return contact unchanged on error
return contact
Error Handling Patterns
def safe_data_extraction(contact: Contact, key: str, default_value: any = None) -> any:
"""Safely extract data from contact with fallback"""
try:
if contact.external_data is None:
return default_value
value = contact.external_data.get(key, default_value)
if value is None or value == "":
return default_value
return value
except Exception:
return default_value
def safe_datetime_parsing(date_string: str, timezone, fallback_days: int = 1) -> datetime.datetime:
"""Safely parse datetime with fallback"""
try:
dt = datetime.datetime.fromisoformat(date_string)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone)
return dt
except Exception:
return datetime.datetime.now(tz=timezone) + datetime.timedelta(days=fallback_days)
Data Initialization Best Practices
Critical: Always initialize None fields before use to prevent errors:
def initialize_contact_data(contact: Contact):
"""Initialize contact data structures to prevent None errors"""
# Initialize internal_data for strategy use
if contact.internal_data is None:
contact.internal_data = {}
# Initialize global_heap for campaign-wide data
if contact.global_heap is None:
contact.global_heap = {}
# Initialize campaign_results for result storage
if contact.campaign_results is None:
contact.campaign_results = {}
async def safe_strategy_wrapper(contact: Contact, *args, **kwargs) -> Contact:
"""Wrapper to ensure data initialization in all strategies"""
# Always initialize before processing
initialize_contact_data(contact)
# Your strategy logic here...
return contact
Performance Optimization
Efficient Data Access
def optimize_contact_processing(contact: Contact) -> Contact:
"""Optimize contact data access patterns"""
# Cache frequently accessed values with None checks
contact_data = contact.external_data or {}
internal_data = contact.internal_data or {}
# Initialize internal_data if None
if contact.internal_data is None:
contact.internal_data = {}
# Batch updates to internal_data
updates = {}
updates["last_processed"] = datetime.datetime.now().isoformat()
updates["processing_version"] = "2.0"
contact.internal_data.update(updates)
return contact
Integration Examples
External API Integration
import json
from urllib.request import Request, urlopen
from urllib.error import URLError
API_URL = "https://api.example-crm.com/contacts/update"
async def update_external_crm(contact: Contact, outcome: str) -> bool:
"""Example external CRM update (using only standard library)"""
try:
# Prepare API request
payload = {
"external_id": contact.external_id,
"campaign_outcome": outcome,
"last_contact_date": datetime.datetime.now().isoformat(),
"current_stage": contact.current_stage
}
# Make HTTP request using urllib (standard library)
request = Request(
API_URL,
data=json.dumps(payload).encode('utf-8'),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer YOUR_API_TOKEN'
}
)
with urlopen(request, timeout=10) as response:
result = json.loads(response.read().decode('utf-8'))
return result.get('status') == 'success'
except URLError as e:
# Log error but don't fail the strategy
print(f"CRM update failed: {e}")
return False
Deployment Checklist
Pre-Upload Validation
- All required imports are Python 3.12 standard library only
- No external module dependencies or custom imports
- Strategy functions have correct signatures
- All constants and configuration are defined within the file
- Error handling is implemented for external data access
- Logging statements are included for debugging
- Test scenarios have been validated
Function Naming Convention
When uploading strategies to the platform:
- Data Load Strategy: Function name (e.g.,
data_load_strategy
) - Aggregation Strategy: Function name (e.g.,
aggregation_strategy
) - Intermediate Analysis: Function name (e.g.,
intermediate_analysis_strategy
)
Ensure the exact function names are specified in the campaign strategy configuration interface.
FAQ
Can I use external libraries in my strategy?
No, strategy files must be completely self-contained and can only use Python 3.12 standard library modules.
How do I share code between multiple strategies?
Implement multiple strategies in a single file with shared utility classes and functions at the top of the file.
What happens if my strategy throws an exception?
The contact will be returned unchanged. Always implement proper error handling and logging to prevent strategy failures.
Can I modify the Contact object in intermediate analysis?
Yes, but changes should be minimal and focused on real-time monitoring. Major logic should remain in the aggregation strategy.
Related Resources
- Campaign Guide - Understanding campaign structure and setup
- Agent Configuration - Configuring agents for campaigns
- Sessions - Monitoring campaign conversations
- Integrations Guide - Setting up communication channels
- Python 3.12 Documentation - Official Python documentation