-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsession.rs
More file actions
157 lines (131 loc) · 4.15 KB
/
Copy pathsession.rs
File metadata and controls
157 lines (131 loc) · 4.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use std::ffi::c_char;
use std::sync::Arc;
use thiserror::Error;
use crate::ffi::{
cass_session_close,
cass_session_connect,
cass_session_connect_keyspace_n,
cass_session_free,
cass_session_new,
struct_CassSession_,
};
use crate::future::DriverFuture;
use crate::{
Cluster,
DriverError,
SessionConfig,
SessionConfigError,
};
/// An error that is raised when a session cannot be created.
///
/// Each variant wraps an underlying error and is built from it through the
/// generated `From` implementation, so `?` can be used on either error type
/// in functions returning `Result<_, SessionCreationError>`.
#[derive(Debug, Error)]
pub enum SessionCreationError {
/// An error that is raised when a session configuration is invalid.
///
/// The message is forwarded unchanged from the wrapped
/// `SessionConfigError`.
#[error(transparent)]
Config(#[from] SessionConfigError),
/// An error that is raised when a session cannot be established.
///
/// The message is forwarded unchanged from the wrapped `DriverError`.
#[error(transparent)]
Connection(#[from] DriverError),
}
/// A session object is used to execute queries and maintains cluster state
/// through the control connection.
///
/// The control connection is used to auto-discover nodes and monitor cluster
/// changes (topology and schema). Each session also maintains multiple pools of
/// connections to cluster nodes which are used to query the cluster.
///
/// Cloning a `Session` only clones the two `Arc` handles below, so every clone
/// refers to the same underlying driver session and the same configuration.
#[derive(Clone)]
pub struct Session {
/// Shared owner of the raw driver session; the driver object is freed when
/// the last handle to the wrapper is dropped.
inner: Arc<SessionWrapper>,
/// Session-level settings that are kept on the Rust side rather than being
/// handed to the driver's cluster object.
config: Arc<Config>,
}
impl Session {
/// Creates a new Cassandra session.
fn new(config: Config) -> Self {
let session = unsafe { cass_session_new() };
Self {
inner: Arc::new(SessionWrapper(session)),
config: Arc::new(config),
}
}
/// Returns the raw pointer to the session object.
pub(crate) fn inner(&self) -> *mut struct_CassSession_ {
self.inner.inner()
}
/// Closes the session and releases all resources.
pub async fn close(self) -> Result<(), DriverError> {
let future = unsafe { cass_session_close(self.inner()) };
DriverFuture::new(future, self).await
}
/// Connects to the cluster and returns a session.
pub(crate) async fn connect(
mut config: SessionConfig,
) -> Result<Session, SessionCreationError> {
let keyspace = std::mem::take(&mut config.keyspace);
let page_size = std::mem::take(&mut config.page_size);
let cluster: Cluster = config.try_into()?;
let config = Config {
page_size,
keyspace: keyspace.clone(),
};
let session = Self::new(config);
let session = match keyspace {
Some(keyspace) => {
Self::connect_keyspace(session, cluster, keyspace).await?
}
None => Self::connect_no_keyspace(session, cluster).await?,
};
Ok(session)
}
/// Connects to the cluster without specifying a keyspace.
fn connect_no_keyspace(
session: Self,
cluster: Cluster,
) -> DriverFuture<Self> {
let future =
unsafe { cass_session_connect(session.inner(), cluster.inner()) };
DriverFuture::new(future, session)
}
/// Connects to the cluster and sets the default keyspace.
fn connect_keyspace<T>(
session: Self,
cluster: Cluster,
keyspace: T,
) -> DriverFuture<Self>
where
T: AsRef<str>,
{
let keyspace = keyspace.as_ref();
let keyspace_len = keyspace.len();
let keyspace_ptr = keyspace.as_ptr() as *const c_char;
let future = unsafe {
cass_session_connect_keyspace_n(
session.inner(),
cluster.inner(),
keyspace_ptr,
keyspace_len,
)
};
DriverFuture::new(future, session)
}
}
/// A configuration used by the [`Session`] which is not part of the driver's
/// `CassCluster` and thus should be used separately.
///
/// The values are moved out of the `SessionConfig` in `Session::connect`
/// before the rest of that configuration is converted into a `Cluster`.
#[derive(Debug)]
struct Config {
/// Page size taken from the session configuration, if one was set.
page_size: Option<usize>,
/// Keyspace taken from the session configuration, if one was set. When
/// present, it is also the keyspace the session connects with.
keyspace: Option<String>,
}
/// Owning wrapper around the raw driver session pointer.
///
/// The pointer is released with `cass_session_free` when the wrapper is
/// dropped, so the wrapper must be the only owner of the pointer it holds.
#[repr(transparent)]
struct SessionWrapper(*mut struct_CassSession_);
impl SessionWrapper {
/// Returns the raw pointer to the session object.
pub fn inner(&self) -> *mut struct_CassSession_ {
self.0
}
}
impl Drop for SessionWrapper {
    /// Releases the driver session owned by this wrapper.
    fn drop(&mut self) {
        let raw = self.0;
        // SAFETY: the wrapper owns the pointer it stores and `drop` runs at
        // most once per value, so the session is freed exactly once here.
        unsafe {
            cass_session_free(raw);
        }
    }
}
// SAFETY: `SessionWrapper` only holds a raw `*mut struct_CassSession_`, which
// is neither `Send` nor `Sync` on its own. These impls assert that the driver
// session may be moved to, and used from, several threads at once.
// NOTE(review): presumably justified by the driver documenting its session
// object as thread-safe — confirm against the driver documentation.
unsafe impl Send for SessionWrapper {}
unsafe impl Sync for SessionWrapper {}