Skip to content
4 changes: 1 addition & 3 deletions examples/ssh_remote_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

from collections import defaultdict

from luigi import six

import luigi
from luigi.contrib.ssh import RemoteContext, RemoteTarget
from luigi.mock import MockTarget
Expand Down Expand Up @@ -77,7 +75,7 @@ def run(self):
processes_per_user[username] += 1

toplist = sorted(
six.iteritems(processes_per_user),
processes_per_user.items(),
key=lambda x: x[1],
reverse=True
)
Expand Down
4 changes: 1 addition & 3 deletions examples/top_artists.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from collections import defaultdict
from heapq import nlargest

from luigi import six

import luigi
import luigi.contrib.hdfs
import luigi.contrib.postgres
Expand Down Expand Up @@ -132,7 +130,7 @@ def run(self):
artist_count[artist] += 1

with self.output().open('w') as out_file:
for artist, count in six.iteritems(artist_count):
for artist, count in artist_count.items():
out_file.write('{}\t{}\n'.format(artist, count))


Expand Down
4 changes: 1 addition & 3 deletions examples/wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from luigi import six

import luigi


Expand Down Expand Up @@ -75,6 +73,6 @@ def run(self):

# output data
f = self.output().open('w')
for word, count in six.iteritems(count):
for word, count in count.items():
f.write("%s\t%d\n" % (word, count))
f.close() # WARNING: file system operations are atomic therefore if you don't close the file you lose all data
12 changes: 6 additions & 6 deletions luigi/batch_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _key(self, task_name, family, unbatched_args):
elif self._config.batch_mode == 'family':
return family
elif self._config.batch_mode == 'unbatched_params':
param_str = six.u(', ').join(six.u('{}={}').format(*kv) for kv in six.iteritems(unbatched_args))
param_str = six.u(', ').join(six.u('{}={}').format(*kv) for kv in unbatched_args.items())
return six.u('{}({})').format(family, param_str)
else:
raise ValueError('Unknown batch mode for batch notifier: {}'.format(
Expand Down Expand Up @@ -136,12 +136,12 @@ def add_scheduling_fail(self, task_name, family, unbatched_args, expl, owners):

def _task_expl_groups(self, expls):
if not self._config.group_by_error_messages:
return [((task,), msg) for task, msg in six.iteritems(expls)]
return [((task,), msg) for task, msg in expls.items()]

groups = collections.defaultdict(list)
for task, msg in six.iteritems(expls):
for task, msg in expls.items():
groups[msg].append(task)
return [(tasks, msg) for msg, tasks in six.iteritems(groups)]
return [(tasks, msg) for msg, tasks in groups.items()]

def _expls_key(self, expls_tuple):
expls = expls_tuple[0]
Expand All @@ -156,7 +156,7 @@ def _expl_key(self, expl):
def _email_body(self, fail_counts, disable_counts, scheduling_counts, fail_expls):
expls = {
(name, fail_count, disable_counts[name], scheduling_counts[name]): self._expl_body(fail_expls[name])
for name, fail_count in six.iteritems(fail_counts)
for name, fail_count in fail_counts.items()
}
expl_groups = sorted(self._task_expl_groups(expls), key=self._expls_key)
body_lines = []
Expand Down Expand Up @@ -188,7 +188,7 @@ def _send_email(self, fail_counts, disable_counts, scheduling_counts, fail_expls

def send_email(self):
try:
for owner, failures in six.iteritems(self._fail_counts):
for owner, failures in self._fail_counts.items():
self._send_email(
fail_counts=failures,
disable_counts=self._disabled_counts[owner],
Expand Down
5 changes: 2 additions & 3 deletions luigi/contrib/hadoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import warnings
from hashlib import md5
from itertools import groupby
from luigi import six

from luigi import configuration
import luigi
Expand Down Expand Up @@ -507,7 +506,7 @@ def run_job(self, job, tracking_url_callback=None):

jobconfs = job.jobconfs()

for k, v in six.iteritems(self.jobconfs):
for k, v in self.jobconfs.items():
jobconfs.append('%s=%s' % (k, v))

for conf in jobconfs:
Expand Down Expand Up @@ -886,7 +885,7 @@ def _flush_batch_incr_counter(self):
"""
Increments any unflushed counter values.
"""
for key, count in six.iteritems(self._counter_dict):
for key, count in self._counter_dict.items():
if count == 0:
continue
args = list(key) + [count]
Expand Down
12 changes: 5 additions & 7 deletions luigi/contrib/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import tempfile
import warnings

from luigi import six

import luigi
import luigi.contrib.hadoop
from luigi.contrib.hdfs import get_autoconfig_client
Expand Down Expand Up @@ -191,7 +189,7 @@ def partition_spec(self, partition):
Turns a dict into the a Hive partition specification string.
"""
return ','.join(["`{0}`='{1}'".format(k, v) for (k, v) in
sorted(six.iteritems(partition), key=operator.itemgetter(0))])
sorted(partition.items(), key=operator.itemgetter(0))])


class ApacheHiveCommandClient(HiveCommandClient):
Expand Down Expand Up @@ -246,7 +244,7 @@ def table_schema(self, table, database='default'):
return [(field_schema.name, field_schema.type) for field_schema in client.get_schema(database, table)]

def partition_spec(self, partition):
return "/".join("%s=%s" % (k, v) for (k, v) in sorted(six.iteritems(partition), key=operator.itemgetter(0)))
return "/".join("%s=%s" % (k, v) for (k, v) in sorted(partition.items(), key=operator.itemgetter(0)))


class HiveThriftContext(object):
Expand Down Expand Up @@ -321,7 +319,7 @@ def table_exists(self, table, database='default', partition=None):
def partition_spec(self, partition):
_validate_partition(partition)
return '/'.join([
'{}={}'.format(k, v) for (k, v) in six.iteritems(partition or {})
'{}={}'.format(k, v) for (k, v) in (partition or {}).items()
])


Expand Down Expand Up @@ -440,11 +438,11 @@ def get_arglist(self, f_name, job):
arglist += ['-i', rcfile]
hiveconfs = job.hiveconfs()
if hiveconfs:
for k, v in six.iteritems(hiveconfs):
for k, v in hiveconfs.items():
arglist += ['--hiveconf', '{0}={1}'.format(k, v)]
hivevars = job.hivevars()
if hivevars:
for k, v in six.iteritems(hivevars):
for k, v in hivevars.items():
arglist += ['--hivevar', '{0}={1}'.format(k, v)]
logger.info(arglist)
return arglist
Expand Down
10 changes: 4 additions & 6 deletions luigi/contrib/pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import sys
import tempfile

from luigi import six

import luigi
from luigi import configuration

Expand Down Expand Up @@ -104,15 +102,15 @@ def line(k, v):

with tempfile.NamedTemporaryFile() as param_file, tempfile.NamedTemporaryFile() as prop_file:
if self.pig_parameters():
items = six.iteritems(self.pig_parameters())
items = self.pig_parameters().items()
param_file.writelines(line(k, v) for (k, v) in items)
param_file.flush()
opts.append('-param_file')
opts.append(param_file.name)

if self.pig_properties():
items = six.iteritems(self.pig_properties())
prop_file.writelines(line(k, v) for (k, v) in items)
items = self.pig_properties().items()
prop_file.writelines(line(k, v) for k, v in items)
prop_file.flush()
opts.append('-propertyFile')
opts.append(prop_file.name)
Expand All @@ -130,7 +128,7 @@ def track_and_progress(self, cmd):
temp_stdout = tempfile.TemporaryFile('wb')
env = os.environ.copy()
env['PIG_HOME'] = self.pig_home()
for k, v in six.iteritems(self.pig_env_vars()):
for k, v in self.pig_env_vars().items():
env[k] = v

proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
Expand Down
4 changes: 1 addition & 3 deletions luigi/contrib/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@

from configparser import NoSectionError

from luigi import six

from luigi import configuration
from luigi.format import get_default_format
from luigi.parameter import OptionalParameter, Parameter
Expand Down Expand Up @@ -504,7 +502,7 @@ def _get_s3_config(key=None):
except (NoSectionError, KeyError):
return {}
# So what ports etc can be read without us having to specify all dtypes
for k, v in six.iteritems(config):
for k, v in config.items():
try:
config[k] = int(v)
except ValueError:
Expand Down
4 changes: 2 additions & 2 deletions luigi/contrib/salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def parse_results(fields, data):

for record in data['records']: # for each 'record' in response
row = [None] * len(fields) # create null list the length of number of columns
for obj, value in record.iteritems(): # for each obj in record
for obj, value in record.items(): # for each obj in record
if not isinstance(value, (dict, list, tuple)): # if not data structure
if obj in fields:
row[fields.index(obj)] = ensure_utf(value)
Expand All @@ -79,7 +79,7 @@ def _traverse_results(value, fields, row, path):

Traverses through ordered dict and recursively calls itself when encountering a dictionary
"""
for f, v in value.iteritems(): # for each item in obj
for f, v in value.items(): # for each item in obj
field_name = '{path}.{name}'.format(path=path, name=f) if path else f

if not isinstance(v, (dict, list, tuple)): # if not data structure
Expand Down
4 changes: 1 addition & 3 deletions luigi/contrib/scalding.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import subprocess
import warnings

from luigi import six

import luigi.configuration
import luigi.contrib.hadoop
import luigi.contrib.hadoop_jar
Expand Down Expand Up @@ -302,7 +300,7 @@ def args(self):
Returns an array of args to pass to the job.
"""
arglist = []
for k, v in six.iteritems(self.requires_hadoop()):
for k, v in self.requires_hadoop().items():
arglist.append('--' + k)
arglist.extend([t.output().path for t in flatten(v)])
arglist.extend(['--output', self.output()])
Expand Down
7 changes: 3 additions & 4 deletions luigi/db_task_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import logging
from contextlib import contextmanager

from luigi import six

from luigi import configuration
from luigi import task_history
Expand Down Expand Up @@ -123,7 +122,7 @@ def _find_or_create_task(self, task):
yield (task_record, session)
else:
task_record = TaskRecord(task_id=task._task.id, name=task.task_family, host=task.host)
for (k, v) in six.iteritems(task.parameters):
for k, v in task.parameters.items():
task_record.parameters[k] = TaskParameter(name=k, value=v)
session.add(task_record)
yield (task_record, session)
Expand All @@ -137,14 +136,14 @@ def find_all_by_parameters(self, task_name, session=None, **task_params):
"""
with self._session(session) as session:
query = session.query(TaskRecord).join(TaskEvent).filter(TaskRecord.name == task_name)
for (k, v) in six.iteritems(task_params):
for k, v in task_params.items():
alias = sqlalchemy.orm.aliased(TaskParameter)
query = query.join(alias).filter(alias.name == k, alias.value == v)

tasks = query.order_by(TaskEvent.ts)
for task in tasks:
# Sanity check
assert all(k in task.parameters and v == str(task.parameters[k].value) for (k, v) in six.iteritems(task_params))
assert all(k in task.parameters and v == str(task.parameters[k].value) for k, v in task_params.items())

yield task

Expand Down
3 changes: 1 addition & 2 deletions luigi/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

from luigi import date_interval
from luigi import task_register
from luigi import six
from luigi import configuration
from luigi.cmdline_parser import CmdlineParser

Expand Down Expand Up @@ -757,7 +756,7 @@ def _apply_regex(self, regex, input):
if re_match and any(re_match.groups()):
kwargs = {}
has_val = False
for k, v in six.iteritems(re_match.groupdict(default="0")):
for k, v in re_match.groupdict(default="0").items():
val = int(v)
if val > -1:
has_val = True
Expand Down
Loading