Skip to content

Commit a56ca01

Browse files
committed
netcdf parsing
1 parent 8489fa6 commit a56ca01

File tree

1 file changed

+110
-41
lines changed

1 file changed

+110
-41
lines changed

src/main/scala/qnt/data.scala

Lines changed: 110 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,47 +5,61 @@ import java.time.LocalDate
55

66
import com.fasterxml.jackson.databind.ObjectMapper
77
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
8-
import org.saddle.Frame
8+
import org.saddle.{Frame, Index, Mat}
99
import org.slf4j.LoggerFactory
10-
import ucar.ma2.ArrayDouble
1110
import ucar.nc2.NetcdfFile
1211

12+
import scala.collection.JavaConversions._
1313
import scala.collection.immutable.{Map => ImmutableMap}
1414
import scala.util.control.NonFatal
1515

16-
case class StockInfo (
17-
id: String,
18-
cik: Option[String],
19-
exchange: String,
20-
symbol: String,
21-
name: String,
22-
sector: Option[String],
23-
props: ImmutableMap[String, Any]
24-
) {
25-
26-
def this(props: Map[String, Any]) = this(
27-
props("id").asInstanceOf[String],
28-
//props("FIGI").asInstanceOf[String],
29-
props.get("cik") match {
30-
case Some(s) => {
31-
val str = s.asInstanceOf[String]
32-
if (str != null && str.length > 0) Some(str) else None
33-
}
34-
case _ => None
35-
},
36-
props("exchange").asInstanceOf[String],
37-
props("symbol").asInstanceOf[String],
38-
props("name").asInstanceOf[String],
39-
{
40-
val s = props("sector").asInstanceOf[String]
41-
if (s == null || s.length < 1) None else Some(s)
42-
},
43-
props - "id" - "cik" - "exchange" - "symbol" - "name" - "sector"
44-
)
4516

46-
}
4717

4818
object data {
19+
case class StockInfo (
20+
id: String,
21+
cik: Option[String],
22+
exchange: String,
23+
symbol: String,
24+
name: String,
25+
sector: Option[String],
26+
props: ImmutableMap[String, Any]
27+
) {
28+
29+
def this(props: Map[String, Any]) = this(
30+
props("id").asInstanceOf[String],
31+
//props("FIGI").asInstanceOf[String],
32+
props.get("cik") match {
33+
case Some(s) => {
34+
val str = s.asInstanceOf[String]
35+
if (str != null && str.length > 0) Some(str) else None
36+
}
37+
case _ => None
38+
},
39+
props("exchange").asInstanceOf[String],
40+
props("symbol").asInstanceOf[String],
41+
props("name").asInstanceOf[String],
42+
{
43+
val s = props("sector").asInstanceOf[String]
44+
if (s == null || s.length < 1) None else Some(s)
45+
},
46+
props - "id" - "cik" - "exchange" - "symbol" - "name" - "sector"
47+
)
48+
}
49+
50+
object Fields {
51+
val open = "open"
52+
val low = "low"
53+
val high = "high"
54+
val close = "close"
55+
val vol = "vol"
56+
val divs = "divs"
57+
val split = "split"
58+
val split_cumprod = "split_cumprod"
59+
val is_liquid = "is_liquid"
60+
61+
val values = List(open, low, high, close, vol, divs, split, split_cumprod, is_liquid )
62+
}
4963

5064
def loadStockList(
5165
minDate: LocalDate = LocalDate.of(2007, 1, 1),
@@ -65,30 +79,85 @@ object data {
6579
ids: List[String],
6680
minDate: LocalDate = LocalDate.of(2007, 1, 1),
6781
maxDate: LocalDate = LocalDate.now()
68-
) : Frame[LocalDate, String, Float] = {
82+
) : Map[String, Frame[LocalDate, String, Double]] = {
6983
var uri = "data"
7084
var params = Map(
7185
"assets" -> ids,
7286
"min_date" -> minDate.toString,
7387
"max_date" -> maxDate.toString
7488
)
75-
val ad: ArrayDouble = null;
76-
77-
var dataBytes = loadWithRetry(uri, params)
78-
val dataNetcdf = NetcdfFile.openInMemory("data", dataBytes)
79-
null
89+
val dataBytes = loadWithRetry(uri, params)
90+
netcdfBytesToFrame(dataBytes)
8091
}
8192

93+
def loadIndexList() = ???
94+
95+
def loadIndexSeries() = ???
96+
97+
8298
private val LOG = LoggerFactory.getLogger(getClass)
8399
private val RETRIES = 5
84100
private val TIMEOUT = 60*1000
85101
private val OBJECT_MAPPER = new ObjectMapper() with ScalaObjectMapper
86102

87103
OBJECT_MAPPER.registerModule(DefaultScalaModule)
88104

89-
def loadIndexList() = ???
105+
private def loadStockDailySeriesOriginChunk(
106+
ids: List[String],
107+
minDate: LocalDate = LocalDate.of(2007, 1, 1),
108+
maxDate: LocalDate = LocalDate.now()
109+
) : Map[String, Frame[LocalDate, String, Double]] = {
110+
var uri = "data"
111+
var params = Map(
112+
"assets" -> ids,
113+
"min_date" -> minDate.toString,
114+
"max_date" -> maxDate.toString
115+
)
116+
val dataBytes = loadWithRetry(uri, params)
117+
netcdfBytesToFrame(dataBytes)
118+
}
90119

91-
def loadIndexSeries() = ???
120+
private def netcdfBytesToFrame(bytes: Array[Byte]): Map[String, Frame[LocalDate, String, Double]] = {
121+
val dataNetcdf = NetcdfFile.openInMemory("data", bytes)
122+
123+
val vars = dataNetcdf.getVariables.toList.map(v=>(v.getShortName, v)).toMap
124+
125+
val timeVar = vars("time")
126+
val zeroDateStr = timeVar.getAttributes.toList.filter(i=>i.getShortName == "units").get(0).getStringValue
127+
val zeroDate = LocalDate.parse(zeroDateStr.split(" since ")(1))
128+
val timeRawArray = dataNetcdf.readSection("time").copyTo1DJavaArray().asInstanceOf[Array[Int]]
129+
val timeArray = timeRawArray.map(i => zeroDate.plusDays(i))
130+
131+
var fieldVar = vars("field")
132+
val fieldRawArray = dataNetcdf.readSection("field").copyToNDJavaArray().asInstanceOf[Array[Array[Char]]]
133+
val fieldArray = fieldRawArray.map(i => new String(i.filter(c => c != '\0')))
134+
135+
val assetVar = vars("asset")
136+
val assetRawArray = dataNetcdf.readSection("asset").copyToNDJavaArray().asInstanceOf[Array[Array[Char]]]
137+
val assetArray = assetRawArray.map(i => new String(i.filter(c => c != '\0')))
138+
139+
// C-order of dimensions: field,time,asset
140+
val values = dataNetcdf.readSection("__xarray_dataarray_variable__").copyTo1DJavaArray().asInstanceOf[Array[Double]]
141+
142+
implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)
143+
val timeIdx = Index[LocalDate](timeArray)
144+
val assetIdx = Index[String](assetArray)
145+
val valueMatrices = values.grouped(timeArray.length*assetArray.length).map(g => Mat(timeArray.length, assetArray.length, g)).toArray
146+
147+
var result = Map[String, Frame[LocalDate, String, Double]]()
148+
149+
for(fi <- fieldArray.indices) {
150+
val field = fieldArray(fi)
151+
val mat = valueMatrices(fi)
152+
val frame = Frame[LocalDate, String, Double](mat, timeIdx, assetIdx)
153+
val frameForward = frame.row(timeIdx.reversed.toVec.contents.sorted)
154+
155+
result += (field -> frameForward)
156+
}
157+
158+
print(Fields.values)
159+
result
160+
}
92161

93162
private def loadWithRetry(uri: String, dataObj: Any = null): Array[Byte] = {
94163
val urlStr = baseUrl + uri
@@ -103,7 +172,7 @@ object data {
103172
conn.setDoOutput(dataObj != null)
104173
conn.setRequestMethod(if(dataObj == null) "GET" else "POST")
105174
conn.setUseCaches(false)
106-
conn.setDoInput(true);
175+
conn.setDoInput(true)
107176
if(dataObj != null) {
108177
val dataBytes = OBJECT_MAPPER.writeValueAsBytes(dataObj)
109178
// conn.setRequestProperty("Content-Type", "application/json; utf-8")

0 commit comments

Comments
 (0)