Skip to content

Commit

Permalink
Upgrade to Neo4j 4
Browse files (browse the repository at this point in the history)
  • Loading branch information
beatro0t committed Aug 7, 2021
1 parent ca1d5ec commit 3150b68
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 53 deletions.
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM neo4j:3.5.13
FROM neo4j:4.3.2-community

COPY . /opt/awspx
WORKDIR /opt/awspx
Expand All @@ -25,6 +25,9 @@ RUN apt -y update && apt install -y \
&& npm install -g npm@latest

RUN cd /opt/awspx/www && npm install
RUN gosu neo4j wget -q --timeout 300 --tries 30 --output-document=/var/lib/neo4j/plugins/apoc.jar \
https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/4.3.0.0/apoc-4.3.0.0-all.jar \
&& chmod 644 /var/lib/neo4j/plugins/apoc.jar

VOLUME /opt/awspx/data
EXPOSE 7373 7474 7687 80
9 changes: 4 additions & 5 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,7 @@ function install(){
-p 127.0.0.1:7373:7373 \
-p 127.0.0.1:7474:7474 \
-v ${MOUNT}/data:/opt/awspx/data:z \
-e NEO4J_apoc_import_file_enabled=true \
-e NEO4J_dbms_security_procedures_unrestricted=apoc.\\\* \
-e NEO4JLABS_PLUGINS=\[\"apoc\"\] \
-e NEO4J_dbms_security_procedures_unrestricted=apoc.jar
--restart=always beatro0t/awspx:latest >/dev/null; then

cp $(dirname $0)/data/sample.zip -f ${MOUNT}/data/. >/dev/null 2>&1
Expand All @@ -139,16 +137,17 @@ function install(){
function hook(){

if [[ "${@}" == "neo4j" ]]; then

# Start web interface
[[ -z "$(pgrep npm)" ]] \
&& cd /opt/awspx/www \
&& nohup npm run serve>/dev/null 2>&1 &

# Start neo4j
nohup /docker-entrypoint.sh neo4j console 2>&1 &
nohup bash /docker-entrypoint.sh neo4j console 2>&1 &

# Start bash so /docker-entrypoint.sh doesn't terminate
bash
exec bash
fi

}
Expand Down
12 changes: 6 additions & 6 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def handle_db(args, console=console):
if args.load_zips:

db.load_zips(archives=args.load_zips,
db=args.database if 'database' in args else 'default.db')
db=args.database if 'database' in args else 'default')

elif args.list_dbs:
db.list()
Expand All @@ -198,8 +198,8 @@ def profile(p):
return p

def database(p):
if re.compile("[A-Za-z0-9-_]+.db").match(p) is None:
suggestion = re.sub(r'[^A-Za-z0-9-_]+(\.db)?', '', p) + ".db"
if re.compile("[A-Za-z0-9-_]+").match(p) is None:
suggestion = re.sub(r'[^A-Za-z0-9-_]+', '', p)
raise argparse.ArgumentTypeError(
f"'{p}' is invalid, perhaps you meant '{suggestion}' instead?")
return p
Expand Down Expand Up @@ -302,7 +302,7 @@ def attack(name):
pnr.add_argument('--region', dest='region', default="eu-west-1", choices=Profile.regions,
help="Region to ingest (defaults to profile region, or `eu-west-1` if not set).")
pnr.add_argument('--database', dest='database', default=None, type=database,
help="Database to store results (defaults to <profile>.db).")
help="Database to store results (defaults to <profile>).")

# Services & resources args
snr = ingest_parser.add_argument_group("Services and resources")
Expand Down Expand Up @@ -384,9 +384,9 @@ def attack(name):

args = parser.parse_args()

# Unless a database has been defined for ingest, default to <profile>.db
# Unless a database has been defined for ingest, default to <profile>
if 'database' in args and args.database is None:
args.database = f"{args.profile}.db"
args.database = f"{args.profile}"

if 'verbose' in args and args.verbose:
console.verbose()
Expand Down
7 changes: 2 additions & 5 deletions lib/aws/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class IngestionManager(Elements):
zip = None

def __init__(self, session, console=None,
services=[], db="default.db",
services=[], db="default",
quick=False, skip_actions=False,
only_types=[], skip_types=[],
only_arns=[], skip_arns=[]):
Expand Down Expand Up @@ -268,15 +268,12 @@ def load_actions(self):
self.update(acl.principals())
self.update(acl.actions())

def save(self, db="default.db", path="/opt/awspx/data"):
def save(self, db="default", path="/opt/awspx/data"):

archive = None
edge_files = []
node_files = []

if not db.endswith(".db"):
db = "%s.db" % db

directory = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')}_{db.split('.')[0]}"
labels = sorted(list(set([
next((l for l in e.labels()
Expand Down
77 changes: 41 additions & 36 deletions lib/graph/db.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import datetime
import os
import re
import shutil
import sys
import subprocess
import sys
import time
import warnings

from neo4j import GraphDatabase, exceptions
import datetime
from neo4j import ExperimentalWarning, GraphDatabase, exceptions

warnings.filterwarnings("ignore", category=ExperimentalWarning)

NEO4J_DB_DIR = "/data/databases"
NEO4J_ZIP_DIR = "/opt/awspx/data"
NEO4J_CONF_DIR = "/var/lib/neo4j/conf"
NEO4J_TRANS_DIR = "/data/transactions"


class Neo4j(object):
Expand All @@ -22,8 +25,7 @@ class Neo4j(object):
if z.endswith(".zip")]

databases = [db for db in os.listdir(f"{NEO4J_DB_DIR}/")
if os.path.isdir(f"{NEO4J_DB_DIR}/{db}")
and db.endswith(".db")]
if os.path.isdir(f"{NEO4J_DB_DIR}/{db}")]

def __init__(self,
host="localhost",
Expand All @@ -49,7 +51,9 @@ def _start(self):
while retries < max_retries and not self.available():

if retries == 0:
subprocess.Popen(["nohup", "/docker-entrypoint.sh", "neo4j", "console", "&"],

subprocess.Popen(["nohup", "/docker-entrypoint.sh",
"neo4j", "console", "&"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
time.sleep(1)
Expand All @@ -58,6 +62,7 @@ def _start(self):
if not self.available():
self.console.critical("Neo4j failed to start")
return False

elif retries == 0:
self.console.info("Neo4j has already been started")
else:
Expand All @@ -78,7 +83,13 @@ def _stop(self):
if self.running():
self.console.critical("Neo4j failed to stop")
return False
elif retries == 0:

subprocess.Popen(["rm", "-f", f"{NEO4J_DB_DIR}/store_lock",
f"{NEO4J_DB_DIR}/system/database_lock"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)

if retries == 0:
self.console.info("Neo4j has already been stopped")
else:
self.console.info("Neo4j has successfully been stopped")
Expand All @@ -87,20 +98,20 @@ def _stop(self):

def _delete(self, db):

subprocess.Popen(["rm", "-rf", f"{NEO4J_DB_DIR}/{db}"])
subprocess.Popen(["rm", "-rf", f"{NEO4J_DB_DIR}/{db}",
f"{NEO4J_TRANS_DIR}/{db}"])

def _run(self, tx, cypher):
# Execute `cypher` through `tx.run` and return its result unmodified.
# NOTE(review): `tx` is presumably a neo4j transaction supplied by the
# caller -- confirm against the call site, which is not visible here.
results = tx.run(cypher)
return results

def _switch_database(self, db):

subprocess.Popen([
"sed", "-i",
'/^\(#\)\{0,1\}dbms.active_database=/s/.*/dbms.active_database=%s/' % db,
f"{NEO4J_CONF_DIR}/neo4j.conf"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT).communicate()
subprocess.Popen(["sed", "-i",
'/^\(#\)\{0,1\}dbms.default_database=/s/.*/dbms.default_database=%s/' % db,
f"{NEO4J_CONF_DIR}/neo4j.conf"
], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
).communicate()

def _load(self, archives, db):

Expand Down Expand Up @@ -162,38 +173,30 @@ def _load(self, archives, db):
with open(f'{v["DIR"]}/{c}', 'w') as f:
f.write('\n'.join(rows))

directory = ARCHIVES[list(ARCHIVES.keys())[0]]["DIR"]

csvs = [f"{a['DIR']}/{csv}"
for a in ARCHIVES.values()
csvs = [f"{a['DIR']}/{csv}" for a in ARCHIVES.values()
for csv in a["CSV"]]

edges = [e for e in csvs
if re.compile("(.*/)?([A-Z]+)\.csv").match(e)]

nodes = [n for n in csvs
if n not in edges]

self._delete(db)

with open(f"{directory}/config.txt", "w") as config:

conf = ' '.join([
"--report-file /dev/null",
"--ignore-missing-nodes=true",
"--ignore-duplicate-nodes=true",
"--multiline-fields=true",
f"--database {db}",
' '.join([f"--nodes={n}" for n in nodes]),
' '.join([f"--relationships={e}" for e in edges]),
])

config.write(conf)

stdout, _ = subprocess.Popen(["/docker-entrypoint.sh", "neo4j-admin", "import", "--f", f"{directory}/config.txt"],
stdout, _ = subprocess.Popen(["/docker-entrypoint.sh", "neo4j-admin", "import",
"--report-file", "/dev/null",
"--skip-duplicate-nodes", "true",
"--skip-bad-relationships", "true",
"--multiline-fields=true",
f"--database={db}",
*[f"--nodes={n}" for n in nodes],
*[f"--relationships={e}" for e in edges]],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT).communicate()

subprocess.Popen(["rm", "-rf", *[a["DIR"] for a in ARCHIVES.values()]])

stats = re.compile("([0-9a-zA-Z]+)."
"Imported:([0-9]+)nodes"
"([0-9]+)relationships"
Expand Down Expand Up @@ -231,10 +234,12 @@ def open(self):
def close(self):
# Close the driver if one is currently held; otherwise do nothing.
if self.driver is not None:
self.driver.close()
# Clear the reference so the `is not None` guard above skips an
# already-closed driver if close() is called again.
self.driver = None

def available(self):
# Report whether the database can be reached: True when both calls in the
# try body complete without raising, False otherwise.
try:
self.open()
# Ask the driver to verify connectivity.
# NOTE(review): presumably open() alone does not prove the server is
# reachable -- confirm against the neo4j driver documentation.
self.driver.verify_connectivity()
except Exception:
# Any failure is reported as "not available" instead of propagating.
return False
return True
Expand All @@ -251,7 +256,7 @@ def use(self, db):
self.console.task("Starting Neo4j",
self._start, done="Started Neo4j")

def load_zips(self, archives=[], db='default.db'):
def load_zips(self, archives=[], db='neo4j'):

archives = [f"{NEO4J_ZIP_DIR}/{a}"
if not a.startswith(f"{NEO4J_ZIP_DIR}/") else a
Expand Down Expand Up @@ -285,7 +290,7 @@ def list(self):

def run(self, cypher):

results = []
results = []

if not self.available():
self._start()
Expand All @@ -296,5 +301,5 @@ def run(self, cypher):

except exceptions.CypherSyntaxError as e:
self.console.error(str(e))

return results

0 comments on commit 3150b68

Please sign in to comment.