Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ook: #2 Creación del módulo OOK con sus dispositivos y funcionalidades especiales. #2

Merged
merged 3 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
opticomlib/__pycache__/*
test.py
test.py
build/*
dist/*
opticomlib.egg-info/*
71 changes: 45 additions & 26 deletions opticomlib/_types_.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,12 @@ def plot(self, fmt=None, n=None, xlabel=None, ylabel=None, **kargs):
plt.plot(self.t()[:n], np.abs(self[:n].signal+self[:n].noise), fmt, **kargs)
plt.xlabel(xlabel if xlabel else 'Tiempo [s]')
plt.ylabel(ylabel if ylabel else 'Amplitud [u.a.]')

if 'label' in kargs.keys():
plt.legend()
return self

def psd(self, fmt=None, n=None, **kargs):
def psd(self, fmt=None, kind: Literal['linear','log']='log', n=None, **kargs):
if fmt is None:
fmt = '-'
if n is None:
Expand All @@ -249,16 +252,22 @@ def psd(self, fmt=None, n=None, **kargs):
f = fftshift( fftfreq(n, d=self.dt())*1e-9) # GHz
psd = fftshift(self[:n]('w').abs('signal')**2/n**2)

plt.plot(f, psd*1e3, fmt, **kargs)
if kind == 'linear':
plt.plot(f, psd*1e3, fmt, **kargs)
elif kind == 'log':
plt.semilogy(f, psd*1e3, fmt, **kargs)
else:
raise TypeError('El argumento `kind` debe ser uno de los siguientes valores ("linear", "log")')
plt.xlabel('Frecuencia [GHz]')
plt.ylabel('Potencia [mW]')
if 'label' in kargs.keys():
plt.legend()
return self

def grid(self, n=None):
sps,t = global_vars.sps, self.t()
if n is None:
n = self.len()
plt.axvline(t[:n*sps][-1]+self.dt(), color='k', ls='--')
for i in t[:n*sps][::sps]:
plt.axvline(i, color='k', ls='--', alpha=0.3,lw=1)
return self
Expand Down Expand Up @@ -299,39 +308,43 @@ def __call__(self, dominio: Literal['t','w']):
else:
raise TypeError("solo se aceptan los argumentos 'w' o 't'")

def plot(self, fmt=None, mode: Literal['x','y','xy','abs']='abs', n=None, label=None, **kargs):
t = self.t()
def plot(self, fmt=None, mode: Literal['x','y','xy','abs']='abs', n=None, **kargs):
t = self.t()[:n]
if fmt is None:
fmt = ['-', '-']
if isinstance(fmt, str):
fmt = [fmt, fmt]
if n is None:
n = self.len()

if mode =='x':
label = label if label else 'Polarización X'
line = plt.plot(t[:n], np.abs(self.signal[0,:n] + self.noise[0,:n])**2, fmt[0], label=label, **kargs)
plt.legend()
if 'label' in kargs.keys():
label = kargs['label']
label_flag = True
kargs.pop('label')
else:
label_flag = False

if mode == 'x':
label = label if label_flag else 'Polarización X'
plt.plot(t, np.abs(self.signal[0,:n] + self.noise[0,:n])**2, fmt[0], label=label, **kargs)
elif mode == 'y':
label = label if label else 'Polarización Y'
line = plt.plot(t[:n], np.abs(self.signal[1,:n] + self.noise[1,:n])**2, fmt[0], label=label, **kargs)
plt.legend()
label = label if label_flag else 'Polarización Y'
plt.plot(t, np.abs(self.signal[1,:n] + self.noise[1,:n])**2, fmt[0], label=label, **kargs)
elif mode == 'xy':
line = plt.plot(t[:n], np.abs(self.signal[0,:n]+self.noise[0,:n])**2, fmt[0], t[:n], np.abs(self.signal[1,:n]+self.noise[1,:n])**2, fmt[1], label=['Polarización X', 'Polarización Y'], **kargs)
plt.legend()
plt.plot(t, np.abs(self.signal[0,:n]+self.noise[0,:n])**2, fmt[0], t, np.abs(self.signal[1,:n]+self.noise[1,:n])**2, fmt[1], label=['Polarización X', 'Polarización Y'], **kargs)
elif mode == 'abs':
label = label if label else 'Abs'
s = self.abs()[:n]
line = plt.plot(t[:n], (s[0]**2 + s[1]**2), fmt[0], label=label, **kargs)
plt.legend()
label = label if label_flag else 'Abs'
s = self[:n].abs()
plt.plot(t, (s[0]**2 + s[1]**2), fmt[0], label=label, **kargs)
else:
raise TypeError('El argumento `mode` debe se uno de los siguientes valores ("x","y","xy","abs").')

plt.legend()
plt.xlabel('Tiempo [s]')
plt.ylabel('Potencia [W]')
return self

def psd(self, fmt=None, mode: Literal['x','y']=None, n=None, **kargs):
def psd(self, fmt=None, kind: Literal['linear', 'log']='log', mode: Literal['x','y']=None, n=None, **kargs):
if fmt is None:
fmt = '-'
if mode is None:
Expand All @@ -348,16 +361,22 @@ def psd(self, fmt=None, mode: Literal['x','y']=None, n=None, **kargs):
else:
raise TypeError('El argumento `mode` debe ser uno de los siguientes valores ("x" o "y")')

plt.plot(f, psd*1e3, fmt, **kargs)
if kind == 'linear':
plt.plot(f, psd*1e3, fmt, **kargs)
elif kind == 'log':
plt.semilogy(f, psd*1e3, fmt, **kargs)
else:
raise TypeError('El argumento `kind` debe ser uno de los siguientes valores ("linear", "log")')
plt.xlabel('Frecuencia [GHz]')
plt.ylabel('Potencia [mW]')
if 'label' in kargs.keys():
plt.legend()
return self

def grid(self, n=None):
sps,t = global_vars.sps, self.t()
if n is None:
n = self.len()
plt.axvline(t[:n*sps][-1]+self.dt(),color='k', ls='--')
for i in t[:n*sps][::sps]:
plt.axvline(i,color='k', ls='--', alpha=0.3, lw=1)
return self
Expand All @@ -382,7 +401,7 @@ def print_(self, msg: str=None):
print(f'{key} : {value}')
return self

def plot(self, nbits=1000, medias_=True, legend_=True, show_=True, save_=False, filename=None, style: Literal['dark', 'light']='dark', cmap:Literal['viridis', 'plasma', 'inferno', 'cividis', 'magma', 'winter']='winter'):
def plot(self, umbral, nbits=1000, medias_=True, legend_=True, show_=True, save_=False, filename=None, style: Literal['dark', 'light']='dark', cmap:Literal['viridis', 'plasma', 'inferno', 'cividis', 'magma', 'winter']='winter'):
if not show_:
return self

Expand Down Expand Up @@ -418,7 +437,7 @@ def plot(self, nbits=1000, medias_=True, legend_=True, show_=True, save_=False,
ax[1,0].set_xticks([-1,-0.5,0,0.5,1])
ax[1,0].set_xlabel(r'Time [$t/T_{slot}$]', fontsize=12)
t_line1 = ax[1,0].axvline(self.t_opt, color = t_opt_color, ls = '--', alpha = 0.7)
y_line1 = ax[1,0].axhline(self.umbral, color = r_th_color, ls = '--', alpha = 0.7)
y_line1 = ax[1,0].axhline(umbral, color = r_th_color, ls = '--', alpha = 0.7)

t_line_span0 = ax[1,0].axvline(self.t_span0, color = t_opt_color, ls = '-', alpha = 0.4)
t_line_span1 = ax[1,0].axvline(self.t_span1, color = t_opt_color, ls = '-', alpha = 0.4)
Expand All @@ -439,7 +458,7 @@ def plot(self, nbits=1000, medias_=True, legend_=True, show_=True, save_=False,
ax[1,1].tick_params(axis='x', which='both', length=0, labelbottom=False)
ax[1,1].tick_params(axis='y', which='both', length=0, labelleft=False)
ax[1,1].grid(color='grey', ls='--', lw=0.5, alpha=0.5)
y_line2 = ax[1,1].axhline(self.umbral, color = r_th_color, ls = '--', alpha = 0.7)
y_line2 = ax[1,1].axhline(umbral, color = r_th_color, ls = '--', alpha = 0.7)

ax[0,0].sharex(ax[1,0])
ax[0,0].tick_params(axis='y', which='both', length=0, labelleft=False)
Expand Down Expand Up @@ -472,7 +491,7 @@ def plot(self, nbits=1000, medias_=True, legend_=True, show_=True, save_=False,
)

ax[0,0].hist(
t_[(y_>self.umbral*0.95) & (y_<self.umbral*1.05)],
t_[(y_>umbral*0.95) & (y_<umbral*1.05)],
bins=200,
density=True,
orientation = 'vertical',
Expand All @@ -494,7 +513,7 @@ def plot(self, nbits=1000, medias_=True, legend_=True, show_=True, save_=False,
valmin=y_min,
valmax=y_max,
valstep=(y_max-y_min)/20,
valinit=self.umbral,
valinit=umbral,
initcolor=r_th_color,
orientation='vertical',
valfmt='',
Expand Down
54 changes: 28 additions & 26 deletions opticomlib/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@



def PRBS(n: int=2**8, user: list=[], defined: int=None) -> binary_sequence:
def PRBS(n: int=2**8, user: list=[], order: int=None) -> binary_sequence:
"""
### Descripción:
Generador Pseudoaleatorio de secuencias binarias.
Expand All @@ -70,8 +70,8 @@ def PRBS(n: int=2**8, user: list=[], defined: int=None) -> binary_sequence:

if user:
output = binary_sequence( user )
elif defined:
output = binary_sequence( generate_prbs(defined) )
elif order:
output = binary_sequence( generate_prbs(order) )
else:
output = binary_sequence( np.random.randint(0, 2, n) )
output.ejecution_time = toc()
Expand Down Expand Up @@ -142,7 +142,7 @@ def DAC(input: Union[str, list, tuple, ndarray, binary_sequence],
return output


def MODULATOR(input: electrical_signal, p_laser: float=None, pol: str='x') -> optical_signal:
def MODULATOR(input: electrical_signal, p_laser: float=0, pol: str='x') -> optical_signal:
"""
### Descripción:
Modula la señal óptica de un láser de potencia dada,79 a partir de una señal eléctrica de entrada.
Expand All @@ -165,10 +165,10 @@ def MODULATOR(input: electrical_signal, p_laser: float=None, pol: str='x') -> op

if not isinstance(input, electrical_signal):
raise TypeError("`input` debe ser del tipo (electrical_signal).")
if not p_laser:
p_laser = idbm(global_vars.p_laser)
if pol not in ['x','y']:
raise TypeError("`pol` debe ser ('x' o 'y').")

p_laser = idbm(p_laser)

output = optical_signal( np.zeros((2,input.len())) )

Expand Down Expand Up @@ -221,7 +221,7 @@ def BPF(input: optical_signal, BW: float, n: int=4, fs: float=None) -> optical_s
return output


def EDFA(input: optical_signal, G: float, NF: float, BW: float=None) -> tuple: # modelo simplificado del EDFA (no satura)
def EDFA(input: optical_signal, G: float, NF: float, BW: float) -> tuple: # modelo simplificado del EDFA (no satura)
"""
### Descripción:
Amplificador de fibra dopada con Erbium. Amplifica la señal óptica a la entrada, agregando ruido de emisión espontánea amplificada (ASE).
Expand All @@ -232,7 +232,7 @@ def EDFA(input: optical_signal, G: float, NF: float, BW: float=None) -> tuple: #
- `input` - señal óptica a amplificar
- `G` - ganancia del amplificador en [dB]
- `NF` - figura de ruido del amplificador en [dB]
- `BW` [Opcional] - ancho de banda del amplificador en [Hz] (default: `BW=global_vars.BW_opt`)
- `BW` - ancho de banda del amplificador en [Hz]

---

Expand All @@ -243,8 +243,6 @@ def EDFA(input: optical_signal, G: float, NF: float, BW: float=None) -> tuple: #

if not isinstance(input, optical_signal):
raise TypeError("`input` debe ser del tipo (optical_signal).")
if BW is None:
BW = global_vars.BW_opt

output = BPF( input * idb(G)**0.5, BW )
ase = BPF( optical_signal( np.zeros_like(input.signal), np.exp(-1j*np.random.uniform(0, 2*pi, input.noise.shape)) ), BW )
Expand Down Expand Up @@ -407,7 +405,7 @@ def LPF(input: Union[ndarray, electrical_signal], BW: float, n: int=4, fs: float



def PD(input: optical_signal, BW: float=None, R: float=1.0, T: float=300.0, R_load: float=50.0, noise: Literal['ase-only','thermal-only','shot-only','ase-thermal','ase-shot','thermal-shot','all']='all') -> electrical_signal:
def PD(input: optical_signal, BW: float, responsivity: float=1.0, T: float=300.0, R_load: float=50.0, noise: Literal['ase-only','thermal-only','shot-only','ase-thermal','ase-shot','thermal-shot','all']='all') -> electrical_signal:
"""
### Descripción:
Photodetector. Simula la detección de una señal óptica por un fotodetector.
Expand All @@ -417,20 +415,18 @@ def PD(input: optical_signal, BW: float=None, R: float=1.0, T: float=300.0, R_lo
### Args:
- `input` - señal óptica a detectar
- `BW` - ancho de banda del detector en [Hz]
- `R` - Responsividad del detector en [A/W] (default: `R=1.0`)
- `T` - Temperatura del detector en [K] (default: `T=300.0`)
- `R_load` - Resistencia de carga del detector en [Ohm] (default: `R_load=50.0`)
- `responsivity` [Opcional] - Responsividad del detector en [A/W] (default: `R=1.0`)
- `T` [Opcional] - Temperatura del detector en [K] (default: `T=300.0`)
- `R_load` [Opcional] - Resistencia de carga del detector en [Ohm] (default: `R_load=50.0`)

---

### Returns:
- `electrical_signal`
"""
tic()
if BW is None:
BW = global_vars.BW_elec

i_sig = R * np.sum(input.abs('signal')**2, axis=0) # se suman las dos polarizaciones
i_sig = responsivity * np.sum(input.abs('signal')**2, axis=0) # se suman las dos polarizaciones

if 'thermal' in noise or 'all' in noise:
S_T = 4 * kB * T * BW / R_load # Density of thermal noise in [A^2]
Expand All @@ -441,11 +437,11 @@ def PD(input: optical_signal, BW: float=None, R: float=1.0, T: float=300.0, R_lo
i_N = np.vectorize(lambda s: np.random.normal(0,s))(S_N**0.5)

if 'ase' in noise or 'all' in noise:
i_sig_sp = R * np.abs(input.signal[0]*input.noise[0].conjugate() + \
i_sig_sp = responsivity * np.abs(input.signal[0]*input.noise[0].conjugate() + \
input.signal[0].conjugate()*input.noise[0] + \
input.signal[1]*input.noise[1].conjugate() + \
input.signal[1].conjugate()*input.noise[1])
i_sp_sp = R * np.sum(input.abs('noise')**2, axis=0) # se suman las dos polarizaciones
i_sp_sp = responsivity * np.sum(input.abs('noise')**2, axis=0) # se suman las dos polarizaciones

if noise == 'ase-only':
noise = i_sig_sp + i_sp_sp
Expand Down Expand Up @@ -529,7 +525,7 @@ def ADC(input: electrical_signal, fs: float=None, BW: float=None, nbits: int=8)
return output


def GET_EYE(input: Union[electrical_signal, optical_signal], nslots: int=4096, sps_resamplig: int = None):
def GET_EYE(input: Union[electrical_signal, optical_signal], nslots: int=4096, sps_resamplig: int=None):
"""
### Descripción:
Estima todos los parámetros fundamentales y métricas del diagrama de ojo de la señal eléctrica de entrada.
Expand All @@ -538,8 +534,8 @@ def GET_EYE(input: Union[electrical_signal, optical_signal], nslots: int=4096, s

### Args:
- `input` - señal eléctrica a partir de la cual se estimará el diagrama de ojos
- 'nslots' [Opcional] - cantidad de slots a considerar para la estimación de los parámetros (default: `nslots=4096`)
- 'sps_resamplig' [Opcional] - cantidad de muestras por slot a las que se desea resamplear la señal a analizar (default: `sps_resamplig=256`)
- `nslots` [Opcional] - cantidad de slots a considerar para la estimación de los parámetros (default: `nslots=4096`)
- `sps_resamplig` [Opcional] - cantidad de muestras por slot a las que se desea resamplear la señal a analizar (default: `global_vars.sps`)

---

Expand Down Expand Up @@ -602,7 +598,8 @@ def find_nearest(levels: np.ndarray, data: Union[np.ndarray, float]) -> Union[np

n = input[sps:].len()%(2*input.sps())
if n: input = input[sps:-n]
nslots = min(input.len()//sps, nslots)

nslots = min(input.len()//sps//2*2, nslots)
input = input[:nslots*sps]

if isinstance(input, optical_signal):
Expand All @@ -617,12 +614,12 @@ def find_nearest(levels: np.ndarray, data: Union[np.ndarray, float]) -> Union[np
y_set = np.unique(input)

# realizamos un resampling de la señal para obtener una mayor resolución en ambos ejes
if sps_resamplig is not None:
if sps_resamplig:
input = sg.resample(input, nslots*sps_resamplig); eye_dict['y'] = input
t = np.kron(np.ones(nslots//2), np.linspace(-1, 1-dt, 2*sps_resamplig)); eye_dict['t'] = t
else:
eye_dict['y'] = input
t = np.kron(np.ones((len(input)//sps)//2), np.linspace(-1,1,2*sps, endpoint=False)); eye_dict['t'] = t
t = np.kron(np.ones((len(input)//sps)//2), np.linspace(-1, 1-dt, 2*sps)); eye_dict['t'] = t

# Obtenemos el centroide de las muestras en el eje Y
vm = np.mean(sk.KMeans(n_clusters=2, n_init=10).fit(input.reshape(-1,1)).cluster_centers_)
Expand Down Expand Up @@ -671,7 +668,12 @@ def find_nearest(levels: np.ndarray, data: Union[np.ndarray, float]) -> Union[np
y_center = find_nearest(y_set, (state_0 + state_1)/2)

# Obtenemos el instante óptimo para realizar el down sampling
instant = np.abs(t-t_center).argmin() - sps//2 + 1; eye_dict['i'] = instant
if sps_resamplig:
instant = np.abs(t-t_center).argmin() - sps_resamplig//2 + 1
instant = int(instant/sps_resamplig*sps)
else:
instant = np.abs(t-t_center).argmin() - sps//2 + 1
eye_dict['i'] = instant

# Obtenemos el cluster superior
y_top = input[(input > y_center) & ((t_span0 < t) & (t < t_span1))]; eye_dict['y_top'] = y_top
Expand Down
Loading