Skip to content

Commit

Permalink
Wire up Core Agent
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Schneider committed Jan 17, 2018
1 parent 4de21f8 commit 020f118
Showing 1 changed file with 86 additions and 15 deletions.
101 changes: 86 additions & 15 deletions apm/tracked_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from uuid import uuid4
from datetime import datetime
import threading
import pdb
import json

import socket


class ThreadLocalSingleton(object):
Expand All @@ -23,70 +25,120 @@ def release(self):

class TrackedRequest(ThreadLocalSingleton):
"""
This is a container which keeps track of all module instances for a single request. For convenience they are made
available as attributes based on their keyname
This is a container which keeps track of all module instances for a single
request. For convenience they are made available as attributes based on
their keyname
"""
def __init__(self):
super(TrackedRequest, self).__init__()
self.id = "req-" + str(uuid4())
self.req_id = 'req-' + str(uuid4())
self.notes = dict()
self.spans = []
self.socket = CoreAgentSocket()
self.send_start_request()

def send_start_request(self):
self.socket.send(json.dumps({
'StartRequest': {
'request_id': self.req_id,
}
}))

def send_finish_request(self):
self.socket.send(json.dumps({
'FinishRequest': {
'request_id': self.req_id,
}
}))


def note(self, key, value):
self.notes[key] = value

def start_span(self, operation=None):
maybe_parent = self.current_span()
if maybe_parent is not None:
parent_id = maybe_parent.id
parent_id = maybe_parent.span_id
else:
parent_id = None

new_span = Span(request=self.id, operation=operation, parent=parent_id)
new_span = Span(
self.socket,
request_id=self.req_id,
operation=operation,
parent=parent_id)
self.spans.append(new_span)
return new_span

def stop_span(self):
stopping_span = self.spans.pop()
stopping_span.stop()
if len(self.spans) == 0:
self.release()
self.finish()

def current_span(self):
if len(self.spans) > 0:
return self.spans[-1]
else:
return None

### Request is done, release any info we have about it.
def finish(self):
self.send_finish_request()
self.socket.close()
self.release()

# XXX: TrackedRequest knows too much about threads & making itself
# Move this whole method somewhere else ( a RequestManager obj? )
@classmethod
def instance(cls):
if hasattr(cls, '_thread_lookup'):
if getattr(cls._thread_lookup, 'instance', None) is not None:
return getattr(cls._thread_lookup, 'instance', None)
else:
return TrackedRequest()
else:
return TrackedRequest()
return TrackedRequest()

class Span:
def __init__(self, request=None, operation=None, parent=None):
self.id = "span-" + str(uuid4())
self.request = request
def __init__(self, socket, request_id=None, operation=None, parent=None):
self.span_id = "span-" + str(uuid4())
self.request_id = request_id
self.operation = operation
self.parent = parent
self.notes = dict()
self.start_time = datetime.now()
self.end_time = None
self.socket = socket

self.send_start_span()

def send_start_span(self):
if self.request_id is None:
return

self.socket.send(json.dumps({
'StartSpan': {
'request_id': self.request_id,
'span_id': self.span_id,
'parent_id': self.parent,
'operation': self.operation,
}
}))

def send_stop_span(self):
self.socket.send(json.dumps({
'StopSpan': {
'request_id': self.request_id,
'span_id': self.span_id,
}
}))

def dump(self):
if self.end_time is None:
print(self.operation)
return "request=%s operation=%s id=%s parent=%s notes=%s start_time=%s end_time=%s" % (
self.request,
self.request_id,
self.operation,
self.id,
self.span_id,
self.parent,
self.notes,
self.start_time.isoformat(),
Expand All @@ -95,6 +147,7 @@ def dump(self):

def stop(self):
self.end_time = datetime.now()
self.send_stop_span()

# In seconds
def duration(self):
Expand All @@ -107,3 +160,21 @@ def duration(self):
def note(self, key, value):
self.notes[key] = value

class CoreAgentSocket:
def __init__(self):
server_address = '/tmp/core_agent_socket'
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.connect(server_address)

def send(self, body):
print(threading.current_thread(), "Socket send:", body)

self.socket.sendall(self.message_length(body))
self.socket.sendall(body.encode())

def message_length(self, body):
length = len(body)
return length.to_bytes(4, 'big')

def close(self):
self.socket.close()

0 comments on commit 020f118

Please sign in to comment.