Skip to content

feat: add odp manager #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion lib/optimizely/helpers/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ module Constants
FETCH_SEGMENTS_FAILED: 'Audience segments fetch failed (%s).',
ODP_EVENT_FAILED: 'ODP event send failed (%s).',
ODP_NOT_ENABLED: 'ODP is not enabled.',
ODP_NOT_INTEGRATED: 'ODP is not integrated.'
ODP_NOT_INTEGRATED: 'ODP is not integrated.',
ODP_INVALID_DATA: 'ODP data is not valid.'
}.freeze

DECISION_NOTIFICATION_TYPES = {
Expand Down Expand Up @@ -425,6 +426,16 @@ module Constants
REQUEST_TIMEOUT: 10
}.freeze

ODP_SEGMENTS_CACHE_CONFIG = {
DEFAULT_CAPACITY: 10_000,
DEFAULT_TIMEOUT_SECONDS: 600
}.freeze

ODP_MANAGER_CONFIG = {
KEY_FOR_USER_ID: 'fs_user_id',
EVENT_TYPE: 'fullstack'
}.freeze

ODP_CONFIG_STATE = {
UNDETERMINED: 'UNDETERMINED',
INTEGRATED: 'INTEGRATED',
Expand Down
29 changes: 19 additions & 10 deletions lib/optimizely/odp/odp_event_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ class OdpEventManager
# the BlockingQueue and buffers them for either a configured batch size or for a
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.

attr_reader :batch_size, :odp_config, :zaius_manager, :logger
attr_reader :batch_size, :zaius_manager, :logger
attr_accessor :odp_config

def initialize(
odp_config,
api_manager: nil,
logger: NoOpLogger.new,
proxy_config: nil
)
super()

@odp_config = odp_config
@api_host = odp_config.api_host
@api_key = odp_config.api_key
@odp_config = nil
@api_host = nil
@api_key = nil

@mutex = Mutex.new
@event_queue = SizedQueue.new(Optimizely::Helpers::Constants::ODP_EVENT_MANAGER[:DEFAULT_QUEUE_CAPACITY])
Expand All @@ -53,14 +53,20 @@ def initialize(
@retry_count = Helpers::Constants::ODP_EVENT_MANAGER[:DEFAULT_RETRY_COUNT]
# current_batch should only be accessed by processing thread
@current_batch = []
@thread = nil
@thread_exception = false
end

def start!
def start!(odp_config)
if running?
@logger.log(Logger::WARN, 'Service already started.')
return
end

@odp_config = odp_config
@api_host = odp_config.api_host
@api_key = odp_config.api_key

@thread = Thread.new { run }
@logger.log(Logger::INFO, 'Starting scheduler.')
end
Expand Down Expand Up @@ -117,7 +123,10 @@ def dispatch(event)
end

def send_event(type:, action:, identifiers:, data:)
case @odp_config.odp_state
case @odp_config&.odp_state
when nil
@logger.log(Logger::DEBUG, 'ODP event queue: cannot send before config has been set.')
return
when OdpConfig::ODP_CONFIG_STATE[:UNDETERMINED]
@logger.log(Logger::DEBUG, 'ODP event queue: cannot send before the datafile has loaded.')
return
Expand Down Expand Up @@ -154,7 +163,7 @@ def stop!
end

def running?
@thread && !!@thread.status && !@event_queue.closed?
!!@thread && !!@thread.status && !@event_queue.closed?
end

private
Expand Down Expand Up @@ -270,8 +279,8 @@ def process_config_update
# Updates the configuration used to send events.
flush_batch! unless @current_batch.empty?

@api_key = @odp_config.api_key
@api_host = @odp_config.api_host
@api_key = @odp_config&.api_key
@api_host = @odp_config&.api_host
end
end
end
144 changes: 144 additions & 0 deletions lib/optimizely/odp/odp_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# frozen_string_literal: true

#
# Copyright 2022, Optimizely and contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'optimizely/logger'
require_relative '../helpers/constants'
require_relative '../helpers/validator'
require_relative '../exceptions'
require_relative 'odp_config'
require_relative 'lru_cache'
require_relative 'odp_segment_manager'
require_relative 'odp_event_manager'

module Optimizely
class OdpManager
ODP_LOGS = Helpers::Constants::ODP_LOGS
ODP_MANAGER_CONFIG = Helpers::Constants::ODP_MANAGER_CONFIG
ODP_CONFIG_STATE = Helpers::Constants::ODP_CONFIG_STATE

def initialize(disable:, segments_cache: nil, segment_manager: nil, event_manager: nil, logger: nil)
@enabled = !disable
@segment_manager = segment_manager
@event_manager = event_manager
@logger = logger || NoOpLogger.new
@odp_config = OdpConfig.new

unless @enabled
@logger.log(Logger::INFO, ODP_LOGS[:ODP_NOT_ENABLED])
return
end

unless @segment_manager
segments_cache ||= LRUCache.new(
Helpers::Constants::ODP_SEGMENTS_CACHE_CONFIG[:DEFAULT_CAPACITY],
Helpers::Constants::ODP_SEGMENTS_CACHE_CONFIG[:DEFAULT_TIMEOUT_SECONDS]
)
@segment_manager = Optimizely::OdpSegmentManager.new(segments_cache, nil, @logger)
end

@event_manager ||= Optimizely::OdpEventManager.new(logger: @logger)

@segment_manager.odp_config = @odp_config
@event_manager.start!(@odp_config)
end

def fetch_qualified_segments(user_id:, options:)
# Returns qualified segments for the user from the cache or the ODP server if not in the cache.
#
# @param user_id - The user id.
# @param options - An array of OptimizelySegmentOptions used to ignore and/or reset the cache.
#
# @return - Array of qualified segments or nil.
options ||= []
unless @enabled
@logger.log(Logger::ERROR, ODP_LOGS[:ODP_NOT_ENABLED])
return nil
end

if @odp_config.odp_state == ODP_CONFIG_STATE[:UNDETERMINED]
@logger.log(Logger::ERROR, 'Cannot fetch segments before the datafile has loaded.')
return nil
end

@segment_manager.fetch_qualified_segments(ODP_MANAGER_CONFIG[:KEY_FOR_USER_ID], user_id, options)
end

def identify_user(user_id:)
unless @enabled
@logger.log(Logger::DEBUG, 'ODP identify event is not dispatched (ODP disabled).')
return
end

case @odp_config.odp_state
when ODP_CONFIG_STATE[:UNDETERMINED]
@logger.log(Logger::DEBUG, 'ODP identify event is not dispatched (datafile not ready).')
return
when ODP_CONFIG_STATE[:NOT_INTEGRATED]
@logger.log(Logger::DEBUG, 'ODP identify event is not dispatched (ODP not integrated).')
return
end

@event_manager.send_event(
type: ODP_MANAGER_CONFIG[:EVENT_TYPE],
action: 'identified',
identifiers: {ODP_MANAGER_CONFIG[:KEY_FOR_USER_ID] => user_id},
data: {}
)
end

def send_event(type:, action:, identifiers:, data:)
# Send an event to the ODP server.
#
# @param type - the event type.
# @param action - the event action name.
# @param identifiers - a hash for identifiers.
# @param data - a hash for associated data. The default event data will be added to this data before sending to the ODP server.
unless @enabled
@logger.log(Logger::ERROR, ODP_LOGS[:ODP_NOT_ENABLED])
return
end

unless Helpers::Validator.odp_data_types_valid?(data)
@logger.log(Logger::ERROR, ODP_LOGS[:ODP_INVALID_DATA])
return
end

@event_manager.send_event(type: type, action: action, identifiers: identifiers, data: data)
end

def update_odp_config(api_key, api_host, segments_to_check)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! I am just curios who will call this function? should we add a notification listener here and make odpmanager self update itself? Or are we expecting to do something like that and trigger updates from the outside by calling this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea the idea is to register the listener at client initialization and have the callback be a client method, thus avoiding having to pass in the NotificationListener and the ProjectConfig. Seems to make sense because the callback needs access to both ProjectConfig and OdpConfig. Thoughts/opinions?

# Update the odp config, reset the cache and send signal to the event processor to update its config.
return unless @enabled

config_changed = @odp_config.update(api_key, api_host, segments_to_check)
unless config_changed
@logger.log(Logger::DEBUG, 'Odp config was not changed.')
return
end

@segment_manager.reset
@event_manager.update_config
end

def close!
return unless @enabled

@event_manager.stop!
end
end
end
11 changes: 6 additions & 5 deletions lib/optimizely/odp/odp_segment_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
module Optimizely
class OdpSegmentManager
# Schedules connections to ODP for audience segmentation and caches the results
attr_reader :odp_config, :segments_cache, :zaius_manager, :logger
attr_accessor :odp_config
attr_reader :segments_cache, :zaius_manager, :logger

def initialize(odp_config, segments_cache, api_manager = nil, logger = nil, proxy_config = nil)
@odp_config = odp_config
def initialize(segments_cache, api_manager = nil, logger = nil, proxy_config = nil)
@odp_config = nil
@logger = logger || NoOpLogger.new
@zaius_manager = api_manager || ZaiusGraphQLApiManager.new(logger: @logger, proxy_config: proxy_config)
@segments_cache = segments_cache
Expand All @@ -39,8 +40,8 @@ def initialize(odp_config, segments_cache, api_manager = nil, logger = nil, prox
#
# @return - Array of qualified segments.
def fetch_qualified_segments(user_key, user_value, options)
odp_api_key = @odp_config.api_key
odp_api_host = @odp_config.api_host
odp_api_key = @odp_config&.api_key
odp_api_host = @odp_config&.api_host
segments_to_check = @odp_config&.segments_to_check

if odp_api_key.nil? || odp_api_host.nil?
Expand Down
Loading