1+ import sys
2+ from configparser import SectionProxy
3+ from typing import TextIO
4+ import time
5+
6+ from defs .common import strtobool
7+
8+ from ..protocol_settings import Registry_Type , WriteMode , registry_map_entry
9+ from .transport_base import transport_base
10+
11+
12+ class influxdb_out (transport_base ):
13+ ''' InfluxDB v1 output transport that writes data to an InfluxDB server '''
14+ host : str = "localhost"
15+ port : int = 8086
16+ database : str = "solar"
17+ username : str = ""
18+ password : str = ""
19+ measurement : str = "device_data"
20+ include_timestamp : bool = True
21+ include_device_info : bool = True
22+ batch_size : int = 100
23+ batch_timeout : float = 10.0
24+
25+ client = None
26+ batch_points = []
27+ last_batch_time = 0
28+
29+ def __init__ (self , settings : SectionProxy ):
30+ self .host = settings .get ("host" , fallback = self .host )
31+ self .port = settings .getint ("port" , fallback = self .port )
32+ self .database = settings .get ("database" , fallback = self .database )
33+ self .username = settings .get ("username" , fallback = self .username )
34+ self .password = settings .get ("password" , fallback = self .password )
35+ self .measurement = settings .get ("measurement" , fallback = self .measurement )
36+ self .include_timestamp = strtobool (settings .get ("include_timestamp" , fallback = self .include_timestamp ))
37+ self .include_device_info = strtobool (settings .get ("include_device_info" , fallback = self .include_device_info ))
38+ self .batch_size = settings .getint ("batch_size" , fallback = self .batch_size )
39+ self .batch_timeout = settings .getfloat ("batch_timeout" , fallback = self .batch_timeout )
40+
41+ self .write_enabled = True # InfluxDB output is always write-enabled
42+ super ().__init__ (settings )
43+
44+ def connect (self ):
45+ """Initialize the InfluxDB client connection"""
46+ self ._log .info ("influxdb_out connect" )
47+
48+ try :
49+ from influxdb import InfluxDBClient
50+
51+ # Create InfluxDB client
52+ self .client = InfluxDBClient (
53+ host = self .host ,
54+ port = self .port ,
55+ username = self .username if self .username else None ,
56+ password = self .password if self .password else None ,
57+ database = self .database
58+ )
59+
60+ # Test connection
61+ self .client .ping ()
62+
63+ # Create database if it doesn't exist
64+ databases = self .client .get_list_database ()
65+ if not any (db ['name' ] == self .database for db in databases ):
66+ self ._log .info (f"Creating database: { self .database } " )
67+ self .client .create_database (self .database )
68+
69+ self .connected = True
70+ self ._log .info (f"Connected to InfluxDB at { self .host } :{ self .port } " )
71+
72+ except ImportError :
73+ self ._log .error ("InfluxDB client not installed. Please install with: pip install influxdb" )
74+ self .connected = False
75+ except Exception as e :
76+ self ._log .error (f"Failed to connect to InfluxDB: { e } " )
77+ self .connected = False
78+
79+ def write_data (self , data : dict [str , str ], from_transport : transport_base ):
80+ """Write data to InfluxDB"""
81+ if not self .write_enabled or not self .connected :
82+ return
83+
84+ self ._log .info (f"write data from [{ from_transport .transport_name } ] to influxdb_out transport" )
85+ self ._log .info (data )
86+
87+ # Prepare tags for InfluxDB
88+ tags = {}
89+
90+ # Add device information as tags if enabled
91+ if self .include_device_info :
92+ tags .update ({
93+ "device_identifier" : from_transport .device_identifier ,
94+ "device_name" : from_transport .device_name ,
95+ "device_manufacturer" : from_transport .device_manufacturer ,
96+ "device_model" : from_transport .device_model ,
97+ "device_serial_number" : from_transport .device_serial_number ,
98+ "transport" : from_transport .transport_name
99+ })
100+
101+ # Prepare fields (the actual data values)
102+ fields = {}
103+ for key , value in data .items ():
104+ # Check if we should force float formatting based on protocol settings
105+ should_force_float = False
106+
107+ # Try to get registry entry from protocol settings to check unit_mod
108+ if hasattr (from_transport , 'protocolSettings' ) and from_transport .protocolSettings :
109+ # Check both input and holding registries
110+ for registry_type in [Registry_Type .INPUT , Registry_Type .HOLDING ]:
111+ registry_map = from_transport .protocolSettings .get_registry_map (registry_type )
112+ for entry in registry_map :
113+ if entry .variable_name == key :
114+ # If unit_mod is not 1.0, this value should be treated as float
115+ if entry .unit_mod != 1.0 :
116+ should_force_float = True
117+ self ._log .debug (f"Variable { key } has unit_mod { entry .unit_mod } , forcing float format" )
118+ break
119+ if should_force_float :
120+ break
121+
122+ # Try to convert to numeric values for InfluxDB
123+ try :
124+ # Try to convert to float first
125+ float_val = float (value )
126+
127+ # If it's an integer but should be forced to float, or if it's already a float
128+ if should_force_float or not float_val .is_integer ():
129+ fields [key ] = float_val
130+ else :
131+ fields [key ] = int (float_val )
132+ except (ValueError , TypeError ):
133+ # If conversion fails, store as string
134+ fields [key ] = str (value )
135+
136+ # Create InfluxDB point
137+ point = {
138+ "measurement" : self .measurement ,
139+ "tags" : tags ,
140+ "fields" : fields
141+ }
142+
143+ # Add timestamp if enabled
144+ if self .include_timestamp :
145+ point ["time" ] = int (time .time () * 1e9 ) # Convert to nanoseconds
146+
147+ # Add to batch
148+ self .batch_points .append (point )
149+
150+ # Check if we should flush the batch
151+ current_time = time .time ()
152+ if (len (self .batch_points ) >= self .batch_size or
153+ (current_time - self .last_batch_time ) >= self .batch_timeout ):
154+ self ._flush_batch ()
155+
156+ def _flush_batch (self ):
157+ """Flush the batch of points to InfluxDB"""
158+ if not self .batch_points :
159+ return
160+
161+ try :
162+ self .client .write_points (self .batch_points )
163+ self ._log .info (f"Wrote { len (self .batch_points )} points to InfluxDB" )
164+ self .batch_points = []
165+ self .last_batch_time = time .time ()
166+ except Exception as e :
167+ self ._log .error (f"Failed to write batch to InfluxDB: { e } " )
168+ self .connected = False
169+
170+ def init_bridge (self , from_transport : transport_base ):
171+ """Initialize bridge - not needed for InfluxDB output"""
172+ pass
173+
174+ def __del__ (self ):
175+ """Cleanup on destruction - flush any remaining points"""
176+ if self .batch_points :
177+ self ._flush_batch ()
178+ if self .client :
179+ try :
180+ self .client .close ()
181+ except Exception :
182+ pass
0 commit comments