forked from opensemanticsearch/open-semantic-etl
enhance_extract_text_tika_server.py
import io
import pycurl
import sys
import json


# Extract text and metadata from a file using the Apache Tika server
class enhance_extract_text_tika_server(object):

    # copy a field from the Tika JSON results to a field of the data dict
    def tikafield2datafield(self, tika_results, data, tika_fieldname, data_fieldname):

        try:
            # if the field exists in the JSON results
            for tika_result in tika_results:

                if tika_fieldname in tika_result:

                    # copy or append the value from the Tika fieldname to the data fieldname
                    if data_fieldname in data:
                        # do not add if already there
                        if not tika_result[tika_fieldname] in data[data_fieldname]:
                            data[data_fieldname] += "\n\n" + tika_result[tika_fieldname]
                    else:
                        data[data_fieldname] = tika_result[tika_fieldname]

        except Exception:
            sys.stderr.write('Error while copying Tika field {} to field {}\n'.format(tika_fieldname, data_fieldname))
    def process(self, parameters={}, data={}):

        verbose = False
        if 'verbose' in parameters:
            if parameters['verbose']:
                verbose = True

        debug = False
        if 'debug' in parameters:
            if parameters['debug']:
                debug = True

        if 'tika_server' in parameters:
            tika_server = parameters['tika_server']
        else:
            tika_server = 'http://localhost:9998'

        uri = tika_server + '/rmeta/form/text'

        httpheader = None
        if 'ocr_lang' in parameters:
            httpheader = ['X-Tika-OCRLanguage: ' + parameters['ocr_lang']]

        filename = parameters['filename']

        if verbose:
            print("Calling Tika at {}".format(uri))

        #
        # Upload the file to the Apache Tika URI with Curl
        #

        tika_result_IO = io.BytesIO()

        curl = pycurl.Curl()
        curl.setopt(curl.POST, 1)
        curl.setopt(curl.URL, uri)

        if httpheader:
            curl.setopt(pycurl.HTTPHEADER, httpheader)

        curl.setopt(curl.HTTPPOST, [('fileupload', (curl.FORM_FILE, filename.encode('utf-8'), curl.FORM_FILENAME, filename.encode('utf-8')))])
        curl.setopt(curl.WRITEFUNCTION, tika_result_IO.write)

        curl.perform()

        http_status = curl.getinfo(pycurl.HTTP_CODE)

        # read the raw response before closing the buffer
        tika_response = tika_result_IO.getvalue().decode('utf-8')

        if verbose:
            print("Curl status code: {}".format(http_status))

        if debug:
            print(tika_response)

        curl.close()
        tika_result_IO.close()
        #
        # check the returned status
        #

        # Tika returned no error
        if http_status == 200 or http_status == 204:

            tika_results = json.loads(tika_response)

            if verbose:
                print('Tika extracted file (HTTP status code: {}): {}'.format(http_status, filename))
                print('Extracted text and metadata: {}'.format(tika_results))

            self.tikafield2datafield(tika_results, data, 'Content-Type', 'content_type')
            self.tikafield2datafield(tika_results, data, 'X-TIKA:content', 'content')

            self.tikafield2datafield(tika_results, data, 'Author', 'author')
            # author_s is not multivalued, so if there is more than one author, join them
            if 'author' in data:
                if isinstance(data['author'], list):
                    data['author'] = ', '.join(data['author'])

            self.tikafield2datafield(tika_results, data, 'Content-Length', 'file_size_i')
            self.tikafield2datafield(tika_results, data, 'Content-Encoding', 'encoding_s')
            self.tikafield2datafield(tika_results, data, 'title', 'title')
            self.tikafield2datafield(tika_results, data, 'subject', 'subject')
            self.tikafield2datafield(tika_results, data, 'description', 'description')
            self.tikafield2datafield(tika_results, data, 'comments', 'comments')
            self.tikafield2datafield(tika_results, data, 'last_modified', 'last_modified')
            self.tikafield2datafield(tika_results, data, 'Keywords', 'keywords')
            self.tikafield2datafield(tika_results, data, 'Category', 'category')
            self.tikafield2datafield(tika_results, data, 'resourceName', 'resourcename')
            self.tikafield2datafield(tika_results, data, 'url', 'url')
            self.tikafield2datafield(tika_results, data, 'links', 'links')

            # email & messages
            self.tikafield2datafield(tika_results, data, 'Message-From', 'message_from_ss')
            self.tikafield2datafield(tika_results, data, 'Message-To', 'message_to_ss')
            self.tikafield2datafield(tika_results, data, 'Message-CC', 'message_cc_ss')
            self.tikafield2datafield(tika_results, data, 'Message-BCC', 'message_bcc_ss')

        # error handling
        else:
            # Tika returned an error (HTTP status code <> 200/204)
            errormessage = tika_response

            # todo: raise an exception, so this part is handled by the ETL framework
            sys.stderr.write("Error (HTTP status code: {}) while extracting {}\n".format(http_status, filename))
            sys.stderr.write(errormessage + "\n")

            if 'error_ss' in data:
                data['error_ss'].append(errormessage)
            else:
                data['error_ss'] = [errormessage]

            data['error_enhance_extract_text_tika_server_t'] = errormessage

            data['content'] = ''

        return parameters, data
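

# Minimal stand-alone usage sketch (not part of the original module): it shows
# how this ETL stage could be called directly. The file path and the Tika server
# URL below are illustrative assumptions; the parameter names ('filename',
# 'tika_server', 'ocr_lang', 'verbose') come from the process() method above.
if __name__ == '__main__':

    enhancer = enhance_extract_text_tika_server()

    parameters = {
        'filename': '/tmp/example.pdf',          # hypothetical input file
        'tika_server': 'http://localhost:9998',  # assumed local Tika server (the default above)
        'ocr_lang': 'eng',                       # optional OCR language hint
        'verbose': True,
    }
    data = {}

    # process() posts the file to Tika and copies the extracted metadata and
    # full text into the data dict (e.g. data['content'], data['title'])
    parameters, data = enhancer.process(parameters, data)

    print(data.get('content_type'))
    print(data.get('content', '')[:500])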